Skip to content
Prev Previous commit
Next Next commit
streamTx: vertex operations
  • Loading branch information
michele committed Sep 3, 2019
commit 7f2189fed4e1b188a5bf5df1cc449183cb485abf
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ protected <T> Request replaceVertexRequest(final String key, final T value, fina
final Request request = request(graph.db().name(), RequestType.PUT, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final VertexReplaceOptions params = (options != null ? options : new VertexReplaceOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
request.setBody(util(Serializer.CUSTOM).serialize(value));
Expand All @@ -148,6 +149,7 @@ protected <T> Request updateVertexRequest(final String key, final T value, final
request = request(graph.db().name(), RequestType.PATCH, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final VertexUpdateOptions params = (options != null ? options : new VertexUpdateOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.KEEP_NULL, params.getKeepNull());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
Expand All @@ -174,6 +176,7 @@ protected Request deleteVertexRequest(final String key, final VertexDeleteOption
final Request request = request(graph.db().name(), RequestType.DELETE, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final VertexDeleteOptions params = (options != null ? options : new VertexDeleteOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
return request;
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/arangodb/model/VertexDeleteOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class VertexDeleteOptions {

private Boolean waitForSync;
private String ifMatch;
private String streamTransactionId;

public VertexDeleteOptions() {
super();
Expand Down Expand Up @@ -61,4 +62,19 @@ public VertexDeleteOptions ifMatch(final String ifMatch) {
this.ifMatch = ifMatch;
return this;
}

public String getStreamTransactionId() {
return streamTransactionId;
}

/**
* @param streamTransactionId If set, the operation will be executed within the transaction.
* @return options
* @since ArangoDB 3.5.1
*/
public VertexDeleteOptions streamTransactionId(final String streamTransactionId) {
this.streamTransactionId = streamTransactionId;
return this;
}

}
16 changes: 16 additions & 0 deletions src/main/java/com/arangodb/model/VertexReplaceOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class VertexReplaceOptions {

private Boolean waitForSync;
private String ifMatch;
private String streamTransactionId;

public VertexReplaceOptions() {
super();
Expand Down Expand Up @@ -61,4 +62,19 @@ public VertexReplaceOptions ifMatch(final String ifMatch) {
this.ifMatch = ifMatch;
return this;
}

public String getStreamTransactionId() {
return streamTransactionId;
}

/**
* @param streamTransactionId If set, the operation will be executed within the transaction.
* @return options
* @since ArangoDB 3.5.1
*/
public VertexReplaceOptions streamTransactionId(final String streamTransactionId) {
this.streamTransactionId = streamTransactionId;
return this;
}

}
15 changes: 15 additions & 0 deletions src/main/java/com/arangodb/model/VertexUpdateOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class VertexUpdateOptions {
private Boolean keepNull;
private Boolean waitForSync;
private String ifMatch;
private String streamTransactionId;

public VertexUpdateOptions() {
super();
Expand Down Expand Up @@ -79,4 +80,18 @@ public VertexUpdateOptions ifMatch(final String ifMatch) {
this.ifMatch = ifMatch;
return this;
}

public String getStreamTransactionId() {
return streamTransactionId;
}

/**
* @param streamTransactionId If set, the operation will be executed within the transaction.
* @return options
* @since ArangoDB 3.5.1
*/
public VertexUpdateOptions streamTransactionId(final String streamTransactionId) {
this.streamTransactionId = streamTransactionId;
return this;
}
}
129 changes: 126 additions & 3 deletions src/test/java/com/arangodb/StreamTransactionGraphTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@

import com.arangodb.ArangoDB.Builder;
import com.arangodb.entity.*;
import com.arangodb.model.GraphDocumentReadOptions;
import com.arangodb.model.StreamTransactionOptions;
import com.arangodb.model.VertexCreateOptions;
import com.arangodb.model.*;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -90,6 +88,28 @@ private BaseEdgeDocument createEdgeValue(String streamTransactionId) {
return value;
}

@Test
public void getVertex() {
assumeTrue(isSingleServer());
assumeTrue(isAtLeastVersion(3, 5));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, one thing:
assumeTrue(isAtLeastVersion(3, 5));

^-- does this needs to be instead isAtLeastVersion(3,5,1) ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still discussing about which db versions to support and to test against... For the moment we only support and test against the latest patch of every minor release. Eg. atm 3.3.23, 3.4.8 and 3.5.0.

assumeTrue(isStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

StreamTransactionEntity tx = db
.beginStreamTransaction(new StreamTransactionOptions()
.readCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION)
.writeCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION));

// insert a vertex from outside the tx
VertexEntity createdVertex = vertexCollection1.insertVertex(new BaseDocument());

// assert that the vertex is not found from within the tx
assertThat(vertexCollection1.getVertex(createdVertex.getKey(), BaseDocument.class,
new GraphDocumentReadOptions().streamTransactionId(tx.getId())), is(nullValue()));

db.abortStreamTransaction(tx.getId());
}


@Test
public void createVertex() {
assumeTrue(isSingleServer());
Expand Down Expand Up @@ -117,4 +137,107 @@ public void createVertex() {
assertThat(vertexCollection1.getVertex(createdVertex.getKey(), BaseDocument.class, null), is(notNullValue()));
}

@Test
public void replaceVertex() {
assumeTrue(isSingleServer());
assumeTrue(isAtLeastVersion(3, 5));
assumeTrue(isStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

BaseDocument doc = new BaseDocument();
doc.addAttribute("test", "foo");

VertexEntity createdVertex = vertexCollection1.insertVertex(doc, null);

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions()
.readCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION)
.writeCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION));

// replace vertex from within the tx
doc.getProperties().clear();
doc.addAttribute("test", "bar");
vertexCollection1.replaceVertex(createdVertex.getKey(), doc,
new VertexReplaceOptions().streamTransactionId(tx.getId()));

// assert that the vertex has not been replaced from outside the tx
assertThat(vertexCollection1.getVertex(createdVertex.getKey(), BaseDocument.class, null)
.getProperties().get("test"), is("foo"));

// assert that the vertex has been replaced from within the tx
assertThat(vertexCollection1.getVertex(createdVertex.getKey(), BaseDocument.class,
new GraphDocumentReadOptions().streamTransactionId(tx.getId())).getProperties().get("test"), is("bar"));

db.commitStreamTransaction(tx.getId());

// assert that the vertex has been replaced after commit
assertThat(vertexCollection1.getVertex(createdVertex.getKey(), BaseDocument.class, null)
.getProperties().get("test"), is("bar"));
}

@Test
public void updateVertex() {
assumeTrue(isSingleServer());
assumeTrue(isAtLeastVersion(3, 5));
assumeTrue(isStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

BaseDocument doc = new BaseDocument();
doc.addAttribute("test", "foo");

VertexEntity createdDoc = vertexCollection1.insertVertex(doc, null);

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions()
.readCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION)
.writeCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION));

// update vertex from within the tx
doc.getProperties().clear();
doc.addAttribute("test", "bar");
vertexCollection1.updateVertex(createdDoc.getKey(), doc, new VertexUpdateOptions().streamTransactionId(tx.getId()));

// assert that the vertex has not been updated from outside the tx
assertThat(vertexCollection1.getVertex(createdDoc.getKey(), BaseDocument.class, null)
.getProperties().get("test"), is("foo"));

// assert that the vertex has been updated from within the tx
assertThat(vertexCollection1.getVertex(createdDoc.getKey(), BaseDocument.class,
new GraphDocumentReadOptions().streamTransactionId(tx.getId())).getProperties().get("test"), is("bar"));

db.commitStreamTransaction(tx.getId());

// assert that the vertex has been updated after commit
assertThat(vertexCollection1.getVertex(createdDoc.getKey(), BaseDocument.class, null)
.getProperties().get("test"), is("bar"));
}

@Test
public void deleteVertex() {
assumeTrue(isSingleServer());
assumeTrue(isAtLeastVersion(3, 5));
assumeTrue(isStorageEngine(ArangoDBEngine.StorageEngineName.rocksdb));

VertexEntity createdDoc = vertexCollection1.insertVertex(new BaseDocument(), null);

StreamTransactionEntity tx = db.beginStreamTransaction(
new StreamTransactionOptions()
.readCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION)
.writeCollections(VERTEX_COLLECTION_1, VERTEX_COLLECTION_2, EDGE_COLLECTION));

// delete vertex from within the tx
vertexCollection1.deleteVertex(createdDoc.getKey(), new VertexDeleteOptions().streamTransactionId(tx.getId()));

// assert that the vertex has not been deleted from outside the tx
assertThat(vertexCollection1.getVertex(createdDoc.getKey(), BaseDocument.class, null), is(notNullValue()));

// assert that the vertex has been deleted from within the tx
assertThat(vertexCollection1.getVertex(createdDoc.getKey(), BaseDocument.class,
new GraphDocumentReadOptions().streamTransactionId(tx.getId())), is(nullValue()));

db.commitStreamTransaction(tx.getId());

// assert that the vertex has been deleted after commit
assertThat(vertexCollection1.getVertex(createdDoc.getKey(), BaseDocument.class, null),
is(nullValue()));
}

}