Skip to content

Commit abd10cc

Browse files
authored
fix: update grpc.max_metadata_size to 4MiB for exactly-once, and shift ack/modack errors to 'debug' stream channel (#1505)
* fix: update grpc.max_metadata_size to 4MiB for exactly-once * fix: change ack and modAck errors to be optional debug warnings * tests: update testing to match earlier changes * fix: check against undefined, not falsey
1 parent 18b7e5d commit abd10cc

File tree

7 files changed

+65
-8
lines changed

7 files changed

+65
-8
lines changed

src/message-queues.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ export abstract class MessageQueue {
141141
try {
142142
await this._sendBatch(batch);
143143
} catch (e) {
144-
this._subscriber.emit('error', e);
144+
// These queues are used for ack and modAck messages, which should
145+
// never surface an error to the user level. However, we'll emit
146+
// them onto this debug channel in case debug info is needed.
147+
this._subscriber.emit('debug', e);
145148
}
146149

147150
this.numInFlightRequests -= batchSize;

src/pubsub.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,17 @@ export class PubSub {
272272
private schemaClient?: SchemaServiceClient;
273273

274274
constructor(options?: ClientConfig) {
275-
options = options || {};
275+
options = Object.assign({}, options || {});
276+
277+
// Needed for potentially large responses that may come from using exactly-once delivery.
278+
// This will get passed down to grpc client objects.
279+
const maxMetadataSize = 'grpc.max_metadata_size';
280+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
281+
const optionsAny = options as any;
282+
if (optionsAny[maxMetadataSize] === undefined) {
283+
optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB
284+
}
285+
276286
// Determine what scopes are needed.
277287
// It is the union of the scopes on both clients.
278288
const clientClasses = [v1.SubscriberClient, v1.PublisherClient];

src/subscriber.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ export class Subscriber extends EventEmitter {
393393

394394
this._stream
395395
.on('error', err => this.emit('error', err))
396+
.on('debug', err => this.emit('debug', err))
396397
.on('data', (data: PullResponse) => this._onData(data))
397398
.once('close', () => this.close());
398399

src/subscription.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ export type DetachSubscriptionResponse = EmptyResponse;
104104
listener: (error: StatusError) => void
105105
): this;
106106
on(event: 'close', listener: () => void): this;
107+
on(event: 'debug', listener: (error: StatusError) => void); this;
107108
108109
// Only used internally.
109110
on(event: 'newListener', listener: Function): this;
@@ -154,6 +155,9 @@ export type DetachSubscriptionResponse = EmptyResponse;
154155
* Upon receipt of an error:
155156
* on(event: 'error', listener: (error: Error) => void): this;
156157
*
158+
* Upon receipt of a (non-fatal) debug warning:
159+
* on(event: 'debug', listener: (error: Error) => void): this;
160+
*
157161
* Upon the closing of the subscriber:
158162
* on(event: 'close', listener: Function): this;
159163
*
@@ -220,6 +224,9 @@ export type DetachSubscriptionResponse = EmptyResponse;
220224
* // Register an error handler.
221225
* subscription.on('error', (err) => {});
222226
*
227+
* // Register a debug handler, to catch non-fatal errors.
228+
* subscription.on('debug', (err) => { console.error(err); });
229+
*
223230
* // Register a close handler in case the subscriber closes unexpectedly
224231
* subscription.on('close', () => {});
225232
*
@@ -318,6 +325,7 @@ export class Subscription extends EventEmitter {
318325
this._subscriber = new Subscriber(this, options);
319326
this._subscriber
320327
.on('error', err => this.emit('error', err))
328+
.on('debug', err => this.emit('debug', err))
321329
.on('message', message => this.emit('message', message))
322330
.on('close', () => this.emit('close'));
323331

test/message-queues.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,12 @@ describe('MessageQueues', () => {
195195
assert.deepStrictEqual(batch, expectedBatch);
196196
});
197197

198-
it('should emit any errors', done => {
198+
it('should emit any errors as debug events', done => {
199199
const fakeError = new Error('err');
200200

201201
sandbox.stub(messageQueue.batches, 'push').throws(fakeError);
202202

203-
subscriber.on('error', err => {
203+
subscriber.on('debug', err => {
204204
assert.strictEqual(err, fakeError);
205205
done();
206206
});
@@ -362,7 +362,7 @@ describe('MessageQueues', () => {
362362
assert.strictEqual(callOptions, fakeCallOptions);
363363
});
364364

365-
it('should throw a BatchError if unable to ack', done => {
365+
it('should throw a BatchError on "debug" if unable to ack', done => {
366366
const messages = [
367367
new FakeMessage(),
368368
new FakeMessage(),
@@ -380,7 +380,7 @@ describe('MessageQueues', () => {
380380

381381
sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError);
382382

383-
subscriber.on('error', (err: BatchError) => {
383+
subscriber.on('debug', (err: BatchError) => {
384384
assert.strictEqual(err.message, expectedMessage);
385385
assert.deepStrictEqual(err.ackIds, ackIds);
386386
assert.strictEqual(err.code, fakeError.code);
@@ -487,7 +487,7 @@ describe('MessageQueues', () => {
487487
assert.strictEqual(callOptions, fakeCallOptions);
488488
});
489489

490-
it('should throw a BatchError if unable to modAck', done => {
490+
it('should throw a BatchError on "debug" if unable to modAck', done => {
491491
const messages = [
492492
new FakeMessage(),
493493
new FakeMessage(),
@@ -505,7 +505,7 @@ describe('MessageQueues', () => {
505505

506506
sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError);
507507

508-
subscriber.on('error', (err: BatchError) => {
508+
subscriber.on('debug', (err: BatchError) => {
509509
assert.strictEqual(err.message, expectedMessage);
510510
assert.deepStrictEqual(err.ackIds, ackIds);
511511
assert.strictEqual(err.code, fakeError.code);

test/pubsub.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,12 @@ describe('PubSub', () => {
196196
});
197197

198198
describe('instantiation', () => {
199+
const maxMetadataSizeKey = 'grpc.max_metadata_size';
199200
const DEFAULT_OPTIONS = {
200201
libName: 'gccl',
201202
libVersion: PKG.version,
202203
scopes: [],
204+
[maxMetadataSizeKey]: 4 * 1024 * 1024,
203205
};
204206

205207
it('should extend the correct methods', () => {
@@ -220,6 +222,20 @@ describe('PubSub', () => {
220222
assert(new PubSub() instanceof PubSub);
221223
});
222224

225+
it('should augment the gRPC options for metadata size', () => {
226+
let pubsub = new PubSub();
227+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
228+
let optionsAny: any = pubsub.options;
229+
assert.strictEqual(optionsAny[maxMetadataSizeKey], 4 * 1024 * 1024);
230+
231+
optionsAny = {
232+
[maxMetadataSizeKey]: 1 * 1024 * 1024,
233+
};
234+
pubsub = new PubSub(optionsAny);
235+
optionsAny = pubsub.options;
236+
assert.strictEqual(optionsAny[maxMetadataSizeKey], 1 * 1024 * 1024);
237+
});
238+
223239
it('should combine all required scopes', () => {
224240
v1ClientOverrides.SubscriberClient = {};
225241
v1ClientOverrides.SubscriberClient.scopes = ['a', 'b', 'c'];

test/subscription.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,25 @@ describe('Subscription', () => {
509509
});
510510
});
511511

512+
describe('debug', () => {
513+
const error = new Error('err') as ServiceError;
514+
515+
beforeEach(() => {
516+
subscription.request = (config, callback) => {
517+
callback(error);
518+
};
519+
});
520+
521+
it('should return the debug events to the callback', done => {
522+
subscription.on('debug', err => {
523+
assert.strictEqual(err, error);
524+
done();
525+
});
526+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
527+
(subscription as any)._subscriber.emit('debug', error);
528+
});
529+
});
530+
512531
describe('delete', () => {
513532
beforeEach(() => {
514533
sandbox.stub(subscription, 'removeAllListeners').yields(util.noop);

0 commit comments

Comments
 (0)