33import org .dataloader .annotations .GuardedBy ;
44import org .dataloader .annotations .Internal ;
55import org .dataloader .impl .CompletableFutureKit ;
6- import org .dataloader .impl . DataLoaderAssertionException ;
6+ import org .dataloader .reactive . ReactiveSupport ;
77import org .dataloader .scheduler .BatchLoaderScheduler ;
88import org .dataloader .stats .StatisticsCollector ;
99import org .dataloader .stats .context .IncrementBatchLoadCountByStatisticsContext ;
1212import org .dataloader .stats .context .IncrementLoadCountStatisticsContext ;
1313import org .dataloader .stats .context .IncrementLoadErrorCountStatisticsContext ;
1414import org .reactivestreams .Subscriber ;
15- import org .reactivestreams .Subscription ;
1615
1716import java .time .Clock ;
1817import java .time .Instant ;
1918import java .util .ArrayList ;
2019import java .util .Collection ;
21- import java .util .HashMap ;
2220import java .util .LinkedHashSet ;
2321import java .util .List ;
2422import java .util .Map ;
@@ -510,7 +508,7 @@ private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoade
510508
511509 private CompletableFuture <List <V >> invokeBatchPublisher (List <K > keys , List <Object > keyContexts , List <CompletableFuture <V >> queuedFutures , BatchLoaderEnvironment environment ) {
512510 CompletableFuture <List <V >> loadResult = new CompletableFuture <>();
513- Subscriber <V > subscriber = new DataLoaderSubscriber (loadResult , keys , keyContexts , queuedFutures );
511+ Subscriber <V > subscriber = ReactiveSupport . batchSubscriber (loadResult , keys , keyContexts , queuedFutures , helperIntegration () );
514512
515513 BatchLoaderScheduler batchLoaderScheduler = loaderOptions .getBatchLoaderScheduler ();
516514 if (batchLoadFunction instanceof BatchPublisherWithContext ) {
@@ -537,7 +535,7 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
537535
538536 private CompletableFuture <List <V >> invokeMappedBatchPublisher (List <K > keys , List <Object > keyContexts , List <CompletableFuture <V >> queuedFutures , BatchLoaderEnvironment environment ) {
539537 CompletableFuture <List <V >> loadResult = new CompletableFuture <>();
540- Subscriber <Map .Entry <K , V >> subscriber = new DataLoaderMapEntrySubscriber (loadResult , keys , keyContexts , queuedFutures );
538+ Subscriber <Map .Entry <K , V >> subscriber = ReactiveSupport . mappedBatchSubscriber (loadResult , keys , keyContexts , queuedFutures , helperIntegration () );
541539
542540 BatchLoaderScheduler batchLoaderScheduler = loaderOptions .getBatchLoaderScheduler ();
543541 if (batchLoadFunction instanceof MappedBatchPublisherWithContext ) {
@@ -625,246 +623,22 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
625623 return (DispatchResult <T >) EMPTY_DISPATCH_RESULT ;
626624 }
627625
628- /**********************************************************************************************
629- * ********************************************************************************************
630- * <p>
631- * The reactive support classes start here
632- *
633- * @param <T> for two
634- **********************************************************************************************
635- **********************************************************************************************
636- */
637- private abstract class DataLoaderSubscriberBase <T > implements Subscriber <T > {
638-
639- final CompletableFuture <List <V >> valuesFuture ;
640- final List <K > keys ;
641- final List <Object > callContexts ;
642- final List <CompletableFuture <V >> queuedFutures ;
643-
644- List <K > clearCacheKeys = new ArrayList <>();
645- List <V > completedValues = new ArrayList <>();
646- boolean onErrorCalled = false ;
647- boolean onCompleteCalled = false ;
648-
649- DataLoaderSubscriberBase (
650- CompletableFuture <List <V >> valuesFuture ,
651- List <K > keys ,
652- List <Object > callContexts ,
653- List <CompletableFuture <V >> queuedFutures
654- ) {
655- this .valuesFuture = valuesFuture ;
656- this .keys = keys ;
657- this .callContexts = callContexts ;
658- this .queuedFutures = queuedFutures ;
659- }
660-
661- @ Override
662- public void onSubscribe (Subscription subscription ) {
663- subscription .request (keys .size ());
664- }
665-
666- @ Override
667- public void onNext (T v ) {
668- assertState (!onErrorCalled , () -> "onError has already been called; onNext may not be invoked." );
669- assertState (!onCompleteCalled , () -> "onComplete has already been called; onNext may not be invoked." );
670- }
671-
672- @ Override
673- public void onComplete () {
674- assertState (!onErrorCalled , () -> "onError has already been called; onComplete may not be invoked." );
675- onCompleteCalled = true ;
676- }
677-
678- @ Override
679- public void onError (Throwable throwable ) {
680- assertState (!onCompleteCalled , () -> "onComplete has already been called; onError may not be invoked." );
681- onErrorCalled = true ;
682-
683- stats .incrementBatchLoadExceptionCount (new IncrementBatchLoadExceptionCountStatisticsContext <>(keys , callContexts ));
684- }
685-
686- /*
687- * A value has arrived - how do we complete the future that's associated with it in a common way
688- */
689- void onNextValue (K key , V value , Object callContext , List <CompletableFuture <V >> futures ) {
690- if (value instanceof Try ) {
691- // we allow the batch loader to return a Try so we can better represent a computation
692- // that might have worked or not.
693- //noinspection unchecked
694- Try <V > tryValue = (Try <V >) value ;
695- if (tryValue .isSuccess ()) {
696- futures .forEach (f -> f .complete (tryValue .get ()));
697- } else {
698- stats .incrementLoadErrorCount (new IncrementLoadErrorCountStatisticsContext <>(key , callContext ));
699- futures .forEach (f -> f .completeExceptionally (tryValue .getThrowable ()));
700- clearCacheKeys .add (key );
701- }
702- } else {
703- futures .forEach (f -> f .complete (value ));
626+ private ReactiveSupport .HelperIntegration <K > helperIntegration () {
627+ return new ReactiveSupport .HelperIntegration <>() {
628+ @ Override
629+ public StatisticsCollector getStats () {
630+ return stats ;
704631 }
705- }
706632
707- Throwable unwrapThrowable ( Throwable ex ) {
708- if ( ex instanceof CompletionException ) {
709- ex = ex . getCause ( );
633+ @ Override
634+ public void clearCacheView ( K key ) {
635+ dataLoader . clear ( key );
710636 }
711- return ex ;
712- }
713- }
714-
715- private class DataLoaderSubscriber extends DataLoaderSubscriberBase <V > {
716-
717- private int idx = 0 ;
718-
719- private DataLoaderSubscriber (
720- CompletableFuture <List <V >> valuesFuture ,
721- List <K > keys ,
722- List <Object > callContexts ,
723- List <CompletableFuture <V >> queuedFutures
724- ) {
725- super (valuesFuture , keys , callContexts , queuedFutures );
726- }
727637
728- // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
729- // correctness (at the cost of speed).
730- @ Override
731- public synchronized void onNext (V value ) {
732- super .onNext (value );
733-
734- if (idx >= keys .size ()) {
735- // hang on they have given us more values than we asked for in keys
736- // we cant handle this
737- return ;
738- }
739- K key = keys .get (idx );
740- Object callContext = callContexts .get (idx );
741- CompletableFuture <V > future = queuedFutures .get (idx );
742- onNextValue (key , value , callContext , List .of (future ));
743-
744- completedValues .add (value );
745- idx ++;
746- }
747-
748-
749- @ Override
750- public synchronized void onComplete () {
751- super .onComplete ();
752- if (keys .size () != completedValues .size ()) {
753- // we have more or less values than promised
754- // we will go through all the outstanding promises and mark those that
755- // have not finished as failed
756- for (CompletableFuture <V > queuedFuture : queuedFutures ) {
757- if (!queuedFuture .isDone ()) {
758- queuedFuture .completeExceptionally (new DataLoaderAssertionException ("The size of the promised values MUST be the same size as the key list" ));
759- }
760- }
638+ @ Override
639+ public void clearCacheEntriesOnExceptions (List <K > keys ) {
640+ possiblyClearCacheEntriesOnExceptions (keys );
761641 }
762- possiblyClearCacheEntriesOnExceptions (clearCacheKeys );
763- valuesFuture .complete (completedValues );
764- }
765-
766- @ Override
767- public synchronized void onError (Throwable ex ) {
768- super .onError (ex );
769- ex = unwrapThrowable (ex );
770- // Set the remaining keys to the exception.
771- for (int i = idx ; i < queuedFutures .size (); i ++) {
772- K key = keys .get (i );
773- CompletableFuture <V > future = queuedFutures .get (i );
774- if (! future .isDone ()) {
775- future .completeExceptionally (ex );
776- // clear any cached view of this key because it failed
777- dataLoader .clear (key );
778- }
779- }
780- valuesFuture .completeExceptionally (ex );
781- }
782-
783- }
784-
785- private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase <Map .Entry <K , V >> {
786-
787- private final Map <K , Object > callContextByKey ;
788- private final Map <K , List <CompletableFuture <V >>> queuedFuturesByKey ;
789- private final Map <K , V > completedValuesByKey = new HashMap <>();
790-
791-
792- private DataLoaderMapEntrySubscriber (
793- CompletableFuture <List <V >> valuesFuture ,
794- List <K > keys ,
795- List <Object > callContexts ,
796- List <CompletableFuture <V >> queuedFutures
797- ) {
798- super (valuesFuture , keys , callContexts , queuedFutures );
799- this .callContextByKey = new HashMap <>();
800- this .queuedFuturesByKey = new HashMap <>();
801- for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
802- K key = keys .get (idx );
803- Object callContext = callContexts .get (idx );
804- CompletableFuture <V > queuedFuture = queuedFutures .get (idx );
805- callContextByKey .put (key , callContext );
806- queuedFuturesByKey .computeIfAbsent (key , k -> new ArrayList <>()).add (queuedFuture );
807- }
808- }
809-
810-
811- @ Override
812- public synchronized void onNext (Map .Entry <K , V > entry ) {
813- super .onNext (entry );
814- K key = entry .getKey ();
815- V value = entry .getValue ();
816-
817- Object callContext = callContextByKey .get (key );
818- List <CompletableFuture <V >> futures = queuedFuturesByKey .getOrDefault (key , List .of ());
819-
820- onNextValue (key , value , callContext , futures );
821-
822- // did we have an actual key for this value - ignore it if they send us one outside the key set
823- if (!futures .isEmpty ()) {
824- completedValuesByKey .put (key , value );
825- }
826- }
827-
828- @ Override
829- public synchronized void onComplete () {
830- super .onComplete ();
831-
832- possiblyClearCacheEntriesOnExceptions (clearCacheKeys );
833- List <V > values = new ArrayList <>(keys .size ());
834- for (K key : keys ) {
835- V value = completedValuesByKey .get (key );
836- values .add (value );
837-
838- List <CompletableFuture <V >> futures = queuedFuturesByKey .getOrDefault (key , List .of ());
839- for (CompletableFuture <V > future : futures ) {
840- if (! future .isDone ()) {
841- // we have a future that never came back for that key
842- // but the publisher is done sending in data - it must be null
843- // e.g. for key X when found no value
844- future .complete (null );
845- }
846- }
847- }
848- valuesFuture .complete (values );
849- }
850-
851- @ Override
852- public synchronized void onError (Throwable ex ) {
853- super .onError (ex );
854- ex = unwrapThrowable (ex );
855- // Complete the futures for the remaining keys with the exception.
856- for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
857- K key = keys .get (idx );
858- List <CompletableFuture <V >> futures = queuedFuturesByKey .get (key );
859- if (!completedValuesByKey .containsKey (key )) {
860- for (CompletableFuture <V > future : futures ) {
861- future .completeExceptionally (ex );
862- }
863- // clear any cached view of this key because they all failed
864- dataLoader .clear (key );
865- }
866- }
867- valuesFuture .completeExceptionally (ex );
868- }
642+ };
869643 }
870644}
0 commit comments