- Notifications
You must be signed in to change notification settings - Fork 75
fix: close the Watch stream when we receive an error #834
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -21,7 +21,9 @@ | |
| import com.google.api.gax.retrying.ExponentialRetryAlgorithm; | ||
| import com.google.api.gax.retrying.TimedAttemptSettings; | ||
| import com.google.api.gax.rpc.ApiException; | ||
| import com.google.api.gax.rpc.ApiStreamObserver; | ||
| import com.google.api.gax.rpc.BidiStreamObserver; | ||
| import com.google.api.gax.rpc.ClientStream; | ||
| import com.google.api.gax.rpc.StreamController; | ||
| import com.google.cloud.Timestamp; | ||
| import com.google.cloud.firestore.DocumentChange.Type; | ||
| import com.google.common.base.Preconditions; | ||
| | @@ -57,7 +59,7 @@ | |
| * It synchronizes on its own instance so it is advisable not to use this class for external | ||
| * synchronization. | ||
| */ | ||
| class Watch implements ApiStreamObserver<ListenResponse> { | ||
| class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> { | ||
| /** | ||
| * Target ID used by watch. Watch uses a fixed target id since we only support one target per | ||
| * stream. The actual target ID we use is arbitrary. | ||
| | @@ -71,7 +73,7 @@ class Watch implements ApiStreamObserver<ListenResponse> { | |
| private final ExponentialRetryAlgorithm backoff; | ||
| private final Target target; | ||
| private TimedAttemptSettings nextAttempt; | ||
| private ApiStreamObserver<ListenRequest> stream; | ||
| private ClientStream<ListenRequest> stream; | ||
| | ||
| /** The sorted tree of DocumentSnapshots as sent in the last snapshot. */ | ||
| private DocumentSet documentSet; | ||
| | @@ -167,7 +169,13 @@ static Watch forQuery(Query query) { | |
| } | ||
| | ||
| @Override | ||
| public synchronized void onNext(ListenResponse listenResponse) { | ||
| public void onStart(StreamController streamController) {} | ||
| | ||
| @Override | ||
| public void onReady(ClientStream<ListenRequest> clientStream) {} | ||
| | ||
| @Override | ||
| public synchronized void onResponse(ListenResponse listenResponse) { | ||
| switch (listenResponse.getResponseTypeCase()) { | ||
| case TARGET_CHANGE: | ||
| TargetChange change = listenResponse.getTargetChange(); | ||
| | @@ -258,7 +266,7 @@ public synchronized void onError(Throwable throwable) { | |
| } | ||
| | ||
| @Override | ||
| public synchronized void onCompleted() { | ||
| public synchronized void onComplete() { | ||
| maybeReopenStream(new StatusException(Status.fromCode(Code.UNKNOWN))); | ||
| } | ||
| | ||
| | @@ -289,7 +297,7 @@ ListenerRegistration runWatch( | |
| .execute( | ||
| () -> { | ||
| synchronized (Watch.this) { | ||
| stream.onCompleted(); | ||
| stream.closeSend(); | ||
| Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This now closes the stream instead of invoking a no-op method. | ||
| stream = null; | ||
| } | ||
| }); | ||
| | @@ -321,7 +329,7 @@ private void resetDocs() { | |
| /** Closes the stream and calls onError() if the stream is still active. */ | ||
| private void closeStream(final Throwable throwable) { | ||
| if (stream != null) { | ||
| stream.onCompleted(); | ||
| stream.closeSend(); | ||
| stream = null; | ||
| } | ||
| | ||
| | @@ -363,7 +371,7 @@ private void maybeReopenStream(Throwable throwable) { | |
| /** Helper to restart the outgoing stream to the backend. */ | ||
| private void resetStream() { | ||
| if (stream != null) { | ||
| stream.onCompleted(); | ||
| stream.closeSend(); | ||
| stream = null; | ||
| } | ||
| | ||
| | @@ -399,7 +407,7 @@ private void initStream() { | |
| request.getAddTargetBuilder().setResumeToken(resumeToken); | ||
| } | ||
| | ||
| stream.onNext(request.build()); | ||
| stream.send(request.build()); | ||
| } | ||
| } catch (Throwable throwable) { | ||
| onError(throwable); | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -70,7 +70,6 @@ | |
| import org.mockito.Mockito; | ||
| import org.mockito.Spy; | ||
| import org.mockito.runners.MockitoJUnitRunner; | ||
| import org.mockito.stubbing.Answer; | ||
| | ||
| @RunWith(MockitoJUnitRunner.class) | ||
| public class BulkWriterTest { | ||
| | @@ -1119,11 +1118,10 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) | |
| }; | ||
| | ||
| doAnswer( | ||
| (Answer<ApiFuture<GeneratedMessageV3>>) | ||
| mock -> { | ||
| retryAttempts[0]++; | ||
| return RETRYABLE_FAILED_FUTURE; | ||
| }) | ||
| mock -> { | ||
| Collaborator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will this test the new changes? Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it's hard to test these changes. We could mock the backend stream, but then we are essentially only testing the behavior of the mock. If you know of a pre-existing implementation/fake of a GRPC stream that we can use to test this behavior, then we can add a test. A homegrown implementation that validates that our code follows our ow assumptions will not provide us with much meaningful test coverage. Collaborator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would something like this be helpful? https://github.com/googleapis/java-bigtable/blob/main/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/testing/FakeStreamingApi.java (It's using ApiStreamObserver, but I imagine could be refactored to this use case) Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added test queryWatchShutsDownStreamOnPermissionDenied() that re-uses some of the existing functionality. | ||
| retryAttempts[0]++; | ||
| return RETRYABLE_FAILED_FUTURE; | ||
| }) | ||
| .when(firestoreMock) | ||
| .sendRequest( | ||
| batchWriteCapture.capture(), | ||
| | @@ -1170,11 +1168,10 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) | |
| }; | ||
| | ||
| doAnswer( | ||
| (Answer<ApiFuture<GeneratedMessageV3>>) | ||
| mock -> { | ||
| retryAttempts[0]++; | ||
| return RESOURCE_EXHAUSTED_FAILED_FUTURE; | ||
| }) | ||
| mock -> { | ||
| retryAttempts[0]++; | ||
| return RESOURCE_EXHAUSTED_FAILED_FUTURE; | ||
| }) | ||
| .when(firestoreMock) | ||
| .sendRequest( | ||
| batchWriteCapture.capture(), | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the main changes, the rest is plumbing and cleanup.