Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/main/java/org/dataloader/CacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
* CacheMap is used by data loaders that use caching promises to values aka {@link CompletableFuture}<V>. A better name for this
* class might have been FutureCache but that is history now.
* <p>
* The default implementation used by the data loader is based on a {@link java.util.LinkedHashMap}.
* The default implementation used by the data loader is based on a {@link java.util.concurrent.ConcurrentHashMap} because
* the data loader code requires the cache to prove atomic writes especially the {@link #putIfAbsentAtomically(Object, CompletableFuture)}
* method.
* <p>
* The data loader code using a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your
* own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is
* returned always.
* <p>
* This is really a cache of completed {@link CompletableFuture}&lt;V&gt; values in memory. It is used, when caching is enabled, to
* give back the same future to any code that may call it. If you need a cache of the underlying values that is possible external to the JVM
Expand All @@ -42,7 +48,7 @@
*/
@PublicSpi
@NullMarked
public interface CacheMap<K, V> {
public interface CacheMap<K,V> {

/**
* Creates a new cache map, using the default implementation that is based on a {@link java.util.LinkedHashMap}.
Expand Down Expand Up @@ -84,14 +90,21 @@ static <K, V> CacheMap<K, V> simpleMap() {
Collection<CompletableFuture<V>> getAll();

/**
* Creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
* Atomically creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
* <p>
* The data loader code using a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your
* own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is
* returned always.
* <p>
* The default implementation of this method uses {@link java.util.concurrent.ConcurrentHashMap} has its implementation so these CAS
* writes work as expected.
*
* @param key the key to cache
* @param value the value to cache
*
* @return the cache map for fluent coding
* @return atomically the previous value for the key or null if the value is not present.
*/
CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value);
@Nullable CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value);

/**
* Deletes the entry with the specified key from the cache map, if it exists.
Expand All @@ -114,7 +127,7 @@ static <K, V> CacheMap<K, V> simpleMap() {
* and intended for testing and debugging.
* If a cache doesn't support it, it can throw an Exception.
*
* @return
* @return the size of the cache
*/
int size();
}
33 changes: 17 additions & 16 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
@Internal
class DataLoaderHelper<K, V> {

static class LoaderQueueEntry<K, V> {
private static class LoaderQueueEntry<K, V> {

final K key;
final CompletableFuture<V> value;
Expand Down Expand Up @@ -155,11 +155,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
try {
CompletableFuture<V> cachedFuture = futureCache.get(cacheKey);
if (cachedFuture != null) {
// We already have a promise for this key, no need to check value cache or queue up load
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
ctx.onDispatched();
cachedFuture.whenComplete(ctx::onCompleted);
return cachedFuture;
// We already have a promise for this key, no need to check value cache or queue this load
return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
}
} catch (Exception ignored) {
}
Expand All @@ -170,11 +167,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
if (futureCachingEnabled) {
CompletableFuture<V> cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture);
if (cachedFuture != null) {
// another thread was faster and created a matching CF ... hence this is really a cachehit and we are done
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
ctx.onDispatched();
cachedFuture.whenComplete(ctx::onCompleted);
return cachedFuture;
// another thread was faster and created a matching CF ... hence this is really a cache hit and we are done
return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
}
}
addEntryToLoaderQueue(key, loadCallFuture, loadContext);
Expand All @@ -186,12 +180,9 @@ CompletableFuture<V> load(K key, Object loadContext) {
CompletableFuture<V> cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture);
if (cachedFuture != null) {
// another thread was faster and the loader was invoked twice with the same key
// we are disregarding the resul of our dispatch call and use the already cached value
// we are disregarding the result of our dispatch call and use the already cached value
// meaning this is a cache hit and we are done
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
ctx.onDispatched();
cachedFuture.whenComplete(ctx::onCompleted);
return cachedFuture;
return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
}
}
}
Expand All @@ -201,6 +192,13 @@ CompletableFuture<V> load(K key, Object loadContext) {
return loadCallFuture;
}

