- Notifications
You must be signed in to change notification settings - Fork 135
feat!: add support for CommitStats #544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
3872199 def8768 5372dae 34fbda6 8f61c2a 041b34d 8299054 9b710a7 8c15641 919cf02 97ec917 afeb0fd 8524526 06b3e22 1c17d16 664b87d 6573a0f 0b448c3 83039fb 57ea714 66c5f88 1d17973 48ef21b 1f33c42 7d45ace File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -16,19 +16,23 @@ | |
| | ||
| package com.google.cloud.spanner; | ||
| | ||
| import com.google.api.core.ApiFunction; | ||
| import com.google.api.core.ApiFuture; | ||
| import com.google.api.core.ApiFutures; | ||
| import com.google.api.core.SettableApiFuture; | ||
| import com.google.cloud.Timestamp; | ||
| import com.google.cloud.spanner.TransactionRunner.TransactionCallable; | ||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.util.concurrent.MoreExecutors; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Executor; | ||
| | ||
| class AsyncRunnerImpl implements AsyncRunner { | ||
| private final TransactionRunnerImpl delegate; | ||
| private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create(); | ||
| private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create(); | ||
| | ||
| AsyncRunnerImpl(TransactionRunnerImpl delegate) { | ||
| this.delegate = delegate; | ||
| this.delegate = Preconditions.checkNotNull(delegate); | ||
| } | ||
| | ||
| @Override | ||
| | @@ -43,7 +47,7 @@ public void run() { | |
| } catch (Throwable t) { | ||
| res.setException(t); | ||
| } finally { | ||
| setCommitTimestamp(); | ||
| setCommitResponse(); | ||
| } | ||
| } | ||
| }); | ||
| | @@ -66,16 +70,28 @@ public R run(TransactionContext transaction) throws Exception { | |
| }); | ||
| } | ||
| | ||
| private void setCommitTimestamp() { | ||
| private void setCommitResponse() { | ||
| try { | ||
| commitTimestamp.set(delegate.getCommitTimestamp()); | ||
| commitResponse.set(delegate.getCommitResponse()); | ||
| } catch (Throwable t) { | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. letting it slide because it isn't changed in this PR, but catching Throwable is only rarely what you want. This is probably worth filing a bug on. Collaborator Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added issue: #875 | ||
| commitTimestamp.setException(t); | ||
| commitResponse.setException(t); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| public ApiFuture<Timestamp> getCommitTimestamp() { | ||
| return commitTimestamp; | ||
| return ApiFutures.transform( | ||
| commitResponse, | ||
| new ApiFunction<CommitResponse, Timestamp>() { | ||
| @Override | ||
| public Timestamp apply(CommitResponse input) { | ||
| return input.getCommitTimestamp(); | ||
| } | ||
| }, | ||
| MoreExecutors.directExecutor()); | ||
| } | ||
| | ||
| public ApiFuture<CommitResponse> getCommitResponse() { | ||
| return commitResponse; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -17,6 +17,7 @@ | |
| package com.google.cloud.spanner; | ||
| | ||
| import com.google.api.core.ApiAsyncFunction; | ||
| import com.google.api.core.ApiFunction; | ||
| import com.google.api.core.ApiFuture; | ||
| import com.google.api.core.ApiFutureCallback; | ||
| import com.google.api.core.ApiFutures; | ||
| | @@ -40,14 +41,16 @@ final class AsyncTransactionManagerImpl | |
| | ||
| private final SessionImpl session; | ||
| private Span span; | ||
| private final Options options; | ||
| | ||
| private TransactionRunnerImpl.TransactionContextImpl txn; | ||
| private TransactionState txnState; | ||
| private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create(); | ||
| private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create(); | ||
| | ||
| AsyncTransactionManagerImpl(SessionImpl session, Span span) { | ||
| AsyncTransactionManagerImpl(SessionImpl session, Span span, Options options) { | ||
| this.session = session; | ||
| this.span = span; | ||
| this.options = options; | ||
| } | ||
| | ||
| @Override | ||
| | @@ -80,7 +83,7 @@ public TransactionContextFutureImpl beginAsync() { | |
| | ||
| private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) { | ||
| txnState = TransactionState.STARTED; | ||
| txn = session.newTransaction(); | ||
| txn = session.newTransaction(options); | ||
| if (firstAttempt) { | ||
| session.setActive(this); | ||
| } | ||
| | @@ -126,28 +129,36 @@ public ApiFuture<Timestamp> commitAsync() { | |
| SpannerExceptionFactory.newSpannerException( | ||
| ErrorCode.ABORTED, "Transaction already aborted")); | ||
| } | ||
| ApiFuture<Timestamp> res = txn.commitAsync(); | ||
| ApiFuture<CommitResponse> res = txn.commitAsync(); | ||
| ||
| txnState = TransactionState.COMMITTED; | ||
| ApiFutures.addCallback( | ||
| res, | ||
| new ApiFutureCallback<Timestamp>() { | ||
| new ApiFutureCallback<CommitResponse>() { | ||
| @Override | ||
| public void onFailure(Throwable t) { | ||
| if (t instanceof AbortedException) { | ||
| txnState = TransactionState.ABORTED; | ||
| } else { | ||
| txnState = TransactionState.COMMIT_FAILED; | ||
| commitTimestamp.setException(t); | ||
| commitResponse.setException(t); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| public void onSuccess(Timestamp result) { | ||
| commitTimestamp.set(result); | ||
| public void onSuccess(CommitResponse result) { | ||
| commitResponse.set(result); | ||
| } | ||
| }, | ||
| MoreExecutors.directExecutor()); | ||
| return ApiFutures.transform( | ||
| res, | ||
| new ApiFunction<CommitResponse, Timestamp>() { | ||
| @Override | ||
| public Timestamp apply(CommitResponse input) { | ||
| return input.getCommitTimestamp(); | ||
| } | ||
| }, | ||
| MoreExecutors.directExecutor()); | ||
| return res; | ||
| } | ||
| | ||
| @Override | ||
| | @@ -184,6 +195,11 @@ public TransactionState getState() { | |
| return txnState; | ||
| } | ||
| | ||
| @Override | ||
| public ApiFuture<CommitResponse> getCommitResponse() { | ||
| return commitResponse; | ||
| } | ||
| | ||
| @Override | ||
| public void invalidate() { | ||
| if (txnState == TransactionState.STARTED || txnState == null) { | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| /* | ||
| * Copyright 2020 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| | ||
| package com.google.cloud.spanner; | ||
| | ||
| import com.google.common.base.Preconditions; | ||
| import org.threeten.bp.Duration; | ||
| import org.threeten.bp.temporal.ChronoUnit; | ||
| | ||
| /** | ||
| * Commit statistics are returned by a read/write transaction if specifically requested by passing | ||
| * in {@link Options#commitStats()} to the transaction. | ||
| */ | ||
| public class CommitStats { | ||
| private final long mutationCount; | ||
| private final Duration overloadDelay; | ||
| | ||
| private CommitStats(long mutationCount, Duration overloadDelay) { | ||
| this.mutationCount = mutationCount; | ||
| this.overloadDelay = overloadDelay; | ||
| } | ||
| | ||
| static CommitStats fromProto(com.google.spanner.v1.CommitResponse.CommitStats proto) { | ||
| Preconditions.checkNotNull(proto); | ||
| return new CommitStats( | ||
| proto.getMutationCount(), | ||
| Duration.of(proto.getOverloadDelay().getSeconds(), ChronoUnit.SECONDS) | ||
| .plusNanos(proto.getOverloadDelay().getNanos())); | ||
| } | ||
| | ||
| /** | ||
| * The number of mutations that were executed by the transaction. Insert and update operations | ||
| * count with the multiplicity of the number of columns they affect. For example, inserting a new | ||
| * record may count as five mutations, if values are inserted into five columns. Delete and delete | ||
| * range operations count as one mutation regardless of the number of columns affected. Deleting a | ||
| * row from a parent table that has the ON DELETE CASCADE annotation is also counted as one | ||
| * mutation regardless of the number of interleaved child rows present. The exception to this is | ||
| * if there are secondary indexes defined on rows being deleted, then the changes to the secondary | ||
| * indexes will be counted individually. For example, if a table has 2 secondary indexes, deleting | ||
| ||
| * a range of rows in the table will count as 1 mutation for the table, plus 2 mutations for each | ||
| ||
| * row that is deleted because the rows in the secondary index might be scattered over the | ||
| * key-space, making it impossible for Cloud Spanner to call a single delete range operation on | ||
| * the secondary indexes. Secondary indexes include the foreign keys backing indexes. | ||
| */ | ||
| public long getMutationCount() { | ||
| return mutationCount; | ||
| } | ||
| | ||
| /** The duration that the commit was delayed due to overloaded servers. */ | ||
| public Duration getOverloadDelay() { | ||
| return overloadDelay; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another one where a major version bump is required