Skip to content

Commit 38708a8

Browse files
committed
feat(observability): trace BatchTransaction and Table
This change is part of a series of changes to add OpenTelemetry traces, focused on BatchTransaction and Table. While here, made the tests for sessionPool spans much more precise to avoid flakes. Updates #2079 Built from PR #2087 Updates #2114
1 parent 3300ab5 commit 38708a8

File tree

7 files changed

+556
-96
lines changed

7 files changed

+556
-96
lines changed

src/batch-transaction.ts

Lines changed: 108 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
CLOUD_RESOURCE_HEADER,
2626
addLeaderAwareRoutingHeader,
2727
} from '../src/common';
28+
import {startTrace, setSpanError, traceConfig} from './instrument';
2829

2930
export interface TransactionIdentifier {
3031
session: string | Session;
@@ -136,20 +137,37 @@ class BatchTransaction extends Snapshot {
136137
delete reqOpts.gaxOptions;
137138
delete reqOpts.types;
138139

139-
const headers: {[k: string]: string} = {};
140-
if (this._getSpanner().routeToLeaderEnabled) {
141-
addLeaderAwareRoutingHeader(headers);
142-
}
140+
const traceConfig: traceConfig = {
141+
sql: query,
142+
opts: this.observabilityOptions,
143+
};
144+
return startTrace(
145+
'BatchTransaction.createQueryPartitions',
146+
traceConfig,
147+
span => {
148+
const headers: {[k: string]: string} = {};
149+
if (this._getSpanner().routeToLeaderEnabled) {
150+
addLeaderAwareRoutingHeader(headers);
151+
}
143152

144-
this.createPartitions_(
145-
{
146-
client: 'SpannerClient',
147-
method: 'partitionQuery',
148-
reqOpts,
149-
gaxOpts: query.gaxOptions,
150-
headers: headers,
151-
},
152-
callback
153+
this.createPartitions_(
154+
{
155+
client: 'SpannerClient',
156+
method: 'partitionQuery',
157+
reqOpts,
158+
gaxOpts: query.gaxOptions,
159+
headers: headers,
160+
},
161+
(err, partitions, resp) => {
162+
if (err) {
163+
setSpanError(span, err);
164+
}
165+
166+
span.end();
167+
callback(err, partitions, resp);
168+
}
169+
);
170+
}
153171
);
154172
}
155173
/**
@@ -163,38 +181,52 @@ class BatchTransaction extends Snapshot {
163181
* @param {function} callback Callback function.
164182
*/
165183
createPartitions_(config, callback) {
166-
const query = extend({}, config.reqOpts, {
167-
session: this.session.formattedName_,
168-
transaction: {id: this.id},
169-
});
170-
config.reqOpts = extend({}, query);
171-
config.headers = {
172-
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
184+
const traceConfig: traceConfig = {
185+
opts: this.observabilityOptions,
173186
};
174-
delete query.partitionOptions;
175-
this.session.request(config, (err, resp) => {
176-
if (err) {
177-
callback(err, null, resp);
178-
return;
179-
}
180187

181-
const partitions = resp.partitions.map(partition => {
182-
return extend({}, query, partition);
183-
});
188+
return startTrace(
189+
'BatchTransaction.createPartitions_',
190+
traceConfig,
191+
span => {
192+
const query = extend({}, config.reqOpts, {
193+
session: this.session.formattedName_,
194+
transaction: {id: this.id},
195+
});
196+
config.reqOpts = extend({}, query);
197+
config.headers = {
198+
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database)
199+
.formattedName_,
200+
};
201+
delete query.partitionOptions;
202+
this.session.request(config, (err, resp) => {
203+
if (err) {
204+
setSpanError(span, err);
205+
span.end();
206+
callback(err, null, resp);
207+
return;
208+
}
184209

185-
if (resp.transaction) {
186-
const {id, readTimestamp} = resp.transaction;
210+
const partitions = resp.partitions.map(partition => {
211+
return extend({}, query, partition);
212+
});
187213

188-
this.id = id;
214+
if (resp.transaction) {
215+
const {id, readTimestamp} = resp.transaction;
189216

190-
if (readTimestamp) {
191-
this.readTimestampProto = readTimestamp;
192-
this.readTimestamp = new PreciseDate(readTimestamp);
193-
}
194-
}
217+
this.id = id;
218+
219+
if (readTimestamp) {
220+
this.readTimestampProto = readTimestamp;
221+
this.readTimestamp = new PreciseDate(readTimestamp);
222+
}
223+
}
195224

196-
callback(null, partitions, resp);
197-
});
225+
span.end();
226+
callback(null, partitions, resp);
227+
});
228+
}
229+
);
198230
}
199231
/**
200232
* @typedef {object} ReadPartition
@@ -226,28 +258,45 @@ class BatchTransaction extends Snapshot {
226258
* @returns {Promise<CreateReadPartitionsResponse>}
227259
*/
228260
createReadPartitions(options, callback) {
229-
const reqOpts = Object.assign({}, options, {
230-
keySet: Snapshot.encodeKeySet(options),
231-
});
261+
const traceConfig: traceConfig = {
262+
opts: this.observabilityOptions,
263+
};
232264

233-
delete reqOpts.gaxOptions;
234-
delete reqOpts.keys;
235-
delete reqOpts.ranges;
265+
return startTrace(
266+
'BatchTransaction.createReadPartitions',
267+
traceConfig,
268+
span => {
269+
const reqOpts = Object.assign({}, options, {
270+
keySet: Snapshot.encodeKeySet(options),
271+
});
236272

237-
const headers: {[k: string]: string} = {};
238-
if (this._getSpanner().routeToLeaderEnabled) {
239-
addLeaderAwareRoutingHeader(headers);
240-
}
273+
delete reqOpts.gaxOptions;
274+
delete reqOpts.keys;
275+
delete reqOpts.ranges;
241276

242-
this.createPartitions_(
243-
{
244-
client: 'SpannerClient',
245-
method: 'partitionRead',
246-
reqOpts,
247-
gaxOpts: options.gaxOptions,
248-
headers: headers,
249-
},
250-
callback
277+
const headers: {[k: string]: string} = {};
278+
if (this._getSpanner().routeToLeaderEnabled) {
279+
addLeaderAwareRoutingHeader(headers);
280+
}
281+
282+
this.createPartitions_(
283+
{
284+
client: 'SpannerClient',
285+
method: 'partitionRead',
286+
reqOpts,
287+
gaxOpts: options.gaxOptions,
288+
headers: headers,
289+
},
290+
(err, partitions, resp) => {
291+
if (err) {
292+
setSpanError(span, err);
293+
}
294+
295+
span.end();
296+
callback(err, partitions, resp);
297+
}
298+
);
299+
}
251300
);
252301
}
253302
/**
@@ -322,6 +371,7 @@ class BatchTransaction extends Snapshot {
322371
* ```
323372
*/
324373
executeStream(partition) {
374+
// TODO: Instrument the streams with Otel.
325375
if (is.string(partition.table)) {
326376
return this.createReadStream(partition.table, partition);
327377
}

src/instrument.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ interface SQLStatement {
4747

4848
interface observabilityOptions {
4949
tracerProvider: TracerProvider;
50-
enableExtendedTracing: boolean;
50+
enableExtendedTracing?: boolean;
5151
}
5252

5353
export type {observabilityOptions as ObservabilityOptions};
@@ -81,7 +81,7 @@ interface traceConfig {
8181
}
8282

8383
const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix.
84-
export {SPAN_NAMESPACE_PREFIX};
84+
export {SPAN_NAMESPACE_PREFIX, traceConfig};
8585

8686
/**
8787
* startTrace begins an active span in the current active context

src/table.ts

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import {
3131
import {google as databaseAdmin} from '../protos/protos';
3232
import {Schema, LongRunningCallback} from './common';
3333
import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions;
34+
import {
35+
ObservabilityOptions,
36+
startTrace,
37+
setSpanError,
38+
traceConfig,
39+
} from './instrument';
3440

3541
export type Key = string | string[];
3642

@@ -93,6 +99,7 @@ const POSTGRESQL = 'POSTGRESQL';
9399
class Table {
94100
database: Database;
95101
name: string;
102+
observabilityOptions?: ObservabilityOptions;
96103
constructor(database: Database, name: string) {
97104
/**
98105
* The {@link Database} instance of this {@link Table} instance.
@@ -1072,29 +1079,43 @@ class Table {
10721079
options: MutateRowsOptions | CallOptions = {},
10731080
callback: CommitCallback
10741081
): void {
1075-
const requestOptions =
1076-
'requestOptions' in options ? options.requestOptions : {};
1082+
const traceConfig: traceConfig = {
1083+
opts: this.observabilityOptions,
1084+
};
10771085

1078-
const excludeTxnFromChangeStreams =
1079-
'excludeTxnFromChangeStreams' in options
1080-
? options.excludeTxnFromChangeStreams
1081-
: false;
1086+
startTrace('Table.' + method, traceConfig, span => {
1087+
const requestOptions =
1088+
'requestOptions' in options ? options.requestOptions : {};
10821089

1083-
this.database.runTransaction(
1084-
{
1085-
requestOptions: requestOptions,
1086-
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
1087-
},
1088-
(err, transaction) => {
1089-
if (err) {
1090-
callback(err);
1091-
return;
1092-
}
1090+
const excludeTxnFromChangeStreams =
1091+
'excludeTxnFromChangeStreams' in options
1092+
? options.excludeTxnFromChangeStreams
1093+
: false;
10931094

1094-
transaction![method](this.name, rows as Key[]);
1095-
transaction!.commit(options, callback);
1096-
}
1097-
);
1095+
this.database.runTransaction(
1096+
{
1097+
requestOptions: requestOptions,
1098+
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
1099+
},
1100+
(err, transaction) => {
1101+
if (err) {
1102+
setSpanError(span, err);
1103+
span.end();
1104+
callback(err);
1105+
return;
1106+
}
1107+
1108+
transaction![method](this.name, rows as Key[]);
1109+
transaction!.commit(options, (err, resp) => {
1110+
if (err) {
1111+
setSpanError(span, err);
1112+
}
1113+
span.end();
1114+
callback(err, resp);
1115+
});
1116+
}
1117+
);
1118+
});
10981119
}
10991120
}
11001121

src/transaction.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
4646
import IRequestOptions = google.spanner.v1.IRequestOptions;
4747
import {Database, Spanner} from '.';
4848
import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
49+
import {ObservabilityOptions} from './instrument';
4950

5051
export type Rows = Array<Row | Json>;
5152
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
@@ -285,6 +286,7 @@ export class Snapshot extends EventEmitter {
285286
queryOptions?: IQueryOptions;
286287
resourceHeader_: {[k: string]: string};
287288
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
289+
observabilityOptions?: ObservabilityOptions;
288290

289291
/**
290292
* The transaction ID.

0 commit comments

Comments
 (0)