Description
I've tried version 3.0.2 and 3.0.4 and both versions have this issue.
So here's the code that's causing the issue:
    private Publisher<ShortCode> findAll() {
        // Yes, this is not just for this code example. I'm actually using Flowable.empty().
        return Flowable.<Boolean>empty()
            .observeOn(Schedulers.io())
            .filter(_b -> {
                System.out.println("111111111111111111111111111 " + (_b == null));
                return _b;
            })
            .map(_b -> {
                System.out.println("2222222222222222222222222222 " + (_b == null));
                return _b;
            })
            // The Flowable is always empty, so casting to anything is safe
            .map(ShortCode.class::cast);
    }

The code causes the following NPE:

    java.lang.NullPointerException: The mapper function returned a null value.
        at java.util.Objects.requireNonNull(Objects.java:228)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapConditionalSubscriber.tryOnNext(FlowableMap.java:124)

The console output is:

    2222222222222222222222222222 true

So the function inside `filter` never gets called, which is expected, but then the function inside `map` gets called with a null element, which is not expected. It seems like the `filter` has corrupted the source and made it emit a null element.
And the weird thing is that, if you run that piece of code in a standalone main method, it will NOT cause the NPE.
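For reference, a standalone version of the same chain could look like the sketch below. The class name `StandaloneRepro`, the `collect()` helper, and the use of `String` in place of `ShortCode` are assumptions made only so the snippet compiles on its own; it needs RxJava 3 on the classpath.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;

public class StandaloneRepro {

    // Same operator chain as findAll(); String stands in for ShortCode (assumption).
    static Flowable<String> findAll() {
        return Flowable.<Boolean>empty()
                .observeOn(Schedulers.io())
                .filter(b -> {
                    System.out.println("filter called, null=" + (b == null));
                    return b;
                })
                .map(b -> {
                    System.out.println("map called, null=" + (b == null));
                    return b;
                })
                // The Flowable is always empty, so the cast is never executed.
                .map(String.class::cast);
    }

    // Hypothetical helper: blocks until the chain completes and returns whatever it emitted.
    static List<String> collect() {
        return findAll().toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("items: " + collect());
    }
}
```

Run this way, the chain just completes with no items and neither lambda is invoked, which matches the observation that the NPE does not show up outside the full application.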
Also, doing any of the following will stop the NPE from happening:
- Remove the `observeOn`.
- Move the `observeOn` after the `filter`.
- Add `.to(Flux::from)` after the `observeOn`.
- Replace `Flowable.<Boolean>empty().observeOn(Schedulers.io())` with `Flux.<Boolean>empty().publishOn(reactor.core.scheduler.Schedulers.elastic())`.
The examples involving `Flux` are only there to show that this is not an issue with Reactor Core.
I know that not being able to reproduce the NPE with just that code sample is not terribly helpful, but are there any circumstances where an `observeOn` followed by a `filter` can corrupt the source?
Thanks!