- Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Description
RxJava version: 3.0.5
OS: Android
Hello. Today I discovered an odd behaviour of some of the Observable.window(...) operators. Here are some samples:
Sample 1
val disposable = Observable .create<Int> { emitter -> Log.v("MyTest", "Upstream subscribed") emitter.setCancellable { Log.v("MyTest", "Upstream disposed") } repeat(100) { emitter.onNext(it) Thread.sleep(100L) } } .subscribeOn(Schedulers.io()) .window(50) .subscribe { it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) } Handler().postDelayed(disposable::dispose, 1000L)The output of the code is:
2020-08-03 17:34:45.193 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 2020-08-03 17:34:50.158 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed 2020-08-03 17:34:50.158 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Window completed If you comment the line it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) then the output is:
2020-08-03 17:35:38.969 11704-11745/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed Concerns:
- In the first case the upstream is disposed after 5 seconds, not after 1 second
- In the second case the upstream is never disposed at all
Sample 2
The window(timeSpan, TimeUnit) operator behaves differently.
val disposable = Observable .create<Int> { emitter -> Log.v("MyTest", "Upstream subscribed") emitter.setCancellable { Log.v("MyTest", "Upstream disposed") } repeat(100) { emitter.onNext(it) Thread.sleep(100L) } } .subscribeOn(Schedulers.io()) .window(5000L, TimeUnit.MILLISECONDS) .subscribe { it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) } Handler().postDelayed(disposable::dispose, 1000L)So the output is:
2020-08-03 17:32:04.738 11310-11361/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 2020-08-03 17:32:09.736 11310-11360/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed 2020-08-03 17:32:09.736 11310-11360/com.badoo.reaktive.sample.android V/MyTest: Window completed If you comment the line it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }) ten the output is:
2020-08-03 17:33:23.535 11427-11480/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed 2020-08-03 17:33:24.536 11427-11427/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed Concerns:
- In the first case the upstream is disposed after 5 seconds, not after 1 second (same as
window(count)) - In the second case the upstream is disposed after 1 second (different from
window(count))
Questions
Is it by design or a bug? I would expect the upstream to be disposed and all active windows completed once the downstream is disposed. Or at least behaviour to be consistent across all window(...) operators.