@@ -409,25 +409,15 @@ public void populateDisplayData(DisplayData.Builder builder) {
     configuration.populateDisplayData(builder);
   }

-  /**
-   * The writeReplace method allows the developer to provide a replacement object that will be
-   * serialized instead of the original one. We use this to keep the enclosed class immutable. For
-   * more details on the technique see <a
-   * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
-   * article</a>.
-   */
-  private Object writeReplace() {
-    return new SerializationProxy(getConfiguration());
-  }
+  abstract static class AbstractSerializationProxy implements Serializable {

-  static class SerializationProxy implements Serializable {
     private ValueProvider<String> projectId;
     private ValueProvider<String> instanceId;
     private ValueProvider<String> tableId;
     private Map<String, ValueProvider<String>> additionalConfiguration;
     private transient ValueProvider<Scan> scan;

-    public SerializationProxy(CloudBigtableScanConfiguration configuration) {
+    public AbstractSerializationProxy(CloudBigtableScanConfiguration configuration) {
       this.projectId = configuration.getProjectIdValueProvider();
       this.instanceId = configuration.getInstanceIdValueProvider();
       this.tableId = configuration.getTableIdValueProvider();
@@ -472,11 +462,13 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
       }
     }

-    Object readResolve() {
+    abstract Object readResolve();
+
+    protected CloudBigtableScanConfiguration buildScanConfig() {
       CloudBigtableScanConfiguration conf =
           CloudBigtableScanConfiguration.createConfig(
               projectId, instanceId, tableId, scan, additionalConfiguration);
-      return CloudBigtableIO.read(conf);
+      return conf;
     }
   }
 }
@@ -532,8 +524,26 @@ public Coder<Result> getOutputCoder() {
     return getResultCoder();
   }

+  /**
+   * The writeReplace method allows the developer to provide a replacement object that will be
+   * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+   * more details on the technique see <a
+   * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+   * article</a>.
+   */
   private Object writeReplace() {
-    return new SerializationProxy(getConfiguration());
+    return new SourceSerializationProxy(getConfiguration());
   }
+
+  static class SourceSerializationProxy extends AbstractSerializationProxy {
+    public SourceSerializationProxy(CloudBigtableScanConfiguration configuration) {
+      super(configuration);
+    }
+
+    @Override
+    Object readResolve() {
+      return new Source(buildScanConfig());
+    }
+  }
 }

@@ -554,7 +564,6 @@ protected static class SourceWithKeys extends AbstractSource {

   protected SourceWithKeys(CloudBigtableScanConfiguration configuration, long estimatedSize) {
     super(configuration);
-
     byte[] stopRow = configuration.getStopRow();
     if (stopRow.length > 0) {
       byte[] startRow = configuration.getStartRow();
@@ -618,7 +627,23 @@ public String toString() {
   }

   private Object writeReplace() {
-    return new SerializationProxy(getConfiguration());
+    return new SourceWithKeysSerializationProxy(getConfiguration(), estimatedSize);
   }
+
+  static class SourceWithKeysSerializationProxy extends AbstractSerializationProxy {
+
+    private long estimatedSize;
+
+    public SourceWithKeysSerializationProxy(
+        CloudBigtableScanConfiguration configuration, long estimatedSize) {
+      super(configuration);
+      this.estimatedSize = estimatedSize;
+    }
+
+    Object readResolve() {
+      CloudBigtableScanConfiguration conf = buildScanConfig();
+      return new SourceWithKeys(conf, estimatedSize);
+    }
+  }
 }
 /** Reads rows for a specific {@link Table}, usually filtered by a {@link Scan}. */
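
Reviewer note on the serialization-proxy idiom referenced in the Javadoc above: writeReplace() swaps in a proxy object before Java serialization runs, and the proxy's readResolve() rebuilds the real object afterwards, so the enclosing class can keep its fields final/immutable. The sketch below illustrates the idiom only; ImmutableConfig and its fields are hypothetical and are not part of this change.

import java.io.Serializable;

// Minimal sketch of the serialization-proxy idiom (hypothetical class, not from this PR).
final class ImmutableConfig implements Serializable {
  private final String projectId;
  private final String tableId;

  ImmutableConfig(String projectId, String tableId) {
    this.projectId = projectId;
    this.tableId = tableId;
  }

  // Serialize the proxy instead of this object, so default deserialization never
  // has to write into the final fields above.
  private Object writeReplace() {
    return new Proxy(projectId, tableId);
  }

  private static class Proxy implements Serializable {
    private final String projectId;
    private final String tableId;

    Proxy(String projectId, String tableId) {
      this.projectId = projectId;
      this.tableId = tableId;
    }

    // On deserialization, rebuild the real object through its public constructor.
    private Object readResolve() {
      return new ImmutableConfig(projectId, tableId);
    }
  }
}

This PR generalizes the idiom into AbstractSerializationProxy with an abstract readResolve(), so Source and SourceWithKeys each rebuild their own type; the old shared SerializationProxy always rebuilt via CloudBigtableIO.read(conf) and could not carry SourceWithKeys-specific state such as estimatedSize.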
@@ -15,6 +15,7 @@
  */
 package com.google.cloud.bigtable.beam;

+import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.when;
@@ -28,18 +29,22 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Assert;
 import org.junit.Before;
@@ -217,4 +222,73 @@ public void testWriteToMultipleTablesValidateConfig() throws Exception {
       Assert.assertTrue(e.getMessage().contains("A instanceId must be set"));
     }
   }
+
+  @Test
+  public void testSerializeSource() {
+    String projectId = "my-project";
+    String instanceId = "my-instance";
+    String tableId = "my-table";
+    Scan scan = new Scan().setFilter(new KeyOnlyFilter()).setMaxVersions(2);
+    CloudBigtableScanConfiguration configuration =
+        CloudBigtableScanConfiguration.createConfig(
+            StaticValueProvider.of(projectId),
+            StaticValueProvider.of(instanceId),
+            StaticValueProvider.of(tableId),
+            StaticValueProvider.of(scan),
+            new HashMap<>());
+
+    Source source = new Source(configuration);
+
+    Source deserialized = SerializableUtils.ensureSerializable(source);
+
+    assertEquals(projectId, deserialized.getConfiguration().getProjectId());
+    assertEquals(instanceId, deserialized.getConfiguration().getInstanceId());
+    assertEquals(tableId, deserialized.getConfiguration().getTableId());
+    assertEquals(
+        scan.getFilter(), deserialized.getConfiguration().getScanValueProvider().get().getFilter());
+    assertEquals(2, deserialized.getConfiguration().getScanValueProvider().get().getMaxVersions());
+  }
+
+  @Test
+  public void testSerializeSourceWithKeys() {
+    String projectId = "my-project";
+    String instanceId = "my-instance";
+    String tableId = "my-table";
+    String startKey = "aaa1";
+    String endKey = "bbb3";
+
+    Scan scan =
+        new Scan()
+            .withStartRow(ByteString.copyFromUtf8(startKey).toByteArray())
+            .withStopRow(ByteString.copyFromUtf8(endKey).toByteArray())
+            .setFilter(new KeyOnlyFilter());
+
+    CloudBigtableScanConfiguration sourceWithKeysConfiguration =
+        CloudBigtableScanConfiguration.createConfig(
+            StaticValueProvider.of(projectId),
+            StaticValueProvider.of(instanceId),
+            StaticValueProvider.of(tableId),
+            StaticValueProvider.of(scan),
+            new HashMap<>());
+
+    long estimatedSize = 123456;
+    SourceWithKeys sourceWithKeys = new SourceWithKeys(sourceWithKeysConfiguration, estimatedSize);
+
+    SourceWithKeys deserialized = SerializableUtils.ensureSerializable(sourceWithKeys);
+
+    assertEquals(projectId, deserialized.getConfiguration().getProjectId());
+    assertEquals(instanceId, deserialized.getConfiguration().getInstanceId());
+    assertEquals(tableId, deserialized.getConfiguration().getTableId());
+    assertEquals(
+        startKey,
+        ByteString.copyFrom(
+                deserialized.getConfiguration().getScanValueProvider().get().getStartRow())
+            .toStringUtf8());
+    assertEquals(
+        endKey,
+        ByteString.copyFrom(
+                deserialized.getConfiguration().getScanValueProvider().get().getStopRow())
+            .toStringUtf8());
+    assertEquals(estimatedSize, deserialized.getEstimatedSize());
+  }
 }
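
A note on the transient ValueProvider<Scan> scan field in the proxy: HBase's Scan is not java.io.Serializable, which is why the proxy keeps it transient and pairs it with the custom writeObject/readObject collapsed in the hunk around line 472. The snippet below is only a sketch of that bridging technique, assuming HBase 1.x's ProtobufUtil and ClientProtos (package locations differ in HBase 2.x); it is not the code in CloudBigtableScanConfiguration.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

// Sketch: carry a non-Serializable Scan across Java serialization as protobuf bytes.
class ScanHolder implements Serializable {
  private transient Scan scan;

  ScanHolder(Scan scan) {
    this.scan = scan;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    // Convert the Scan to its protobuf message and write the raw bytes.
    byte[] bytes = ProtobufUtil.toScan(scan).toByteArray();
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    // Rebuild the Scan from the protobuf message.
    scan = ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(bytes));
  }
}

Usage is transparent: once the holder deserializes, scan is populated again and callers never see the detour.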