33import org .dataloader .annotations .GuardedBy ;
44import org .dataloader .annotations .Internal ;
55import org .dataloader .impl .CompletableFutureKit ;
6- import org .dataloader .impl .DataLoaderAssertionException ;
6+ import org .dataloader .reactive .HelperIntegration ;
7+ import org .dataloader .reactive .DataLoaderMapEntrySubscriber ;
8+ import org .dataloader .reactive .DataLoaderSubscriber ;
79import org .dataloader .scheduler .BatchLoaderScheduler ;
810import org .dataloader .stats .StatisticsCollector ;
911import org .dataloader .stats .context .IncrementBatchLoadCountByStatisticsContext ;
1214import org .dataloader .stats .context .IncrementLoadCountStatisticsContext ;
1315import org .dataloader .stats .context .IncrementLoadErrorCountStatisticsContext ;
1416import org .reactivestreams .Subscriber ;
15- import org .reactivestreams .Subscription ;
1617
1718import java .time .Clock ;
1819import java .time .Instant ;
1920import java .util .ArrayList ;
2021import java .util .Collection ;
21- import java .util .HashMap ;
2222import java .util .LinkedHashSet ;
2323import java .util .List ;
2424import java .util .Map ;
@@ -510,7 +510,7 @@ private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoade
510510
511511 private CompletableFuture <List <V >> invokeBatchPublisher (List <K > keys , List <Object > keyContexts , List <CompletableFuture <V >> queuedFutures , BatchLoaderEnvironment environment ) {
512512 CompletableFuture <List <V >> loadResult = new CompletableFuture <>();
513- Subscriber <V > subscriber = new DataLoaderSubscriber (loadResult , keys , keyContexts , queuedFutures );
513+ Subscriber <V > subscriber = new DataLoaderSubscriber <> (loadResult , keys , keyContexts , queuedFutures , helperIntegration () );
514514
515515 BatchLoaderScheduler batchLoaderScheduler = loaderOptions .getBatchLoaderScheduler ();
516516 if (batchLoadFunction instanceof BatchPublisherWithContext ) {
@@ -535,9 +535,28 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
535535 return loadResult ;
536536 }
537537
538+ private HelperIntegration <K > helperIntegration () {
539+ return new HelperIntegration <>() {
540+ @ Override
541+ public StatisticsCollector getStats () {
542+ return stats ;
543+ }
544+
545+ @ Override
546+ public void clearCacheView (K key ) {
547+ dataLoader .clear (key );
548+ }
549+
550+ @ Override
551+ public void clearCacheEntriesOnExceptions (List <K > keys ) {
552+ possiblyClearCacheEntriesOnExceptions (keys );
553+ }
554+ };
555+ }
556+
538557 private CompletableFuture <List <V >> invokeMappedBatchPublisher (List <K > keys , List <Object > keyContexts , List <CompletableFuture <V >> queuedFutures , BatchLoaderEnvironment environment ) {
539558 CompletableFuture <List <V >> loadResult = new CompletableFuture <>();
540- Subscriber <Map .Entry <K , V >> subscriber = new DataLoaderMapEntrySubscriber (loadResult , keys , keyContexts , queuedFutures );
559+ Subscriber <Map .Entry <K , V >> subscriber = new DataLoaderMapEntrySubscriber <> (loadResult , keys , keyContexts , queuedFutures , helperIntegration () );
541560
542561 BatchLoaderScheduler batchLoaderScheduler = loaderOptions .getBatchLoaderScheduler ();
543562 if (batchLoadFunction instanceof MappedBatchPublisherWithContext ) {
@@ -625,246 +644,4 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
625644 return (DispatchResult <T >) EMPTY_DISPATCH_RESULT ;
626645 }
627646
628- /**********************************************************************************************
629- * ********************************************************************************************
630- * <p>
631- * The reactive support classes start here
632- *
633- * @param <T> for two
634- **********************************************************************************************
635- **********************************************************************************************
636- */
637- private abstract class DataLoaderSubscriberBase <T > implements Subscriber <T > {
638-
639- final CompletableFuture <List <V >> valuesFuture ;
640- final List <K > keys ;
641- final List <Object > callContexts ;
642- final List <CompletableFuture <V >> queuedFutures ;
643-
644- List <K > clearCacheKeys = new ArrayList <>();
645- List <V > completedValues = new ArrayList <>();
646- boolean onErrorCalled = false ;
647- boolean onCompleteCalled = false ;
648-
649- DataLoaderSubscriberBase (
650- CompletableFuture <List <V >> valuesFuture ,
651- List <K > keys ,
652- List <Object > callContexts ,
653- List <CompletableFuture <V >> queuedFutures
654- ) {
655- this .valuesFuture = valuesFuture ;
656- this .keys = keys ;
657- this .callContexts = callContexts ;
658- this .queuedFutures = queuedFutures ;
659- }
660-
661- @ Override
662- public void onSubscribe (Subscription subscription ) {
663- subscription .request (keys .size ());
664- }
665-
666- @ Override
667- public void onNext (T v ) {
668- assertState (!onErrorCalled , () -> "onError has already been called; onNext may not be invoked." );
669- assertState (!onCompleteCalled , () -> "onComplete has already been called; onNext may not be invoked." );
670- }
671-
672- @ Override
673- public void onComplete () {
674- assertState (!onErrorCalled , () -> "onError has already been called; onComplete may not be invoked." );
675- onCompleteCalled = true ;
676- }
677-
678- @ Override
679- public void onError (Throwable throwable ) {
680- assertState (!onCompleteCalled , () -> "onComplete has already been called; onError may not be invoked." );
681- onErrorCalled = true ;
682-
683- stats .incrementBatchLoadExceptionCount (new IncrementBatchLoadExceptionCountStatisticsContext <>(keys , callContexts ));
684- }
685-
686- /*
687- * A value has arrived - how do we complete the future that's associated with it in a common way
688- */
689- void onNextValue (K key , V value , Object callContext , List <CompletableFuture <V >> futures ) {
690- if (value instanceof Try ) {
691- // we allow the batch loader to return a Try so we can better represent a computation
692- // that might have worked or not.
693- //noinspection unchecked
694- Try <V > tryValue = (Try <V >) value ;
695- if (tryValue .isSuccess ()) {
696- futures .forEach (f -> f .complete (tryValue .get ()));
697- } else {
698- stats .incrementLoadErrorCount (new IncrementLoadErrorCountStatisticsContext <>(key , callContext ));
699- futures .forEach (f -> f .completeExceptionally (tryValue .getThrowable ()));
700- clearCacheKeys .add (key );
701- }
702- } else {
703- futures .forEach (f -> f .complete (value ));
704- }
705- }
706-
707- Throwable unwrapThrowable (Throwable ex ) {
708- if (ex instanceof CompletionException ) {
709- ex = ex .getCause ();
710- }
711- return ex ;
712- }
713- }
714-
715- private class DataLoaderSubscriber extends DataLoaderSubscriberBase <V > {
716-
717- private int idx = 0 ;
718-
719- private DataLoaderSubscriber (
720- CompletableFuture <List <V >> valuesFuture ,
721- List <K > keys ,
722- List <Object > callContexts ,
723- List <CompletableFuture <V >> queuedFutures
724- ) {
725- super (valuesFuture , keys , callContexts , queuedFutures );
726- }
727-
728- // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
729- // correctness (at the cost of speed).
730- @ Override
731- public synchronized void onNext (V value ) {
732- super .onNext (value );
733-
734- if (idx >= keys .size ()) {
735- // hang on they have given us more values than we asked for in keys
736- // we cant handle this
737- return ;
738- }
739- K key = keys .get (idx );
740- Object callContext = callContexts .get (idx );
741- CompletableFuture <V > future = queuedFutures .get (idx );
742- onNextValue (key , value , callContext , List .of (future ));
743-
744- completedValues .add (value );
745- idx ++;
746- }
747-
748-
749- @ Override
750- public synchronized void onComplete () {
751- super .onComplete ();
752- if (keys .size () != completedValues .size ()) {
753- // we have more or less values than promised
754- // we will go through all the outstanding promises and mark those that
755- // have not finished as failed
756- for (CompletableFuture <V > queuedFuture : queuedFutures ) {
757- if (!queuedFuture .isDone ()) {
758- queuedFuture .completeExceptionally (new DataLoaderAssertionException ("The size of the promised values MUST be the same size as the key list" ));
759- }
760- }
761- }
762- possiblyClearCacheEntriesOnExceptions (clearCacheKeys );
763- valuesFuture .complete (completedValues );
764- }
765-
766- @ Override
767- public synchronized void onError (Throwable ex ) {
768- super .onError (ex );
769- ex = unwrapThrowable (ex );
770- // Set the remaining keys to the exception.
771- for (int i = idx ; i < queuedFutures .size (); i ++) {
772- K key = keys .get (i );
773- CompletableFuture <V > future = queuedFutures .get (i );
774- if (! future .isDone ()) {
775- future .completeExceptionally (ex );
776- // clear any cached view of this key because it failed
777- dataLoader .clear (key );
778- }
779- }
780- valuesFuture .completeExceptionally (ex );
781- }
782-
783- }
784-
785- private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase <Map .Entry <K , V >> {
786-
787- private final Map <K , Object > callContextByKey ;
788- private final Map <K , List <CompletableFuture <V >>> queuedFuturesByKey ;
789- private final Map <K , V > completedValuesByKey = new HashMap <>();
790-
791-
792- private DataLoaderMapEntrySubscriber (
793- CompletableFuture <List <V >> valuesFuture ,
794- List <K > keys ,
795- List <Object > callContexts ,
796- List <CompletableFuture <V >> queuedFutures
797- ) {
798- super (valuesFuture , keys , callContexts , queuedFutures );
799- this .callContextByKey = new HashMap <>();
800- this .queuedFuturesByKey = new HashMap <>();
801- for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
802- K key = keys .get (idx );
803- Object callContext = callContexts .get (idx );
804- CompletableFuture <V > queuedFuture = queuedFutures .get (idx );
805- callContextByKey .put (key , callContext );
806- queuedFuturesByKey .computeIfAbsent (key , k -> new ArrayList <>()).add (queuedFuture );
807- }
808- }
809-
810-
811- @ Override
812- public synchronized void onNext (Map .Entry <K , V > entry ) {
813- super .onNext (entry );
814- K key = entry .getKey ();
815- V value = entry .getValue ();
816-
817- Object callContext = callContextByKey .get (key );
818- List <CompletableFuture <V >> futures = queuedFuturesByKey .getOrDefault (key , List .of ());
819-
820- onNextValue (key , value , callContext , futures );
821-
822- // did we have an actual key for this value - ignore it if they send us one outside the key set
823- if (!futures .isEmpty ()) {
824- completedValuesByKey .put (key , value );
825- }
826- }
827-
828- @ Override
829- public synchronized void onComplete () {
830- super .onComplete ();
831-
832- possiblyClearCacheEntriesOnExceptions (clearCacheKeys );
833- List <V > values = new ArrayList <>(keys .size ());
834- for (K key : keys ) {
835- V value = completedValuesByKey .get (key );
836- values .add (value );
837-
838- List <CompletableFuture <V >> futures = queuedFuturesByKey .getOrDefault (key , List .of ());
839- for (CompletableFuture <V > future : futures ) {
840- if (! future .isDone ()) {
841- // we have a future that never came back for that key
842- // but the publisher is done sending in data - it must be null
843- // e.g. for key X when found no value
844- future .complete (null );
845- }
846- }
847- }
848- valuesFuture .complete (values );
849- }
850-
851- @ Override
852- public synchronized void onError (Throwable ex ) {
853- super .onError (ex );
854- ex = unwrapThrowable (ex );
855- // Complete the futures for the remaining keys with the exception.
856- for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
857- K key = keys .get (idx );
858- List <CompletableFuture <V >> futures = queuedFuturesByKey .get (key );
859- if (!completedValuesByKey .containsKey (key )) {
860- for (CompletableFuture <V > future : futures ) {
861- future .completeExceptionally (ex );
862- }
863- // clear any cached view of this key because they all failed
864- dataLoader .clear (key );
865- }
866- }
867- valuesFuture .completeExceptionally (ex );
868- }
869- }
870647}
0 commit comments