Skip to content

Commit 3c05037

Browse files
author
Brian Chen
authored
feat: add success and error callbacks to BulkWriter (#483)
1 parent e2aecfe commit 3c05037

File tree

13 files changed

+1619
-559
lines changed

13 files changed

+1619
-559
lines changed

google-cloud-firestore/clirr-ignored-differences.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,25 @@
223223
<to>java.util.List</to>
224224
</difference>
225225

226+
<!--
227+
UpdateBuilder
228+
-->
229+
<difference>
230+
<differenceType>6001</differenceType>
231+
<className>com/google/cloud/firestore/UpdateBuilder</className>
232+
<field>pendingOperations</field>
233+
</difference>
234+
<difference>
235+
<differenceType>6001</differenceType>
236+
<className>com/google/cloud/firestore/UpdateBuilder</className>
237+
<field>state</field>
238+
</difference>
239+
<difference>
240+
<differenceType>6010</differenceType>
241+
<className>com/google/cloud/firestore/UpdateBuilder</className>
242+
<field>writes</field>
243+
</difference>
244+
226245
<!--
227246
FakeCredentials Refactor
228247
com.google.cloud.firestore.FirestoreOptions$Builder$FakeCredentials -> com.google.cloud.firestore.FirestoreOptions$EmulatorCredentials

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java

Lines changed: 160 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,180 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.ApiAsyncFunction;
20+
import com.google.api.core.ApiFunction;
1921
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.api.core.SettableApiFuture;
24+
import com.google.cloud.Timestamp;
2025
import com.google.common.base.Preconditions;
26+
import com.google.common.collect.ImmutableMap;
27+
import com.google.common.util.concurrent.MoreExecutors;
28+
import com.google.firestore.v1.BatchWriteRequest;
29+
import com.google.firestore.v1.BatchWriteResponse;
30+
import io.grpc.Status;
31+
import io.opencensus.trace.AttributeValue;
32+
import io.opencensus.trace.Tracing;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.Set;
36+
import java.util.concurrent.CopyOnWriteArraySet;
37+
import javax.annotation.Nullable;
2138

2239
/** Used to represent a batch on the BatchQueue. */
2340
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
41+
/**
42+
* Used to represent the state of batch.
43+
*
44+
* <p>Writes can only be added while the batch is OPEN. For a batch to be sent, the batch must be
45+
* READY_TO_SEND. After a batch is sent, it is marked as SENT.
46+
*/
47+
enum BatchState {
48+
OPEN,
49+
READY_TO_SEND,
50+
SENT,
51+
}
52+
53+
private BatchState state = BatchState.OPEN;
54+
55+
private final List<SettableApiFuture<BatchWriteResult>> pendingOperations = new ArrayList<>();
56+
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
57+
private final int maxBatchSize;
2458

2559
BulkCommitBatch(FirestoreImpl firestore, int maxBatchSize) {
26-
super(firestore, maxBatchSize);
60+
super(firestore);
61+
this.maxBatchSize = maxBatchSize;
2762
}
2863

29-
BulkCommitBatch(FirestoreImpl firestore, BulkCommitBatch retryBatch) {
30-
super(firestore);
64+
@Override
65+
boolean isCommitted() {
66+
return state == BatchState.SENT;
67+
}
68+
69+
ApiFuture<WriteResult> wrapResult(DocumentReference documentReference) {
70+
return processLastOperation(documentReference);
71+
}
72+
73+
/**
74+
* Commits all pending operations to the database and verifies all preconditions.
75+
*
76+
* <p>The writes in the batch are not applied atomically and can be applied out of order.
77+
*/
78+
ApiFuture<List<BatchWriteResult>> bulkCommit() {
79+
Tracing.getTracer()
80+
.getCurrentSpan()
81+
.addAnnotation(
82+
TraceUtil.SPAN_NAME_BATCHWRITE,
83+
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));
3184

32-
// Create a new BulkCommitBatch containing only the indexes from the provided indexes to retry.
33-
for (int index : retryBatch.getPendingIndexes()) {
34-
this.writes.add(retryBatch.writes.get(index));
85+
Preconditions.checkState(
86+
isReadyToSend(), "The batch should be marked as READY_TO_SEND before committing");
87+
state = BatchState.SENT;
88+
89+
final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
90+
request.setDatabase(firestore.getDatabaseName());
91+
92+
for (WriteOperation writeOperation : getWrites()) {
93+
request.addWrites(writeOperation.write);
3594
}
3695

96+
ApiFuture<BatchWriteResponse> response =
97+
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable());
98+
99+
return ApiFutures.transform(
100+
response,
101+
new ApiFunction<BatchWriteResponse, List<BatchWriteResult>>() {
102+
@Override
103+
public List<BatchWriteResult> apply(BatchWriteResponse batchWriteResponse) {
104+
List<com.google.firestore.v1.WriteResult> writeResults =
105+
batchWriteResponse.getWriteResultsList();
106+
107+
List<com.google.rpc.Status> statuses = batchWriteResponse.getStatusList();
108+
109+
List<BatchWriteResult> result = new ArrayList<>();
110+
111+
for (int i = 0; i < writeResults.size(); ++i) {
112+
com.google.firestore.v1.WriteResult writeResult = writeResults.get(i);
113+
com.google.rpc.Status status = statuses.get(i);
114+
Status code = Status.fromCodeValue(status.getCode());
115+
@Nullable Timestamp updateTime = null;
116+
@Nullable Exception exception = null;
117+
if (code == Status.OK) {
118+
updateTime = Timestamp.fromProto(writeResult.getUpdateTime());
119+
} else {
120+
exception = FirestoreException.serverRejected(code, status.getMessage());
121+
}
122+
result.add(new BatchWriteResult(updateTime, exception));
123+
}
124+
125+
return result;
126+
}
127+
},
128+
MoreExecutors.directExecutor());
129+
}
130+
131+
int getPendingOperationCount() {
132+
return pendingOperations.size();
133+
}
134+
135+
ApiFuture<WriteResult> processLastOperation(DocumentReference documentReference) {
37136
Preconditions.checkState(
38-
retryBatch.state == BatchState.SENT,
39-
"Batch should be SENT when creating a new BulkCommitBatch for retry");
40-
this.state = retryBatch.state;
41-
this.pendingOperations = retryBatch.pendingOperations;
137+
!documents.contains(documentReference),
138+
"Batch should not contain writes to the same document");
139+
documents.add(documentReference);
140+
Preconditions.checkState(state == BatchState.OPEN, "Batch should be OPEN when adding writes");
141+
SettableApiFuture<BatchWriteResult> resultFuture = SettableApiFuture.create();
142+
pendingOperations.add(resultFuture);
143+
144+
if (getPendingOperationCount() == maxBatchSize) {
145+
state = BatchState.READY_TO_SEND;
146+
}
147+
148+
return ApiFutures.transformAsync(
149+
resultFuture,
150+
new ApiAsyncFunction<BatchWriteResult, WriteResult>() {
151+
public ApiFuture<WriteResult> apply(BatchWriteResult batchWriteResult) throws Exception {
152+
if (batchWriteResult.getException() == null) {
153+
return ApiFutures.immediateFuture(new WriteResult(batchWriteResult.getWriteTime()));
154+
} else {
155+
throw batchWriteResult.getException();
156+
}
157+
}
158+
},
159+
MoreExecutors.directExecutor());
160+
}
161+
162+
/**
163+
* Resolves the individual operations in the batch with the results and removes the entry from the
164+
* pendingOperations map if the result is not retryable.
165+
*/
166+
void processResults(List<BatchWriteResult> results) {
167+
for (int i = 0; i < results.size(); i++) {
168+
SettableApiFuture<BatchWriteResult> resultFuture = pendingOperations.get(i);
169+
BatchWriteResult result = results.get(i);
170+
if (result.getException() == null) {
171+
resultFuture.set(result);
172+
} else {
173+
resultFuture.setException(result.getException());
174+
}
175+
}
176+
}
177+
178+
void markReadyToSend() {
179+
if (state == BatchState.OPEN) {
180+
state = BatchState.READY_TO_SEND;
181+
}
182+
}
183+
184+
boolean isOpen() {
185+
return state == BatchState.OPEN;
186+
}
187+
188+
boolean isReadyToSend() {
189+
return state == BatchState.READY_TO_SEND;
42190
}
43191

44-
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
45-
return result;
192+
boolean has(DocumentReference documentReference) {
193+
return documents.contains(documentReference);
46194
}
47195
}

0 commit comments

Comments
 (0)