Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stream transactions: test
  • Loading branch information
michele committed Aug 12, 2019
commit 78c510c08dd92fec60f824d2bca54a5ab5c026a7
261 changes: 233 additions & 28 deletions src/test/java/com/arangodb/StreamTransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,23 @@

import com.arangodb.ArangoDB.Builder;
import com.arangodb.entity.*;
import com.arangodb.model.DocumentCreateOptions;
import com.arangodb.model.DocumentReadOptions;
import com.arangodb.model.DocumentReplaceOptions;
import com.arangodb.model.StreamTransactionOptions;
import com.arangodb.model.*;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;

/**
* @author Mark Vollmary
* @author Michele Rastelli
*/
@RunWith(Parameterized.class)
public class StreamTransactionTest extends BaseTest {
Expand Down Expand Up @@ -152,21 +149,22 @@ public void insertDocumentsWithinStreamTransaction() {
.insertDocuments(Arrays.asList(new BaseDocument(), new BaseDocument(), new BaseDocument()),
new DocumentCreateOptions().streamTransactionId(tx.getId()));

//noinspection OptionalGetWithoutIsPresent
String id1 = txDocs.getDocuments().stream().findFirst().get().getKey();
List<String> keys = txDocs.getDocuments().stream().map(DocumentEntity::getKey).collect(Collectors.toList());

// assert that the document is not found from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocument(id1, BaseDocument.class, null), is(nullValue()));
// assert that the documents are not found from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments(),
is(empty()));

// assert that the document is found from within the tx
// assert that the documents are found from within the tx
assertThat(db.collection(COLLECTION_NAME)
.getDocument(id1, BaseDocument.class, new DocumentReadOptions().streamTransactionId(tx.getId())),
is(notNullValue()));
.getDocuments(keys, BaseDocument.class, new DocumentReadOptions().streamTransactionId(tx.getId()))
.getDocuments(), hasSize(keys.size()));

db.commitStreamTransaction(tx.getId());

// assert that the document is found after commit
assertThat(db.collection(COLLECTION_NAME).getDocument(id1, BaseDocument.class, null), is(notNullValue()));
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments(),
hasSize(keys.size()));
}

