Skip to content

Conversation

@akarnokd
Copy link
Contributor

@akarnokd akarnokd commented Apr 6, 2017

@smaldini brought this class to my attention and asked me to verify it. There were a couple of problems with the AbstractEmitterSubscriber:

  • Using a shared queue for items, error and completion signals. The problem with this is that if the user is in the business of processing Exceptions as values, the logic may treat it as an error even if it came through onNext.
  • Since onError and onComplete are at most once events, there is no need to queue them up but could be simply saved in a field and read out in the emission side.
  • Now that the terminal events have their own fields, the queue can be simplified to an AtomicReference because this class requests items one-by-one.
  • It is unclear where the onTimeout body would trigger in respect to the regular signals, thus calling emitter directly from it is likely violating the serialization requirement of the emitter. The code now pretends to call onComplete which gets properly serialized with respect to any other signal.
  • In the handler method, if the send method crashed, the exception was not forwarded to the emitter for some reason. I assume the error should be reported since the source is otherwise cancelled and won't send any new item/signal.
  • Changed the executing to a linearizing atomic counter because the original CAS+Set based trampolining is generally hard to verify on not losing request for scheduling.

I also kept the one-by-one nature and the rescheduling logic assuming the intent was not to "cannibalize" the executor if there was plenty of source items available and behave "fairly" with other tasks running on the same executor.

@rstoyanchev
Copy link
Contributor

@akarnokd, first of all thanks for taking the time!

The simplifications around signal related fields make sense.

For onTimeout it comes from the underlying runtime (Tomcat, Jetty, etc) after a period of inactivity, completely independent thread from the Publisher. I did have have a "pretend onComplete" at one point but if we don't handle the timeout in the callback the Servlet container takes action and it's not the desired outcome. ResponseBodyEmitter is synchronized so it should be fine to call terminate() I think.

For the send method crashing it cleans up internally so we don't have to.

Any pointers on why the CAS+set may not be enough? Curious to learn more..

Couple more questions on the run method. It orders error signal checking ahead of getting the item but complete signal is after. It seems like this can re-order error and item signals if onError arrives just when run starts processing an item signal? Why re-order? Also at the end it calls the taskExecutor directly, wouldn't it make sense to call trySchedule after decrementing since that has the added try-catch protection around successful task submission?

@rstoyanchev
Copy link
Contributor

rstoyanchev commented Apr 6, 2017

Another thought on onTimeout -- since pretty much any outcome of run requires obtaining delegating to and hence synchronization on ResponseBodyEmitter, I could protect the entire run with synchronized(emitter){ ... } and that would properly guard against onTimeout completing at the same time without any extra cost to what is already the case.

@akarnokd
Copy link
Contributor Author

akarnokd commented Apr 6, 2017

For onTimeout it comes from the underlying runtime ...

I'll restore the original direct emission

For the send method crashing ...

Will restore the original pattern

Any pointers on why the CAS+set may not be enough? Curious to learn more.

I'm not aware of any studies about this, but all of RxJava 2 and Reactor 3 are written via the getAndIncrement() approach. After thinking more about the code, it is likely correct in this case.

It orders error signal checking ahead of getting the item but complete signal is after.

I guess you wanted to delay errors after items. I'll update the code.

Also at the end it calls the taskExecutor directly ... added try-catch protection

I forgot about that. I'll update the code for that as well.

ResponseBodyEmitter .. synchronized(emitter){ ... }

Isn't ResponseBodyEmitter already synchronized on its methods?

@rstoyanchev
Copy link
Contributor

Thanks! This is now merged.

Isn't ResponseBodyEmitter already synchronized on its methods?

Yes, send could raise an ISE if the emitter was completed but run would catch it and since subscription.cancel() should be idempotent, no harm done in calling terminate() again.

@rstoyanchev rstoyanchev closed this Apr 6, 2017
@akarnokd
Copy link
Contributor Author

akarnokd commented Apr 6, 2017

43eea41#diff-8dc8847c9dbfc64ca96195331492e86eR294

if (this.terminated) {

This change is incorrect because the request() before may result in an item and an onComplete signal and this only picks up the complete signal. The correct order is: read terminated, read element, if terminated and element is null -> signal terminated.

@akarnokd akarnokd deleted the AbstractEmitterSubscriberFix branch April 6, 2017 17:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

2 participants