Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8c2af65
Wrap in event type and now I get failing tests
danieljbruce Nov 25, 2024
751a8e2
Correct the fake chunk transformer mock
danieljbruce Nov 25, 2024
8fac34e
Adjust another mock so that Fake chunk transformer
danieljbruce Nov 25, 2024
46a636f
Fix all the tests in the chunk transformer
danieljbruce Nov 25, 2024
11b03d8
Adjust interfaces ready for new push events
danieljbruce Nov 25, 2024
af7e5fa
Push lastScannedRow events from the chunk transfo
danieljbruce Nov 25, 2024
76aeba9
Remove TODO
danieljbruce Nov 25, 2024
64a67d2
Remove LastScannedRow from known_failures
danieljbruce Nov 25, 2024
e7164dd
Remove console log
danieljbruce Nov 25, 2024
47425e8
Fix linting error
danieljbruce Nov 25, 2024
41f494c
Remove the data event wrapping
danieljbruce Nov 25, 2024
3297b6a
Revert mock
danieljbruce Nov 25, 2024
a8d00ba
Revert mock
danieljbruce Nov 25, 2024
0f9dc62
Revert mock
danieljbruce Nov 25, 2024
60742aa
Remove unused data properties
danieljbruce Nov 25, 2024
1aa668d
For data events, don’t send an enum
danieljbruce Nov 25, 2024
c1a6108
Run the linter
danieljbruce Nov 25, 2024
1c1ff29
Revert changes in chunk transformer test
danieljbruce Nov 25, 2024
53f8174
Remove the DataEvent import
danieljbruce Nov 25, 2024
68ab0d0
Remove unused dependency
danieljbruce Nov 25, 2024
cc46cd9
Update the chunk transformer comment
danieljbruce Nov 25, 2024
dee312e
Clarify comment
danieljbruce Nov 25, 2024
daffd82
correct comment
danieljbruce Nov 25, 2024
e372197
Correct comment
danieljbruce Nov 25, 2024
6cd2edc
Correct comment
danieljbruce Nov 25, 2024
96a9a36
Correct the comments
danieljbruce Nov 25, 2024
2a553dd
Merge branch 'main' into 380268299-implementing-the-alternative
danieljbruce Nov 27, 2024
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/chunktransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import {TableUtils} from './utils/table';

export type Value = string | number | boolean | Uint8Array;

/**
 * Tags for the non-row events that the chunk transformer pushes through its
 * readable side alongside regular row data.
 */
export enum DataEvent {
  /** The pushed object carries a last scanned row key rather than a row. */
  LAST_ROW_KEY_UPDATE = 0,
}