@Test
Expand All @@ -185,9 +183,8 @@ public void replaceDocument() {
// replace document from within the tx
doc.getProperties().clear();
doc.addAttribute("test", "bar");
DocumentUpdateEntity<BaseDocument> replacedDoc = db.collection(COLLECTION_NAME)
.replaceDocument(createdDoc.getKey(), doc,
new DocumentReplaceOptions().streamTransactionId(tx.getId()));
db.collection(COLLECTION_NAME).replaceDocument(createdDoc.getKey(), doc,
new DocumentReplaceOptions().streamTransactionId(tx.getId()));

// assert that the document has not been replaced from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class, null)
Expand All @@ -209,40 +206,248 @@ public void replaceDocuments() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

List<String> ids = Arrays.asList("1", "2");
List<BaseDocument> docs = ids.stream().map(BaseDocument::new).peek(doc -> doc.addAttribute("test", "foo"))
.collect(Collectors.toList());
List<BaseDocument> docs = IntStream.range(0, 3).mapToObj(it -> new BaseDocument())
.peek(doc -> doc.addAttribute("test", "foo")).collect(Collectors.toList());

MultiDocumentEntity<DocumentCreateEntity<BaseDocument>> createdDocs = db.collection(COLLECTION_NAME)
.insertDocuments(docs, null);
List<BaseDocument> createdDocs = db.collection(COLLECTION_NAME)
.insertDocuments(docs, new DocumentCreateOptions().returnNew(true)).getDocuments().stream()
.map(DocumentCreateEntity::getNew).collect(Collectors.toList());

List<String> keys = createdDocs.stream().map(BaseDocument::getKey).collect(Collectors.toList());

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

List<BaseDocument> modifiedDocs = docs.stream().peek(doc -> {
List<BaseDocument> modifiedDocs = createdDocs.stream().peek(doc -> {
doc.getProperties().clear();
doc.addAttribute("test", "bar");
}).collect(Collectors.toList());

// replace document from within the tx
MultiDocumentEntity<DocumentUpdateEntity<BaseDocument>> replacedDocs = db.collection(COLLECTION_NAME)
db.collection(COLLECTION_NAME)
.replaceDocuments(modifiedDocs, new DocumentReplaceOptions().streamTransactionId(tx.getId()));

// assert that the documents has not been replaced from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocuments(ids, BaseDocument.class, null).getDocuments().stream()
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments().stream()
.map(it -> ((String) it.getAttribute("test"))).collect(Collectors.toList()), everyItem(is("foo")));

// assert that the document has been replaced from within the tx
assertThat(db.collection(COLLECTION_NAME)
.getDocuments(ids, BaseDocument.class, new DocumentReadOptions().streamTransactionId(tx.getId()))
.getDocuments(keys, BaseDocument.class, new DocumentReadOptions().streamTransactionId(tx.getId()))
.getDocuments().stream().map(it -> ((String) it.getAttribute("test"))).collect(Collectors.toList()),
everyItem(is("bar")));

db.commitStreamTransaction(tx.getId());

// assert that the document has been replaced after commit
assertThat(db.collection(COLLECTION_NAME).getDocuments(ids, BaseDocument.class, null).getDocuments().stream()
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments().stream()
.map(it -> ((String) it.getAttribute("test"))).collect(Collectors.toList()), everyItem(is("bar")));
}

@Test
public void updateDocument() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

BaseDocument doc = new BaseDocument();
doc.addAttribute("test", "foo");

DocumentCreateEntity<BaseDocument> createdDoc = db.collection(COLLECTION_NAME).insertDocument(doc, null);

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

// update document from within the tx
doc.getProperties().clear();
doc.addAttribute("test", "bar");
db.collection(COLLECTION_NAME)
.updateDocument(createdDoc.getKey(), doc, new DocumentUpdateOptions().streamTransactionId(tx.getId()));

// assert that the document has not been updated from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class, null)
.getProperties().get("test"), is("foo"));

// assert that the document has been updated from within the tx
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class,
new DocumentReadOptions().streamTransactionId(tx.getId())).getProperties().get("test"), is("bar"));

db.commitStreamTransaction(tx.getId());

// assert that the document has been updated after commit
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class, null)
.getProperties().get("test"), is("bar"));

}

@Test
public void updateDocuments() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

List<BaseDocument> docs = IntStream.range(0, 3).mapToObj(it -> new BaseDocument())
.peek(doc -> doc.addAttribute("test", "foo")).collect(Collectors.toList());

List<BaseDocument> createdDocs = db.collection(COLLECTION_NAME)
.insertDocuments(docs, new DocumentCreateOptions().returnNew(true)).getDocuments().stream()
.map(DocumentCreateEntity::getNew).collect(Collectors.toList());

List<String> keys = createdDocs.stream().map(BaseDocument::getKey).collect(Collectors.toList());

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

List<BaseDocument> modifiedDocs = createdDocs.stream().peek(doc -> {
doc.getProperties().clear();
doc.addAttribute("test", "bar");
}).collect(Collectors.toList());

// update documents from within the tx
db.collection(COLLECTION_NAME)
.updateDocuments(modifiedDocs, new DocumentUpdateOptions().streamTransactionId(tx.getId()));

// assert that the documents have not been updated from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments().stream()
.map(it -> ((String) it.getAttribute("test"))).collect(Collectors.toList()), everyItem(is("foo")));

// assert that the documents have been updated from within the tx
List<String> values = db.collection(COLLECTION_NAME)
.getDocuments(keys, BaseDocument.class, new DocumentReadOptions().streamTransactionId(tx.getId()))
.getDocuments().stream().map(it -> ((String) it.getAttribute("test"))).collect(Collectors.toList());
assertThat(values, everyItem(is("bar")));

