Skip to content

3.x: Observable.window() operators do not dispose upstream while there is an active window #7048

@arkivanov

Description

@arkivanov

RxJava version: 3.0.5
OS: Android

Hello. Today I discovered an odd behaviour of some of the Observable.window(...) operators. Here are some samples:

Sample 1

 val disposable = Observable .create<Int> { emitter -> Log.v("MyTest", "Upstream subscribed") emitter.setCancellable { Log.v("MyTest", "Upstream disposed") } repeat(100) { emitter.onNext(it) Thread.sleep(100L) } } .subscribeOn(Schedulers.io()) .window(50) .subscribe { it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) } Handler().postDelayed(disposable::dispose, 1000L)

The output of the code is:

2020-08-03 17:34:45.193 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 2020-08-03 17:34:50.158 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed 2020-08-03 17:34:50.158 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Window completed 

If you comment the line it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) then the output is:

2020-08-03 17:35:38.969 11704-11745/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 

Concerns:

  1. In the first case the upstream is disposed after 5 seconds, not after 1 second
  2. In the second case the upstream is never disposed at all

Sample 2
The window(timeSpan, TimeUnit) operator behaves differently.

 val disposable = Observable .create<Int> { emitter -> Log.v("MyTest", "Upstream subscribed") emitter.setCancellable { Log.v("MyTest", "Upstream disposed") } repeat(100) { emitter.onNext(it) Thread.sleep(100L) } } .subscribeOn(Schedulers.io()) .window(5000L, TimeUnit.MILLISECONDS) .subscribe { it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) } Handler().postDelayed(disposable::dispose, 1000L)

So the output is:

2020-08-03 17:32:04.738 11310-11361/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 2020-08-03 17:32:09.736 11310-11360/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed 2020-08-03 17:32:09.736 11310-11360/com.badoo.reaktive.sample.android V/MyTest: Window completed 

If you comment the line it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) ten the output is:

2020-08-03 17:33:23.535 11427-11480/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 2020-08-03 17:33:24.536 11427-11427/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed 

Concerns:

  1. In the first case the upstream is disposed after 5 seconds, not after 1 second (same as window(count))
  2. In the second case the upstream is disposed after 1 second (different from window(count))

Questions
Is it by design or a bug? I would expect the upstream to be disposed and all active windows completed once the downstream is disposed. Or at least behaviour to be consistent across all window(...) operators.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions