Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix ReactiveBatchLoad unit tests
  • Loading branch information
reugn committed Sep 16, 2022
commit 48ebb86cc1cb7169bcc95f746ab1e848660ab251
156 changes: 87 additions & 69 deletions src/test/java/com/aerospike/mapper/reactive/ReactiveBatchLoadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.aerospike.mapper.annotations.AerospikeRecord;
import com.aerospike.mapper.annotations.ParamFrom;
import com.aerospike.mapper.tools.ReactiveAeroMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -15,7 +16,12 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReactiveBatchLoadTest extends ReactiveAeroMapperBaseTest {
Expand All @@ -24,39 +30,6 @@ public class ReactiveBatchLoadTest extends ReactiveAeroMapperBaseTest {
private final B[] bees = new B[100];
private final A[] as = new A[10];

@AerospikeRecord(namespace = "test", set = "batchB")
public static class B {
@AerospikeKey
public int id;
public String name;

public B(@ParamFrom("id") int id, @ParamFrom("name") String name) {
this.id = id;
this.name = name;
}
}

@AerospikeRecord(namespace = "test", set = "batchA")
public static class A {
@AerospikeKey
public int id;
public String name;
public List<B> data;

public A(int id, String name) {
this.id = id;
this.name = name;
data = new ArrayList<>();
}

public void setBList(List<B> bees) {
data = bees;
}

public A() {
}
}

@BeforeAll
public void populateStaticData() {
for (int i = 0; i < 100; i++) {
Expand All @@ -76,26 +49,28 @@ public void clear() {
client.truncate(null, "test", "batchB", null);
}

@SneakyThrows
private ReactiveAeroMapper populate() {
client.truncate(null, "test", "batchA", null);
client.truncate(null, "test", "batchB", null);
Thread.sleep(1000);

ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();

reactiveMapper.save((Object[]) bees).subscribeOn(Schedulers.parallel()).collectList().block();
reactiveMapper.save((Object[]) as).subscribeOn(Schedulers.parallel()).collectList().block();
reactiveMapper.save((Object[]) bees).subscribeOn(Schedulers.single()).collectList().block();
reactiveMapper.save((Object[]) as).subscribeOn(Schedulers.single()).collectList().block();

return reactiveMapper;
}

@Test
public void testBatchLoad() {
void testBatchLoad() {
ReactiveAeroMapper reactiveMapper = populate();

B resultB = reactiveMapper.read(B.class, bees[1].id).subscribeOn(Schedulers.parallel()).block();
B resultB = reactiveMapper.read(B.class, bees[1].id).subscribeOn(Schedulers.single()).block();
compare(bees[1], resultB);

A resultA = reactiveMapper.read(A.class, as[1].id).subscribeOn(Schedulers.parallel()).block();
A resultA = reactiveMapper.read(A.class, as[1].id).subscribeOn(Schedulers.single()).block();
compare(as[1], resultA);

Integer[] ids = new Integer[6];
Expand All @@ -106,28 +81,26 @@ public void testBatchLoad() {
ids[4] = as[1].id;
ids[5] = 3000;

A[] results = new A[6];
List<A> resultsObjects = reactiveMapper.read(A.class, ids)
.subscribeOn(Schedulers.parallel()).collectList().block();

assert resultsObjects != null;
results = resultsObjects.toArray(results);
compare(results[0], as[4]);
compare(results[1], as[7]);
compare(results[2], as[5]);
compare(results[3], as[0]);
compare(results[4], as[1]);
compare(results[5], null);
List<A> expected = Stream.of(as[4], as[7], as[5], as[0], as[1])
.sorted(Comparator.nullsFirst(A::compareTo))
.collect(Collectors.toList());

List<A> resultsList = reactiveMapper.read(A.class, ids)
.subscribeOn(Schedulers.single()).collectList().block();

assertNotNull(resultsList);
resultsList.sort(Comparator.nullsFirst(A::compareTo));
compare(expected, resultsList);
}

@Test
public void testBatchLoadWithOperations() {
void testBatchLoadWithOperations() {
ReactiveAeroMapper reactiveMapper = populate();

B resultB = reactiveMapper.read(B.class, bees[1].id).subscribeOn(Schedulers.parallel()).block();
B resultB = reactiveMapper.read(B.class, bees[1].id).subscribeOn(Schedulers.single()).block();
compare(bees[1], resultB);

A resultA = reactiveMapper.read(A.class, as[1].id).subscribeOn(Schedulers.parallel()).block();
A resultA = reactiveMapper.read(A.class, as[1].id).subscribeOn(Schedulers.single()).block();
compare(as[1], resultA);

Integer[] userKeys = new Integer[6];
Expand All @@ -142,21 +115,66 @@ public void testBatchLoadWithOperations() {
ops[0] = ListOperation.size(DATA_BIN);
ops[1] = ListOperation.getByIndex(DATA_BIN, -1, ListReturnType.VALUE);

A[] results = new A[6];
List<A> resultsList = reactiveMapper.read(A.class, userKeys, ops).subscribeOn(Schedulers.parallel()).collectList().block();
assert resultsList != null;
results = resultsList.toArray(results);

compare(results[0].data.get(0).id, 10);
compare(results[0].data.get(1).id, as[4].data.get(as[4].data.size() - 1).id);
compare(results[1].data.get(0).id, 10);
compare(results[1].data.get(1).id, as[7].data.get(as[7].data.size() - 1).id);
compare(results[2].data.get(0).id, 10);
compare(results[2].data.get(1).id, as[5].data.get(as[5].data.size() - 1).id);
compare(results[3].data.get(0).id, 10);
compare(results[3].data.get(1).id, as[0].data.get(as[0].data.size() - 1).id);
compare(results[4].data.get(0).id, 10);
compare(results[4].data.get(1).id, as[1].data.get(as[1].data.size() - 1).id);
compare(results[5], null);
List<List<B>> expected = Stream.of(as[4], as[7], as[5], as[0], as[1])
.map(a -> a.data)
.sorted(Comparator.comparing((List<B> o) -> o.get(1)))
.collect(Collectors.toList());

List<List<B>> resultsList = reactiveMapper.read(A.class, userKeys, ops)
.subscribeOn(Schedulers.parallel()).collectList().block()
.stream().map(a -> a.data)
.sorted(Comparator.comparing((List<B> o) -> o.get(1)))
.collect(Collectors.toList());

assertNotNull(resultsList);
for (int i = 0; i < expected.size(); i++) {
compare(resultsList.get(i).get(0).id, 10);
compare(expected.get(i).get(expected.get(i).size() - 1), resultsList.get(i).get(1));
}
}

@AerospikeRecord(namespace = "test", set = "batchB")
public static class B implements Comparable<B> {
@AerospikeKey
public int id;
public String name;

public B(@ParamFrom("id") int id, @ParamFrom("name") String name) {
this.id = id;
this.name = name;
}

@Override
public int compareTo(B o) {
if (o == null) return 1;
return Integer.compare(id, o.id);
}
}

@AerospikeRecord(namespace = "test", set = "batchA")
public static class A implements Comparable<A> {
@AerospikeKey
public int id;
public String name;
public List<B> data;

public A(int id, String name) {
this.id = id;
this.name = name;
data = new ArrayList<>();
}

public A() {
}

public void setBList(List<B> bees) {
data = bees;
}

@Override
public int compareTo(A o) {
if (o == null) return 1;
return Integer.compare(id, o.id);
}
}
}