Description
In some production code, I noticed that the retry overload that accepts a predicate can produce a MissingBackpressureException. The other retry overloads do not produce this exception.
I've created a small test case here:
https://github.com/dfjones/RxJava/blob/67a18a453616539faf16204637b9399057e0bf8e/src/test/java/rx/RetryBackpressureTest.java
Apologies for this being a non-deterministic test, but on my laptop the testBackpressurePredicateRetry test will reliably fail due to MissingBackpressureException with the trace below.
However, the testBackpressureNormalRetry test will run for 1 minute without producing this exception.
Admittedly, this might be a convoluted setup, but it reflects a situation I had in some production code. Essentially, I have a flow where for each event produced by an Observable, I need to perform some IO with retries. So, I've spread that IO work over a Scheduler. The IO work produces many individual events. In my test case, imagine this inner Observable is performing IO:
    AbstractOnSubscribe.create((state) -> state.onNext(2L))

Everything in this flow should respond to back-pressure through the use of AbstractOnSubscribe. However, I'm running into the issue described here when using retry with a predicate function.
rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:345)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:328)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:802)
    at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
    at rx.internal.operators.OperatorRetryWithPredicate$SourceSubscriber$1$1.onNext(OperatorRetryWithPredicate.java:113)
    at rx.observables.AbstractOnSubscribe$SubscriptionState.accept(AbstractOnSubscribe.java:535)
    at rx.observables.AbstractOnSubscribe$SubscriptionProducer.doNext(AbstractOnSubscribe.java:369)
    at rx.observables.AbstractOnSubscribe$SubscriptionProducer.request(AbstractOnSubscribe.java:340)
    at rx.Subscriber.setProducer(Subscriber.java:209)
    at rx.observables.AbstractOnSubscribe.call(AbstractOnSubscribe.java:192)
    at rx.observables.AbstractOnSubscribe$LambdaOnSubscribe.call(AbstractOnSubscribe.java:275)
    at rx.Observable.unsafeSubscribe(Observable.java:7689)
    at rx.internal.operators.OperatorRetryWithPredicate$SourceSubscriber$1.call(OperatorRetryWithPredicate.java:120)
    at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:80)
    at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:59)
    at rx.internal.operators.OperatorRetryWithPredicate$SourceSubscriber.onNext(OperatorRetryWithPredicate.java:77)
    at rx.internal.operators.OperatorRetryWithPredicate$SourceSubscriber.onNext(OperatorRetryWithPredicate.java:45)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.unsafeSubscribe(Observable.java:7689)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
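
For anyone reading the trace without the back-pressure contract in mind: the top frame is RxRingBuffer.onNext rejecting an item, which generally means more items arrived than the downstream had requested. The following is a toy model in plain Java of that contract, not RxJava's code and not a diagnosis of this bug. `BoundedBuffer`, `emitHonoringRequest`, `emitIgnoringRequest`, and the capacity of 4 are all hypothetical names and values chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BackpressureSketch {

    /** Hypothetical fixed-capacity buffer; a stand-in for the idea, not RxRingBuffer itself. */
    static final class BoundedBuffer {
        private final Queue<Long> queue = new ArrayDeque<>();
        private final int capacity;

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        /** Accepts an item, or fails if the producer has outrun the buffer. */
        void onNext(long value) {
            if (queue.size() >= capacity) {
                throw new IllegalStateException("buffer full: more items than were requested");
            }
            queue.add(value);
        }

        int size() {
            return queue.size();
        }
    }

    /** A producer that emits no more than the requested amount. Returns the number emitted. */
    static int emitHonoringRequest(BoundedBuffer buffer, long requested, long available) {
        long n = Math.min(requested, available);
        for (long i = 0; i < n; i++) {
            buffer.onNext(2L);
        }
        return (int) n;
    }

    /** A producer that emits everything it has. Returns true if the buffer overflowed. */
    static boolean emitIgnoringRequest(BoundedBuffer buffer, long available) {
        try {
            for (long i = 0; i < available; i++) {
                buffer.onNext(2L);
            }
            return false;
        } catch (IllegalStateException overflow) {
            return true;
        }
    }

    public static void main(String[] args) {
        BoundedBuffer wellBehaved = new BoundedBuffer(4);
        int emitted = emitHonoringRequest(wellBehaved, 4, 100);
        System.out.println("honoring request: emitted " + emitted);

        BoundedBuffer overrun = new BoundedBuffer(4);
        boolean overflowed = emitIgnoringRequest(overrun, 100);
        System.out.println("ignoring request: overflowed = " + overflowed);
    }
}
```

In this model the consumer only ever asks for as many items as the buffer can hold, so the overflow can only come from a producer (or an operator sitting in between) that does not relay or respect the requested amount. Whether that is what happens inside the predicate variant of retry is not established by the sketch.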