Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/org/dataloader/BatchLoaderEnvironment.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.dataloader;

import org.dataloader.impl.Assertions;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Objects.nonNull;

/**
* This object is passed to a batch loader as calling context. It could contain security credentials
* of the calling users for example or database parameters that allow the data layer call to succeed.
Expand Down Expand Up @@ -78,8 +78,8 @@ public Builder context(Object context) {
}

public <K> Builder keyContexts(List<K> keys, List<Object> keyContexts) {
nonNull(keys);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here I believe the intent was to throw NPE, correct?

nonNull(keyContexts);
Assertions.nonNull(keys);
Assertions.nonNull(keyContexts);

Map<Object, Object> map = new HashMap<>();
List<Object> list = new ArrayList<>();
Expand Down
13 changes: 4 additions & 9 deletions src/main/java/org/dataloader/DataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
public class DataLoader<K, V> {

private final DataLoaderHelper<K, V> helper;
private final DataLoaderOptions loaderOptions;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final StatisticsCollector stats;

Expand Down Expand Up @@ -246,7 +245,6 @@ public static <K, V> DataLoader<K, V> newMappedDataLoaderWithTry(MappedBatchLoad
* @return a new DataLoader
* @see #newDataLoaderWithTry(BatchLoader)
*/
@SuppressWarnings("unchecked")
public static <K, V> DataLoader<K, V> newMappedDataLoaderWithTry(MappedBatchLoader<K, Try<V>> batchLoadFunction, DataLoaderOptions options) {
return new DataLoader<>(batchLoadFunction, options);
}
Expand Down Expand Up @@ -309,7 +307,6 @@ public static <K, V> DataLoader<K, V> newMappedDataLoaderWithTry(MappedBatchLoad
* @return a new DataLoader
* @see #newDataLoaderWithTry(BatchLoader)
*/
@SuppressWarnings("unchecked")
public static <K, V> DataLoader<K, V> newMappedDataLoaderWithTry(MappedBatchLoaderWithContext<K, Try<V>> batchLoadFunction, DataLoaderOptions options) {
return new DataLoader<>(batchLoadFunction, options);
}
Expand All @@ -334,12 +331,12 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
}

private DataLoader(Object batchLoadFunction, DataLoaderOptions options) {
this.loaderOptions = options == null ? new DataLoaderOptions() : options;
DataLoaderOptions loaderOptions = options == null ? new DataLoaderOptions() : options;
this.futureCache = determineCacheMap(loaderOptions);
// order of keys matter in data loader
this.stats = nonNull(this.loaderOptions.getStatisticsCollector());
this.stats = nonNull(loaderOptions.getStatisticsCollector());

this.helper = new DataLoaderHelper<>(this, batchLoadFunction, this.loaderOptions, this.futureCache, this.stats);
this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.stats);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -496,10 +493,8 @@ public DispatchResult<V> dispatchWithCounts() {
* @return the list of all results when the {@link #dispatchDepth()} reached 0
*/
public List<V> dispatchAndJoin() {
List<V> results = new ArrayList<>();

List<V> joinedResults = dispatch().join();
results.addAll(joinedResults);
List<V> results = new ArrayList<>(joinedResults);
while (this.dispatchDepth() > 0) {
joinedResults = dispatch().join();
results.addAll(joinedResults);
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
class DataLoaderHelper<K, V> {


class LoaderQueueEntry<K, V> {
static class LoaderQueueEntry<K, V> {

final K key;
final V value;
Expand Down Expand Up @@ -151,7 +151,7 @@ DispatchResult<V> dispatch() {
loaderQueue.clear();
}
if (!batchingEnabled || keys.isEmpty()) {
return new DispatchResult<V>(CompletableFuture.completedFuture(emptyList()), 0);
return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), 0);
}
final int totalEntriesHandled = keys.size();
//
Expand All @@ -172,7 +172,7 @@ DispatchResult<V> dispatch() {
} else {
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
}
return new DispatchResult<V>(futureList, totalEntriesHandled);
return new DispatchResult<>(futureList, totalEntriesHandled);
}

private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
Expand All @@ -194,7 +194,7 @@ private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<
}
//
// now reassemble all the futures into one that is the complete set of results
return CompletableFuture.allOf(allBatches.toArray(new CompletableFuture[allBatches.size()]))
return CompletableFuture.allOf(allBatches.toArray(new CompletableFuture[0]))
.thenApply(v -> allBatches.stream()
.map(CompletableFuture::join)
.flatMap(Collection::stream)
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/org/dataloader/impl/CompletableFutureKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ public static <V> CompletableFuture<V> failedFuture(Exception e) {
return future;
}

public static Throwable cause(CompletableFuture completableFuture) {
public static <V> Throwable cause(CompletableFuture<V> completableFuture) {
if (!completableFuture.isCompletedExceptionally()) {
return null;
}
try {
completableFuture.get();
return null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return e;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Expand All @@ -38,11 +39,11 @@ public static Throwable cause(CompletableFuture completableFuture) {
}
}

public static boolean succeeded(CompletableFuture future) {
public static <V> boolean succeeded(CompletableFuture<V> future) {
return future.isDone() && !future.isCompletedExceptionally();
}

public static boolean failed(CompletableFuture future) {
public static <V> boolean failed(CompletableFuture<V> future) {
return future.isDone() && future.isCompletedExceptionally();
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/impl/DefaultCacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@Internal
public class DefaultCacheMap<U, V> implements CacheMap<U, V> {

private Map<U, V> cache;
private final Map<U, V> cache;

/**
* Default constructor
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/dataloader/impl/PromisedValuesImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ public class PromisedValuesImpl<T> implements PromisedValues<T> {

private final List<? extends CompletionStage<T>> futures;
private final CompletionStage<Void> controller;
private AtomicReference<Throwable> cause;
private final AtomicReference<Throwable> cause;

private PromisedValuesImpl(List<? extends CompletionStage<T>> cs) {
this.futures = nonNull(cs);
this.cause = new AtomicReference<>();
List<CompletableFuture> cfs = cs.stream().map(CompletionStage::toCompletableFuture).collect(Collectors.toList());
CompletableFuture[] futuresArray = cfs.toArray(new CompletableFuture[cfs.size()]);
CompletableFuture[] futuresArray = cs.stream().map(CompletionStage::toCompletableFuture).toArray(CompletableFuture[]::new);
this.controller = CompletableFuture.allOf(futuresArray).handle((result, throwable) -> {
setCause(throwable);
return null;
Expand Down Expand Up @@ -104,7 +103,6 @@ public Throwable cause(int index) {
}

@Override
@SuppressWarnings("unchecked")
public T get(int index) {
assertState(isDone(), "The PromisedValues MUST be complete before calling the get() method");
try {
Expand Down