Skip to content

Commit 1b52a10

Browse files
authored
feat: BlobWriteChannelV2 - same throughput less GC (#2110)
Use stable buffer allocation with laziness. Leverage new JsonResumableSession to provide more robustness and easier separation of concerns compared to BlobWriteChannel * rename blobWriteChannel.ser.properties to the correct blobReadChannel.ser.properties ### Runtime improvements Throughput is on par with the existing v1 implementation, however GC impact has been lightened with the new implementation. Below is the summary of the GC improvement between v1 and v2. These GC numbers were collected while uploading 4096 randomly sized objects, from 128KiB..2GiB across 16 concurrent threads, using a default chunkSize of 16MiB. | metric | unit | v1 | v2 | % decrease | |---------------------------------|--------|-------------:|-------------:|-----------:| | gc.alloc.rate | MB/sec | 2240.056 | 1457.731 | 34.924 | | gc.alloc.rate.norm | B/op | 955796726217 | 638403730507 | 33.207 | | gc.churn.G1_Eden_Space | MB/sec | 1597.009 | 1454.304 | 8.936 | | gc.churn.G1_Eden_Space.norm | B/op | 681418424320 | 636902965248 | 6.533 | | gc.churn.G1_Old_Gen | MB/sec | 691.877 | 11.316 | 98.364 | | gc.churn.G1_Old_Gen.norm | B/op | 295213237398 | 4955944331 | 98.321 | | gc.churn.G1_Survivor_Space | MB/sec | 0.004 | 0.002 | 50.000 | | gc.churn.G1_Survivor_Space.norm | B/op | 1572864 | 786432 | 50.000 | | gc.count | counts | 1670 | 1319 | 21.018 | | gc.time | ms | 15936 | 9527 | 40.217 | Overall allocation rate is decreased, while Old_Gen use is almost entirely eliminated. ``` openjdk version "11.0.18" 2023-01-17 OpenJDK Runtime Environment (build 11.0.18+10-post-Debian-1deb11u1) OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Debian-1deb11u1, mixed mode, sharing) -Xms12g -Xmx12g ``` All other java parameters are defaults.
1 parent 29feeaf commit 1b52a10

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1802
-1923
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
<!-- Not breaking, internal only interface and the new methods have default implementations -->
55
<difference>
66
<differenceType>7012</differenceType>
7-
<className>com/google/cloud/storage/UnbufferedReadableByteChannelSession$UnbufferedReadableByteChannel</className>
8-
<method>* read(*)</method>
7+
<className>com/google/cloud/storage/UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel</className>
8+
<method>* write(*)</method>
99
</difference>
1010
<!-- Allow accessing the underlying Apiary instance -->
1111
<difference>
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.core.SettableApiFuture;
20+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
21+
import com.google.api.services.storage.model.StorageObject;
22+
import com.google.cloud.storage.Retrying.RetryingDependencies;
23+
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
24+
import java.io.IOException;
25+
import java.nio.ByteBuffer;
26+
import java.nio.channels.ClosedChannelException;
27+
import java.util.function.LongConsumer;
28+
import javax.annotation.ParametersAreNonnullByDefault;
29+
import org.checkerframework.checker.nullness.qual.Nullable;
30+
31+
@ParametersAreNonnullByDefault
32+
final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
33+
34+
private final ResumableSession<StorageObject> session;
35+
36+
private final SettableApiFuture<StorageObject> result;
37+
private final LongConsumer committedBytesCallback;
38+
39+
private boolean open = true;
40+
private long cumulativeByteCount;
41+
private boolean finished = false;
42+
43+
ApiaryUnbufferedWritableByteChannel(
44+
HttpClientContext httpClientContext,
45+
RetryingDependencies deps,
46+
ResultRetryAlgorithm<?> alg,
47+
JsonResumableWrite resumableWrite,
48+
SettableApiFuture<StorageObject> result,
49+
LongConsumer committedBytesCallback) {
50+
this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
51+
this.result = result;
52+
this.committedBytesCallback = committedBytesCallback;
53+
}
54+
55+
@Override
56+
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
57+
if (!open) {
58+
throw new ClosedChannelException();
59+
}
60+
RewindableHttpContent content = RewindableHttpContent.of(Utils.subArray(srcs, offset, length));
61+
long available = content.getLength();
62+
long newFinalByteOffset = cumulativeByteCount + available;
63+
final HttpContentRange header;
64+
ByteRangeSpec rangeSpec = ByteRangeSpec.explicit(cumulativeByteCount, newFinalByteOffset);
65+
if (available % ByteSizeConstants._256KiB == 0) {
66+
header = HttpContentRange.of(rangeSpec);
67+
} else {
68+
header = HttpContentRange.of(rangeSpec, newFinalByteOffset);
69+
finished = true;
70+
}
71+
try {
72+
ResumableOperationResult<@Nullable StorageObject> operationResult =
73+
session.put(content, header);
74+
long persistedSize = operationResult.getPersistedSize();
75+
committedBytesCallback.accept(persistedSize);
76+
this.cumulativeByteCount = persistedSize;
77+
if (finished) {
78+
StorageObject storageObject = operationResult.getObject();
79+
result.set(storageObject);
80+
}
81+
return available;
82+
} catch (Exception e) {
83+
result.setException(e);
84+
throw StorageException.coalesce(e);
85+
}
86+
}
87+
88+
@Override
89+
public boolean isOpen() {
90+
return open;
91+
}
92+
93+
@Override
94+
public void close() throws IOException {
95+
open = false;
96+
if (!finished) {
97+
try {
98+
ResumableOperationResult<@Nullable StorageObject> operationResult =
99+
session.put(RewindableHttpContent.empty(), HttpContentRange.of(cumulativeByteCount));
100+
long persistedSize = operationResult.getPersistedSize();
101+
committedBytesCallback.accept(persistedSize);
102+
result.set(operationResult.getObject());
103+
} catch (Exception e) {
104+
result.setException(e);
105+
throw StorageException.coalesce(e);
106+
}
107+
}
108+
}
109+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.ByteSizeConstants._16MiB;
20+
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
21+
22+
import com.google.api.core.ApiFuture;
23+
import com.google.api.core.ApiFutureCallback;
24+
import com.google.api.core.ApiFutures;
25+
import com.google.api.core.SettableApiFuture;
26+
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
27+
import com.google.cloud.storage.Conversions.Decoder;
28+
import com.google.common.base.Preconditions;
29+
import com.google.common.util.concurrent.MoreExecutors;
30+
import java.io.IOException;
31+
import java.nio.ByteBuffer;
32+
import java.nio.channels.ClosedChannelException;
33+
import org.checkerframework.checker.nullness.qual.Nullable;
34+
35+
abstract class BaseStorageWriteChannel<T> implements StorageWriteChannel {
36+
37+
private final Decoder<T, BlobInfo> objectDecoder;
38+
private final SettableApiFuture<T> result;
39+
40+
private long position;
41+
private boolean open;
42+
private int chunkSize;
43+
private LazyWriteChannel<T> lazyWriteChannel;
44+
private BufferHandle bufferHandle;
45+
46+
/**
47+
* This is tracked for compatibility with BlobWriteChannel, such that simply creating a writer
48+
* will create an object.
49+
*
50+
* <p>In the future we should move away from this behavior, and only create an object if write is
51+
* called.
52+
*/
53+
protected boolean writeCalledAtLeastOnce;
54+
55+
protected BaseStorageWriteChannel(Decoder<T, BlobInfo> objectDecoder) {
56+
this.objectDecoder = objectDecoder;
57+
this.result = SettableApiFuture.create();
58+
this.open = true;
59+
this.chunkSize = _16MiB;
60+
this.writeCalledAtLeastOnce = false;
61+
}
62+
63+
@Override
64+
public final synchronized void setChunkSize(int chunkSize) {
65+
Preconditions.checkArgument(chunkSize > 0, "chunkSize must be > 0, received %d", chunkSize);
66+
Preconditions.checkState(
67+
bufferHandle == null || bufferHandle.position() == 0,
68+
"unable to change chunk size with data buffered");
69+
this.chunkSize = chunkSize;
70+
}
71+
72+
@Override
73+
public final synchronized boolean isOpen() {
74+
return open;
75+
}
76+
77+
@Override
78+
public final synchronized void close() throws IOException {
79+
try {
80+
if (open && !writeCalledAtLeastOnce) {
81+
this.write(ByteBuffer.allocate(0));
82+
}
83+
if (internalGetLazyChannel().isOpen()) {
84+
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
85+
}
86+
} finally {
87+
open = false;
88+
}
89+
}
90+
91+
@Override
92+
public final synchronized int write(ByteBuffer src) throws IOException {
93+
if (!open) {
94+
throw new ClosedChannelException();
95+
}
96+
writeCalledAtLeastOnce = true;
97+
try {
98+
BufferedWritableByteChannel tmp = internalGetLazyChannel().getChannel();
99+
if (!tmp.isOpen()) {
100+
return 0;
101+
}
102+
int write = tmp.write(src);
103+
return write;
104+
} catch (StorageException e) {
105+
throw new IOException(e);
106+
} catch (IOException e) {
107+
throw e;
108+
} catch (Exception e) {
109+
throw new IOException(StorageException.coalesce(e));
110+
}
111+
}
112+
113+
@Override
114+
public final ApiFuture<BlobInfo> getObject() {
115+
return ApiFutures.transform(result, objectDecoder::decode, MoreExecutors.directExecutor());
116+
}
117+
118+
protected final BufferHandle getBufferHandle() {
119+
if (bufferHandle == null) {
120+
bufferHandle = BufferHandle.allocate(Buffers.alignSize(getChunkSize(), _256KiB));
121+
}
122+
return bufferHandle;
123+
}
124+
125+
protected final int getChunkSize() {
126+
return chunkSize;
127+
}
128+
129+
@Nullable
130+
protected final T getResolvedObject() {
131+
if (result.isDone()) {
132+
return StorageException.wrapFutureGet(result);
133+
} else {
134+
return null;
135+
}
136+
}
137+
138+
protected final long getCommittedPosition() {
139+
return position;
140+
}
141+
142+
protected final void setCommittedPosition(long l) {
143+
position = l;
144+
}
145+
146+
protected final void setOpen(boolean isOpen) {
147+
this.open = isOpen;
148+
}
149+
150+
protected abstract LazyWriteChannel<T> newLazyWriteChannel();
151+
152+
private LazyWriteChannel<T> internalGetLazyChannel() {
153+
if (lazyWriteChannel == null) {
154+
LazyWriteChannel<T> tmp = newLazyWriteChannel();
155+
ApiFuture<T> future = tmp.getSession().getResult();
156+
ApiFutures.addCallback(
157+
future,
158+
new ApiFutureCallback<T>() {
159+
@Override
160+
public void onFailure(Throwable t) {
161+
if (!result.isDone()) {
162+
result.setException(t);
163+
}
164+
}
165+
166+
@Override
167+
public void onSuccess(T t) {
168+
if (!result.isDone()) {
169+
result.set(t);
170+
}
171+
}
172+
},
173+
MoreExecutors.directExecutor());
174+
lazyWriteChannel = tmp;
175+
}
176+
return lazyWriteChannel;
177+
}
178+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,18 @@ public String toString() {
143143
static final class BlobReadChannelContext {
144144
private final HttpStorageOptions storageOptions;
145145
private final HttpRetryAlgorithmManager retryAlgorithmManager;
146+
private final HttpClientContext httpClientContext;
146147
private final Storage apiaryClient;
147148

148149
private BlobReadChannelContext(
149150
HttpStorageOptions storageOptions,
150-
Storage apiaryClient,
151-
HttpRetryAlgorithmManager retryAlgorithmManager) {
151+
HttpRetryAlgorithmManager retryAlgorithmManager,
152+
HttpClientContext httpClientContext,
153+
Storage apiaryClient) {
152154
this.storageOptions = storageOptions;
153-
this.apiaryClient = apiaryClient;
154155
this.retryAlgorithmManager = retryAlgorithmManager;
156+
this.httpClientContext = httpClientContext;
157+
this.apiaryClient = apiaryClient;
155158
}
156159

157160
public HttpStorageOptions getStorageOptions() {
@@ -162,13 +165,20 @@ public HttpRetryAlgorithmManager getRetryAlgorithmManager() {
162165
return retryAlgorithmManager;
163166
}
164167

168+
public HttpClientContext getHttpClientContext() {
169+
return httpClientContext;
170+
}
171+
165172
public Storage getApiaryClient() {
166173
return apiaryClient;
167174
}
168175

169176
static BlobReadChannelContext from(HttpStorageOptions options) {
170177
return new BlobReadChannelContext(
171-
options, options.getStorageRpcV1().getStorage(), options.getRetryAlgorithmManager());
178+
options,
179+
options.getRetryAlgorithmManager(),
180+
HttpClientContext.from(options.getStorageRpcV1()),
181+
options.getStorageRpcV1().getStorage());
172182
}
173183

174184
static BlobReadChannelContext from(com.google.cloud.storage.Storage s) {

0 commit comments

Comments
 (0)