Skip to content

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Jun 19, 2019

This PR changes the connectable API to have a specific reset method to reset a terminated connectable source as part of the official API.

In 2.x, when publish() terminated, it reset itself to a fresh state which could lead to late consumers not receiving events as there might be no one to call connect() again (see #6501). However, replay() did not reset itself, thus late consumers got the cached events, however, a reconnect started the sequence and new consumers may have missed items.

In 3.x, this two corner cases have been fixed by the introduction of reset(). Both publish and replay now remain in their terminated state until reset is called. If the connection is disposed, it will automatically reset their state just like before. The state transitions are as follows:

  1. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> reset() -> fresh
  2. fresh -> connect() -> running -> dispose() -> fresh
  3. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> dispose() -> fresh
  4. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> connect() -> running

This does resolve the race condition with publish().refCount() described in #6501.

In addition, there are some changes to Flowable.publish()'s behavior:

It no longer keeps consuming the upstream if there are no subscribers. This implies if the source terminates while there are unconsumed items in the internal buffer, those will be available for observation.
I have no strong preference on this property and in comparison, Observable.publish drops items because there is no backpressure buffer in its implementation.

Upstream errors are not reported to the RxJavaPlugins.onError handler when if there are no subscribers but have to be observed via a subscriber.
Because terminal events are available until reset now, we can't know really if there is going to be a subscriber or not. However, it might be possible to detect the no-consumer case upon an error and still report it when reset or dispose is called.

Resolves #5628
Resolves #5899

@akarnokd akarnokd added this to the 3.0 milestone Jun 19, 2019
@akarnokd akarnokd changed the title 3.x: ConnectableFlowable/ConnetableFlowable redesign 3.x: ConnectableFlowable/ConnetableObservabe redesign Jun 19, 2019
@codecov
Copy link

codecov bot commented Jun 19, 2019

Codecov Report

Merging #6519 into 3.x will decrease coverage by 0.02%.
The diff coverage is 95.35%.

Impacted file tree graph

@@ Coverage Diff @@ ## 3.x #6519 +/- ## =========================================== - Coverage 98.2% 98.17% -0.03%  - Complexity 6245 6247 +2  =========================================== Files 675 675 Lines 45051 45011 -40 Branches 6225 6211 -14 =========================================== - Hits 44241 44189 -52  - Misses 275 283 +8  - Partials 535 539 +4
Impacted Files Coverage Δ Complexity Δ
...o/reactivex/observables/ConnectableObservable.java 100% <ø> (ø) 12 <0> (ø) ⬇️
...va/io/reactivex/flowables/ConnectableFlowable.java 100% <ø> (ø) 12 <0> (ø) ⬇️
src/main/java/io/reactivex/Flowable.java 100% <100%> (ø) 565 <0> (ø) ⬇️
...ternal/operators/observable/ObservablePublish.java 100% <100%> (+5.3%) 18 <18> (+7) ⬆️
src/main/java/io/reactivex/Observable.java 100% <100%> (ø) 540 <1> (ø) ⬇️
...ernal/operators/observable/ObservableRefCount.java 100% <100%> (ø) 24 <0> (-4) ⬇️
.../internal/operators/flowable/FlowableRefCount.java 100% <100%> (ø) 24 <0> (-4) ⬇️
...nternal/operators/observable/ObservableReplay.java 98.37% <40%> (-0.26%) 20 <1> (ø)
...ex/internal/operators/flowable/FlowableReplay.java 94.35% <40%> (-0.59%) 20 <1> (ø)
...x/internal/operators/flowable/FlowablePublish.java 96.55% <96.32%> (+0.34%) 18 <17> (+7) ⬆️
... and 33 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 583c4e7...36a8733. Read the comment docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

1 participant