export interface Chunk {
rowContents: Value;
commitRow: boolean;
Expand Down Expand Up @@ -51,6 +55,12 @@ export interface TransformErrorProps {
message: string;
chunk: Chunk | null;
}
/**
 * Shape of the event object the chunk transformer pushes to report its last
 * row key, as opposed to pushing a row.
 */
export interface ChunkPushLastScannedRowData {
// Discriminator: marks this object as a last-row-key update rather than a row.
eventType: DataEvent.LAST_ROW_KEY_UPDATE;
// Row key reported by the chunk transformer; optional, so it may be undefined.
lastScannedRowKey?: string;
}

/**
 * Union of the objects a consumer of the chunk transformer may receive:
 * either a row or a last-scanned-row-key update event.
 */
export type ChunkPushData = Row | ChunkPushLastScannedRowData;

class TransformError extends Error {
constructor(props: TransformErrorProps) {
Expand Down Expand Up @@ -159,6 +169,19 @@ export class ChunkTransformer extends Transform {
userOptions: this.options,
}
);
/**
* Push an event that will update the lastRowKey in the user stream after
* all rows ahead of this event have reached the user stream. This ensures
* that a retry excludes the lastScannedRow, which is required for the
* TestReadRows_Retry_LastScannedRow conformance test to pass. It is
* important to use a 'data' event to update the last row key so that all
* the data queued ahead of this event reaches the user stream first.
*/
this.push({
eventType: DataEvent.LAST_ROW_KEY_UPDATE,
lastScannedRowKey: this.lastRowKey,
});
}
next();
}
Expand Down
50 changes: 44 additions & 6 deletions src/tabular-api-surface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
} from './index';
import {Filter, BoundData, RawFilter} from './filter';
import {Row} from './row';
import {ChunkTransformer} from './chunktransformer';
import {
ChunkPushData,
ChunkPushLastScannedRowData,
ChunkTransformer,
DataEvent,
} from './chunktransformer';
import {BackoffSettings} from 'google-gax/build/src/gax';
import {google} from '../protos/protos';
import {CallOptions, ServiceError} from 'google-gax';
Expand Down Expand Up @@ -244,11 +249,27 @@
objectMode: true,
readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early.
writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key.
transform(row, _encoding, callback) {
transform(event, _encoding, callback) {
if (userCanceled) {
callback();
return;
}
if (event.eventType === DataEvent.LAST_ROW_KEY_UPDATE) {
/**
* This code will run when receiving an event containing
* lastScannedRowKey data that the chunk transformer sent. When the
* chunk transformer gets lastScannedRowKey data, this code
* updates the lastRowKey to ensure row ids with the lastScannedRowKey
* aren't re-requested in retries. The lastRowKey needs to be updated
* here and not in the chunk transformer to ensure the update is
* queued behind all events that deliver data to the user stream
* first.
*/
lastRowKey = event.lastScannedRowKey;
callback();
return;
}
const row = event;
if (TableUtils.lessThanOrEqualTo(row.id, lastRowKey)) {
/*
Sometimes duplicate rows reach this point. To avoid delivering
Expand Down Expand Up @@ -325,7 +346,7 @@
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {

Check warning on line 349 in src/tabular-api-surface.ts

View workflow job for this annotation

GitHub Actions / lint

'_' is defined but never used

Check warning on line 349 in src/tabular-api-surface.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
return false;
},
};
Expand Down Expand Up @@ -425,17 +446,34 @@
activeRequestStream = requestStream!;

const toRowStream = new Transform({
transform: (rowData, _, next) => {
transform: (rowData: ChunkPushData, _, next) => {
if (
userCanceled ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(userStream as any)._writableState.ended
) {
return next();
}
const row = this.row(rowData.key);
row.data = rowData.data;
next(null, row);
if (
(rowData as ChunkPushLastScannedRowData).eventType ===
DataEvent.LAST_ROW_KEY_UPDATE
) {
/**
* If the data is the chunk transformer communicating that the
* lastScannedRow was received then this message is passed along
* to the user stream to update the lastRowKey.
*/
next(null, rowData);
} else {
/**
* If the data is just regular rows being pushed from the
* chunk transformer then the rows are encoded so that they
* can be consumed by the user stream.
*/
const row = this.row((rowData as Row).key as string);
row.data = (rowData as Row).data;
next(null, row);
}
},
objectMode: true,
});
Expand Down Expand Up @@ -483,7 +521,7 @@
userStream.emit('error', error);
}
})
.on('data', _ => {

Check warning on line 524 in src/tabular-api-surface.ts

View workflow job for this annotation

GitHub Actions / lint

'_' is defined but never used
// Reset error count after a successful read so the backoff
// time won't keep increasing when a stream has had multiple errors
numConsecutiveErrors = 0;
Expand Down Expand Up @@ -706,7 +744,7 @@
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {

Check warning on line 747 in src/tabular-api-surface.ts

View workflow job for this annotation

GitHub Actions / lint

'_' is defined but never used
return false;
},
};
Expand Down
1 change: 0 additions & 1 deletion testproxy/known_failures.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ TestReadRow_Retry_WithRetryInfo\|
TestReadRows_ReverseScans_FeatureFlag_Enabled\|
TestReadRows_NoRetry_OutOfOrderError_Reverse\|
TestReadRows_Retry_PausedScan\|
TestReadRows_Retry_LastScannedRow\|
TestReadRows_Retry_LastScannedRow_Reverse\|
TestReadRows_Retry_StreamReset\|
TestReadRows_Generic_CloseClient\|
Expand Down
Loading