Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* <p>TODO: support updated schema
*/
class ConnectionWorker implements AutoCloseable {

private static final Logger log = Logger.getLogger(StreamWriter.class.getName());

// Maximum wait time on inflight quota before error out.
Expand Down Expand Up @@ -280,6 +281,8 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamWriter.getStreamName());
requestBuilder.putAllMissingValueInterpretations(
streamWriter.getMissingValueInterpretationMap());
return appendInternal(streamWriter, requestBuilder.build());
}

Expand Down Expand Up @@ -853,6 +856,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {

// Class that wraps AppendRowsRequest and its corresponding Response future.
static final class AppendRequestAndResponse {

final SettableApiFuture<AppendRowsResponse> appendResult;
final AppendRowsRequest message;
final long messageSize;
Expand Down Expand Up @@ -884,6 +888,7 @@ public Load getLoad() {
*/
@AutoValue
public abstract static class Load {

// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
private static double overwhelmedInflightCount = 0.2;
Expand Down Expand Up @@ -957,6 +962,7 @@ static void setMaxInflightQueueWaitTime(long waitTime) {

@AutoValue
abstract static class TableSchemaAndTimestamp {

// Shows the timestamp updated schema is reported from response
abstract long updateTimeStamp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/**
* Sets the missing value interpretation map for the JsonStreamWriter. The input
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
*
* @param missingValueInterpretationMap the missing value interpretation map used by the
* JsonStreamWriter.
*/
public void setMissingValueInterpretationMap(
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
}

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
return streamWriter.getMissingValueInterpretationMap();
}

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
Expand Down Expand Up @@ -61,6 +62,11 @@ public class StreamWriter implements AutoCloseable {
// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();

// Map of fields to their MissingValueInterpretation, which dictates how a field should be
// populated when it is missing from an input user row.
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
new HashMap();

/*
* The identifier of stream to write to.
*/
Expand Down Expand Up @@ -336,6 +342,18 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
}
}

/**
* Sets the missing value interpretation map for the stream writer. The input
* missingValueInterpretationMap is used for all write requests unless otherwise changed.
*
* @param missingValueInterpretationMap the missing value interpretation map used by stream
* writer.
*/
public void setMissingValueInterpretationMap(
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
this.missingValueInterpretationMap = missingValueInterpretationMap;
}

/**
* Schedules the writing of rows at the end of current stream.
*
Expand Down Expand Up @@ -419,6 +437,12 @@ public String getLocation() {
return location;
}

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
return missingValueInterpretationMap;
}

/**
* @return if a stream writer can no longer be used for writing. It is due to either the
* StreamWriter is explicitly closed or the underlying connection is broken when connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1290,4 +1291,43 @@ private AppendRowsResponse createAppendResponse(long offset) {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
.build();
}

@Test
public void testAppendWithMissingValueMap() throws Exception {
TableFieldSchema field =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test-列")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build();
JSONObject flexible = new JSONObject();
flexible.put("test-列", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(flexible);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {

Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(),
missingValueMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -67,6 +69,7 @@

@RunWith(JUnit4.class)
public class StreamWriterTest {

private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
Expand Down Expand Up @@ -1227,6 +1230,51 @@ public void testCloseDisconnectedStream() throws Exception {
writer.close();
}

@Test
public void testSetAndGetMissingValueInterpretationMap() throws Exception {
StreamWriter writer = getTestStreamWriter();
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
}

@Test
public void testAppendWithMissingValueMap() throws Exception {
StreamWriter writer = getTestStreamWriter();

long appendCount = 2;
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
// The first append doesn't use a missing value map.
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0));

// The second append uses a missing value map.
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1));

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}

// Ensure that the AppendRowsRequest for the first append operation does not have a missing
// value map, and that the second AppendRowsRequest has the missing value map provided in the
// second append.
verifyAppendRequests(appendCount);
AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0);
AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1);
assertTrue(request1.getMissingValueInterpretations().isEmpty());
assertEquals(request2.getMissingValueInterpretations(), missingValueMap);

writer.close();
}

@Test(timeout = 10000)
public void testStreamWriterUserCloseMultiplexing() throws Exception {
StreamWriter writer =
Expand Down