db.commitStreamTransaction(tx.getId());

// assert that the document has been updated after commit
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments().stream()
.map(it -> ((String) it.getAttribute("test"))).collect(Collectors.toList()), everyItem(is("bar")));
}

@Test
public void deleteDocument() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

DocumentCreateEntity<BaseDocument> createdDoc = db.collection(COLLECTION_NAME)
.insertDocument(new BaseDocument(), null);

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

// delete document from within the tx
db.collection(COLLECTION_NAME)
.deleteDocument(createdDoc.getKey(), null, new DocumentDeleteOptions().streamTransactionId(tx.getId()));

// assert that the document has not been deleted from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class, null),
is(notNullValue()));

// assert that the document has been deleted from within the tx
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class,
new DocumentReadOptions().streamTransactionId(tx.getId())), is(nullValue()));

db.commitStreamTransaction(tx.getId());

// assert that the document has been deleted after commit
assertThat(db.collection(COLLECTION_NAME).getDocument(createdDoc.getKey(), BaseDocument.class, null),
is(nullValue()));
}

@Test
public void deleteDocuments() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

List<String> keys = db.collection(COLLECTION_NAME)
.insertDocuments(Arrays.asList(new BaseDocument(), new BaseDocument(), new BaseDocument()), null)
.getDocuments().stream().map(DocumentEntity::getId).collect(Collectors.toList());

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

// delete document from within the tx
db.collection(COLLECTION_NAME)
.deleteDocuments(keys, null, new DocumentDeleteOptions().streamTransactionId(tx.getId()));

// assert that the documents has not been deleted from outside the tx
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments(),
hasSize(keys.size()));

// assert that the document has been deleted from within the tx
assertThat(db.collection(COLLECTION_NAME)
.getDocuments(keys, BaseDocument.class, new DocumentReadOptions().streamTransactionId(tx.getId()))
.getDocuments(), is(empty()));

db.commitStreamTransaction(tx.getId());

// assert that the document has been deleted after commit
assertThat(db.collection(COLLECTION_NAME).getDocuments(keys, BaseDocument.class, null).getDocuments(),
is(empty()));
}

@Test
public void documentExists() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

// insert a document from outside the tx
DocumentCreateEntity<BaseDocument> externalDoc = db.collection(COLLECTION_NAME)
.insertDocument(new BaseDocument(), null);

// assert that the document is not found from within the tx
assertThat(db.collection(COLLECTION_NAME)
.documentExists(externalDoc.getKey(), new DocumentExistsOptions().streamTransactionId(tx.getId())),
is(false));

db.abortStreamTransaction(tx.getId());
}

@Test
public void count() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

Long initialCount = db.collection(COLLECTION_NAME).count().getCount();

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

// insert a document from outside the tx
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null);

// assert that the document is not counted from within the tx
assertThat(db.collection(COLLECTION_NAME).count(new CollectionCountOptions().streamTransactionId(tx.getId()))
.getCount(), is(initialCount));

db.abortStreamTransaction(tx.getId());
}

@Test
public void truncate() {
assumeTrue(requireVersion(3, 5));
assumeTrue(requireStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null);

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions().readCollections(COLLECTION_NAME).writeCollections(COLLECTION_NAME));

// truncate document from within the tx
db.collection(COLLECTION_NAME).truncate(new CollectionTruncateOptions().streamTransactionId(tx.getId()));

// assert that the collection has not been truncated from outside the tx
assertThat(db.collection(COLLECTION_NAME).count().getCount(), is(greaterThan(0L)));

// assert that the collection has been truncated from inside the tx
assertThat(db.collection(COLLECTION_NAME).count(new CollectionCountOptions().streamTransactionId(tx.getId()))
.getCount(), is(0L));

db.commitStreamTransaction(tx.getId());

// assert that the collection has been truncated after commit
assertThat(db.collection(COLLECTION_NAME).count().getCount(), is(0L));
}

}