private CompletableFuture<V> incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext<Object> ctx, K key, Object loadContext, CompletableFuture<V> cachedFuture) {
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
ctx.onDispatched();
cachedFuture.whenComplete(ctx::onCompleted);
return cachedFuture;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was common in 3 spots


private void addEntryToLoaderQueue(K key, CompletableFuture<V> future, Object loadContext) {
while (true) {
LoaderQueueEntry<K, V> prev = loaderQueue.get();
Expand All @@ -223,6 +221,7 @@ Object getCacheKeyWithContext(K key, Object context) {
loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key;
}

@SuppressWarnings("unchecked")
DispatchResult<V> dispatch() {
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));

Expand All @@ -232,6 +231,8 @@ DispatchResult<V> dispatch() {
while (true) {
loaderQueueEntryHead = loaderQueue.get();
if (loaderQueue.compareAndSet(loaderQueueEntryHead, null)) {
// one or more threads competed and this one won. This thread holds
// the loader queue root in the local variable loaderQueueEntryHead
break;
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/dataloader/impl/DefaultCacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.dataloader.CacheMap;
import org.dataloader.annotations.Internal;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +34,7 @@
* @author <a href="https://github.com/aschrijver/">Arnold Schrijver</a>
*/
@Internal
@NullMarked
public class DefaultCacheMap<K, V> implements CacheMap<K, V> {

private final ConcurrentHashMap<K, CompletableFuture<V>> cache;
Expand All @@ -56,7 +59,7 @@ public boolean containsKey(K key) {
* {@inheritDoc}
*/
@Override
public CompletableFuture<V> get(K key) {
public @Nullable CompletableFuture<V> get(K key) {
return cache.get(key);
}

Expand All @@ -72,7 +75,7 @@ public Collection<CompletableFuture<V>> getAll() {
* {@inheritDoc}
*/
@Override
public CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value) {
public @Nullable CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value) {
return cache.putIfAbsent(key, value);
}

Expand Down
32 changes: 27 additions & 5 deletions src/test/java/org/dataloader/DataLoaderCacheMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
/**
* Tests for cacheMap functionality..
*/
@SuppressWarnings("NullableProblems")
public class DataLoaderCacheMapTest {

private <T> BatchLoader<T, T> keysAsValues() {
Expand All @@ -24,12 +25,33 @@ private <T> BatchLoader<T, T> keysAsValues() {
public void should_provide_all_futures_from_cache() {
DataLoader<Integer, Integer> dataLoader = newDataLoader(keysAsValues());

dataLoader.load(1);
dataLoader.load(2);
dataLoader.load(1);
CompletableFuture<Integer> cf1 = dataLoader.load(1);
CompletableFuture<Integer> cf2 = dataLoader.load(2);
CompletableFuture<Integer> cf3 = dataLoader.load(3);

CacheMap<Object, Integer> cacheMap = dataLoader.getCacheMap();
Collection<CompletableFuture<Integer>> futures = cacheMap.getAll();
assertThat(futures.size(), equalTo(3));


assertThat(cacheMap.get(1), equalTo(cf1));
assertThat(cacheMap.get(2), equalTo(cf2));
assertThat(cacheMap.get(3), equalTo(cf3));
assertThat(cacheMap.containsKey(1), equalTo(true));
assertThat(cacheMap.containsKey(2), equalTo(true));
assertThat(cacheMap.containsKey(3), equalTo(true));
assertThat(cacheMap.containsKey(4), equalTo(false));

cacheMap.delete(1);
assertThat(cacheMap.containsKey(1), equalTo(false));
assertThat(cacheMap.containsKey(2), equalTo(true));

cacheMap.clear();
assertThat(cacheMap.containsKey(1), equalTo(false));
assertThat(cacheMap.containsKey(2), equalTo(false));
assertThat(cacheMap.containsKey(3), equalTo(false));
assertThat(cacheMap.containsKey(4), equalTo(false));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some more tests for coverage reasons


Collection<CompletableFuture<Integer>> futures = dataLoader.getCacheMap().getAll();
assertThat(futures.size(), equalTo(2));
}

@Test
Expand Down