webmvc: fix AbstractEmitterSubscriber #1377
Closed
Add this suggestion to a batch that can be applied as a single commit. This suggestion is invalid because no changes were made to the code. Suggestions cannot be applied while the pull request is closed. Suggestions cannot be applied while viewing a subset of changes. Only one suggestion per line can be applied in a batch. Add this suggestion to a batch that can be applied as a single commit. Applying suggestions on deleted lines is not supported. You must change the existing code in this line in order to create a valid suggestion. Outdated suggestions cannot be applied. This suggestion has been applied or marked resolved. Suggestions cannot be applied from pending reviews. Suggestions cannot be applied on multi-line comments. Suggestions cannot be applied while the pull request is queued to merge. Suggestion cannot be applied right now. Please check back later.
@smaldini brought this class to my attention and asked me to verify it. There were a couple of problems with the
AbstractEmitterSubscriber:Exceptions as values, the logic may treat it as an error even if it came throughonNext.onErrorandonCompleteare at most once events, there is no need to queue them up but could be simply saved in a field and read out in the emission side.AtomicReferencebecause this class requests items one-by-one.onTimeoutbody would trigger in respect to the regular signals, thus callingemitterdirectly from it is likely violating the serialization requirement of the emitter. The code now pretends to callonCompletewhich gets properly serialized with respect to any other signal.sendmethod crashed, the exception was not forwarded to the emitter for some reason. I assume the error should be reported since the source is otherwise cancelled and won't send any new item/signal.executingto a linearizing atomic counter because the original CAS+Set based trampolining is generally hard to verify on not losing request for scheduling.I also kept the one-by-one nature and the rescheduling logic assuming the intent was not to "cannibalize" the executor if there was plenty of source items available and behave "fairly" with other tasks running on the same executor.