Skip to content

Commit 34391d3

Browse files
committed
Merge remote-tracking branch 'origin/master' into fixed-delegate-load-handling
# Conflicts: # src/main/java/org/dataloader/DataLoader.java
2 parents 2f3fa1e + 77bac05 commit 34391d3

File tree

5 files changed

+170
-104
lines changed

5 files changed

+170
-104
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.util.Map;
3636
import java.util.Optional;
3737
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.locks.Lock;
39+
import java.util.concurrent.locks.ReentrantLock;
3840
import java.util.function.BiConsumer;
3941
import java.util.function.Consumer;
4042

@@ -77,6 +79,7 @@ public class DataLoader<K, V extends @Nullable Object> {
7779
private final ValueCache<K, V> valueCache;
7880
private final DataLoaderOptions options;
7981
private final Object batchLoadFunction;
82+
final Lock lock;
8083

8184
@VisibleForTesting
8285
DataLoader(@Nullable String name, Object batchLoadFunction, @Nullable DataLoaderOptions options) {
@@ -93,7 +96,7 @@ public class DataLoader<K, V extends @Nullable Object> {
9396
this.batchLoadFunction = nonNull(batchLoadFunction);
9497
this.options = loaderOptions;
9598
this.name = name;
96-
99+
this.lock = new ReentrantLock();
97100
this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.valueCache, this.stats, clock);
98101
}
99102

@@ -265,18 +268,16 @@ public CompletableFuture<List<V>> loadMany(List<K> keys, List<Object> keyContext
265268
nonNull(keys);
266269
nonNull(keyContexts);
267270

268-
synchronized (this) {
269-
List<CompletableFuture<V>> collect = new ArrayList<>(keys.size());
270-
for (int i = 0; i < keys.size(); i++) {
271-
K key = keys.get(i);
272-
Object keyContext = null;
273-
if (i < keyContexts.size()) {
274-
keyContext = keyContexts.get(i);
275-
}
276-
collect.add(loadImpl(key, keyContext));
271+
List<CompletableFuture<V>> collect = new ArrayList<>(keys.size());
272+
for (int i = 0; i < keys.size(); i++) {
273+
K key = keys.get(i);
274+
Object keyContext = null;
275+
if (i < keyContexts.size()) {
276+
keyContext = keyContexts.get(i);
277277
}
278-
return CompletableFutureKit.allOf(collect);
278+
collect.add(loadImpl(key, keyContext));
279279
}
280+
return CompletableFutureKit.allOf(collect);
280281
}
281282

282283
/**
@@ -296,15 +297,13 @@ public CompletableFuture<List<V>> loadMany(List<K> keys, List<Object> keyContext
296297
public CompletableFuture<Map<K, V>> loadMany(Map<K, ?> keysAndContexts) {
297298
nonNull(keysAndContexts);
298299

299-
synchronized (this) {
300-
Map<K, CompletableFuture<V>> collect = new HashMap<>(keysAndContexts.size());
301-
for (Map.Entry<K, ?> entry : keysAndContexts.entrySet()) {
302-
K key = entry.getKey();
303-
Object keyContext = entry.getValue();
304-
collect.put(key, loadImpl(key, keyContext));
305-
}
306-
return CompletableFutureKit.allOf(collect);
300+
Map<K, CompletableFuture<V>> collect = new HashMap<>(keysAndContexts.size());
301+
for (Map.Entry<K, ?> entry : keysAndContexts.entrySet()) {
302+
K key = entry.getKey();
303+
Object keyContext = entry.getValue();
304+
collect.put(key, loadImpl(key, keyContext));
307305
}
306+
return CompletableFutureKit.allOf(collect);
308307
}
309308

310309
/**
@@ -380,9 +379,12 @@ public DataLoader<K, V> clear(K key) {
380379
*/
381380
public DataLoader<K, V> clear(K key, BiConsumer<Void, Throwable> handler) {
382381
Object cacheKey = getCacheKey(key);
383-
synchronized (this) {
382+
try {
383+
lock.lock();
384384
futureCache.delete(cacheKey);
385385
valueCache.delete(key).whenComplete(handler);
386+
} finally {
387+
lock.unlock();
386388
}
387389
return this;
388390
}
@@ -404,9 +406,12 @@ public DataLoader<K, V> clearAll() {
404406
* @return the data loader for fluent coding
405407
*/
406408
public DataLoader<K, V> clearAll(BiConsumer<Void, Throwable> handler) {
407-
synchronized (this) {
409+
try {
410+
lock.lock();
408411
futureCache.clear();
409412
valueCache.clear().whenComplete(handler);
413+
} finally {
414+
lock.unlock();
410415
}
411416
return this;
412417
}
@@ -446,10 +451,13 @@ public DataLoader<K, V> prime(K key, Exception error) {
446451
*/
447452
public DataLoader<K, V> prime(K key, CompletableFuture<V> value) {
448453
Object cacheKey = getCacheKey(key);
449-
synchronized (this) {
454+
try {
455+
lock.lock();
450456
if (!futureCache.containsKey(cacheKey)) {
451457
futureCache.set(cacheKey, value);
452458
}
459+
} finally {
460+
lock.unlock();
453461
}
454462
return this;
455463
}

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.CompletionException;
2929
import java.util.concurrent.CompletionStage;
3030
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.Lock;
3132

3233
import static java.util.Collections.emptyList;
3334
import static java.util.Collections.singletonList;
@@ -82,6 +83,7 @@ Object getCallContext() {
8283
private final StatisticsCollector stats;
8384
private final Clock clock;
8485
private final AtomicReference<Instant> lastDispatchTime;
86+
private final Lock lock;
8587

8688
DataLoaderHelper(DataLoader<K, V> dataLoader,
8789
Object batchLoadFunction,
@@ -95,6 +97,7 @@ Object getCallContext() {
9597
this.loaderOptions = loaderOptions;
9698
this.futureCache = futureCache;
9799
this.valueCache = valueCache;
100+
this.lock = dataLoader.lock;
98101
this.loaderQueue = new ArrayList<>();
99102
this.stats = stats;
100103
this.clock = clock;
@@ -111,7 +114,8 @@ public Instant getLastDispatchTime() {
111114
}
112115

113116
Optional<CompletableFuture<V>> getIfPresent(K key) {
114-
synchronized (dataLoader) {
117+
try {
118+
lock.lock();
115119
boolean cachingEnabled = loaderOptions.cachingEnabled();
116120
if (cachingEnabled) {
117121
Object cacheKey = getCacheKey(nonNull(key));
@@ -124,26 +128,35 @@ Optional<CompletableFuture<V>> getIfPresent(K key) {
124128
} catch (Exception ignored) {
125129
}
126130
}
131+
} finally {
132+
lock.unlock();
127133
}
128134
return Optional.empty();
129135
}
130136

131137
Optional<CompletableFuture<V>> getIfCompleted(K key) {
132-
synchronized (dataLoader) {
138+
try {
139+
lock.lock();
140+
133141
Optional<CompletableFuture<V>> cachedPromise = getIfPresent(key);
134142
if (cachedPromise.isPresent()) {
135143
CompletableFuture<V> promise = cachedPromise.get();
136144
if (promise.isDone()) {
137145
return cachedPromise;
138146
}
139147
}
148+
} finally {
149+
lock.unlock();
140150
}
141151
return Optional.empty();
142152
}
143153

144154

155+
@GuardedBy("lock")
145156
CompletableFuture<V> load(K key, Object loadContext) {
146-
synchronized (dataLoader) {
157+
try {
158+
lock.lock();
159+
147160
boolean batchingEnabled = loaderOptions.batchingEnabled();
148161
boolean cachingEnabled = loaderOptions.cachingEnabled();
149162

@@ -158,6 +171,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
158171
ctx.onDispatched();
159172
cf.whenComplete(ctx::onCompleted);
160173
return cf;
174+
} finally {
175+
lock.unlock();
161176
}
162177
}
163178

@@ -173,14 +188,17 @@ Object getCacheKeyWithContext(K key, Object context) {
173188
loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key;
174189
}
175190

191+
@GuardedBy("lock")
176192
DispatchResult<V> dispatch() {
177193
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));
178194

179195
boolean batchingEnabled = loaderOptions.batchingEnabled();
180196
final List<K> keys;
181197
final List<Object> callContexts;
182198
final List<CompletableFuture<V>> queuedFutures;
183-
synchronized (dataLoader) {
199+
try {
200+
lock.lock();
201+
184202
int queueSize = loaderQueue.size();
185203
if (queueSize == 0) {
186204
lastDispatchTime.set(now());
@@ -200,6 +218,8 @@ DispatchResult<V> dispatch() {
200218
});
201219
loaderQueue.clear();
202220
lastDispatchTime.set(now());
221+
} finally {
222+
lock.unlock();
203223
}
204224
if (!batchingEnabled) {
205225
instrCtx.onDispatched();
@@ -334,7 +354,7 @@ private void possiblyClearCacheEntriesOnExceptions(List<K> keys) {
334354
}
335355
}
336356

337-
@GuardedBy("dataLoader")
357+
@GuardedBy("lock")
338358
private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean batchingEnabled) {
339359
final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext);
340360

@@ -353,7 +373,7 @@ private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean ba
353373
return loadCallFuture;
354374
}
355375

356-
@GuardedBy("dataLoader")
376+
@GuardedBy("lock")
357377
private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) {
358378
if (batchingEnabled) {
359379
CompletableFuture<V> loadCallFuture = new CompletableFuture<>();
@@ -606,8 +626,11 @@ private DataLoaderInstrumentation instrumentation() {
606626
}
607627

608628
int dispatchDepth() {
609-
synchronized (dataLoader) {
629+
try {
630+
lock.lock();
610631
return loaderQueue.size();
632+
} finally {
633+
lock.unlock();
611634
}
612635
}
613636

src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import java.util.List;
1111
import java.util.concurrent.CompletableFuture;
1212
import java.util.concurrent.CompletionException;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReentrantLock;
1315

1416
import static org.dataloader.impl.Assertions.assertState;
1517

@@ -25,6 +27,7 @@ abstract class AbstractBatchSubscriber<K, V, T> implements Subscriber<T> {
2527
final List<Object> callContexts;
2628
final List<CompletableFuture<V>> queuedFutures;
2729
final ReactiveSupport.HelperIntegration<K> helperIntegration;
30+
final Lock lock = new ReentrantLock();
2831

2932
List<K> clearCacheKeys = new ArrayList<>();
3033
List<V> completedValues = new ArrayList<>();

src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,58 +29,74 @@ class BatchSubscriberImpl<K, V> extends AbstractBatchSubscriber<K, V, V> {
2929
super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration);
3030
}
3131

32-
// onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
32+
// onNext may be called by multiple threads - for the time being, we use a ReentrantLock to guarantee
3333
// correctness (at the cost of speed).
3434
@Override
35-
public synchronized void onNext(V value) {
36-
super.onNext(value);
35+
public void onNext(V value) {
36+
try {
37+
lock.lock();
3738

38-
if (idx >= keys.size()) {
39-
// hang on they have given us more values than we asked for in keys
40-
// we cant handle this
41-
return;
42-
}
43-
K key = keys.get(idx);
44-
Object callContext = callContexts.get(idx);
45-
CompletableFuture<V> future = queuedFutures.get(idx);
46-
onNextValue(key, value, callContext, List.of(future));
39+
super.onNext(value);
40+
41+
if (idx >= keys.size()) {
42+
// hang on they have given us more values than we asked for in keys
43+
// we cant handle this
44+
return;
45+
}
46+
K key = keys.get(idx);
47+
Object callContext = callContexts.get(idx);
48+
CompletableFuture<V> future = queuedFutures.get(idx);
49+
onNextValue(key, value, callContext, List.of(future));
4750

48-
completedValues.add(value);
49-
idx++;
51+
completedValues.add(value);
52+
idx++;
53+
} finally {
54+
lock.unlock();
55+
}
5056
}
5157

5258

5359
@Override
54-
public synchronized void onComplete() {
55-
super.onComplete();
56-
if (keys.size() != completedValues.size()) {
57-
// we have more or less values than promised
58-
// we will go through all the outstanding promises and mark those that
59-
// have not finished as failed
60-
for (CompletableFuture<V> queuedFuture : queuedFutures) {
61-
if (!queuedFuture.isDone()) {
62-
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
60+
public void onComplete() {
61+
try {
62+
lock.lock();
63+
super.onComplete();
64+
if (keys.size() != completedValues.size()) {
65+
// we have more or less values than promised
66+
// we will go through all the outstanding promises and mark those that
67+
// have not finished as failed
68+
for (CompletableFuture<V> queuedFuture : queuedFutures) {
69+
if (!queuedFuture.isDone()) {
70+
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
71+
}
6372
}
6473
}
74+
possiblyClearCacheEntriesOnExceptions();
75+
valuesFuture.complete(completedValues);
76+
} finally {
77+
lock.unlock();
6578
}
66-
possiblyClearCacheEntriesOnExceptions();
67-
valuesFuture.complete(completedValues);
6879
}
6980

7081
@Override
71-
public synchronized void onError(Throwable ex) {
72-
super.onError(ex);
73-
ex = unwrapThrowable(ex);
74-
// Set the remaining keys to the exception.
75-
for (int i = idx; i < queuedFutures.size(); i++) {
76-
K key = keys.get(i);
77-
CompletableFuture<V> future = queuedFutures.get(i);
78-
if (!future.isDone()) {
79-
future.completeExceptionally(ex);
80-
// clear any cached view of this key because it failed
81-
helperIntegration.clearCacheView(key);
82+
public void onError(Throwable ex) {
83+
try {
84+
lock.lock();
85+
super.onError(ex);
86+
ex = unwrapThrowable(ex);
87+
// Set the remaining keys to the exception.
88+
for (int i = idx; i < queuedFutures.size(); i++) {
89+
K key = keys.get(i);
90+
CompletableFuture<V> future = queuedFutures.get(i);
91+
if (!future.isDone()) {
92+
future.completeExceptionally(ex);
93+
// clear any cached view of this key because it failed
94+
helperIntegration.clearCacheView(key);
95+
}
8296
}
97+
valuesFuture.completeExceptionally(ex);
98+
} finally {
99+
lock.unlock();
83100
}
84-
valuesFuture.completeExceptionally(ex);
85101
}
86102
}

0 commit comments

Comments
 (0)