Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.10",
"version": "1.8.11",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
61 changes: 33 additions & 28 deletions packages/grpc-js/src/retrying-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ export class RetryingCall implements Call {
private initialMetadata: Metadata | null = null;
private underlyingCalls: UnderlyingCall[] = [];
private writeBuffer: WriteBufferEntry[] = [];
/**
* The offset of message indices in the writeBuffer. For example, if
* writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
* is in writeBuffer[5].
*/
private writeBufferOffset = 0;
/**
* Tracks whether a read has been started, so that we know whether to start
* reads on new child calls. This only matters for the first read, because
Expand Down Expand Up @@ -203,14 +209,8 @@ export class RetryingCall implements Call {
private reportStatus(statusObject: StatusObject) {
this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
this.bufferTracker.freeAll(this.callNumber);
for (let i = 0; i < this.writeBuffer.length; i++) {
if (this.writeBuffer[i].entryType === 'MESSAGE') {
this.writeBuffer[i] = {
entryType: 'FREED',
allocated: false
};
}
}
this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
this.writeBuffer = [];
process.nextTick(() => {
// Explicitly construct status object to remove progress field
this.listener?.onReceiveStatus({
Expand All @@ -236,20 +236,27 @@ export class RetryingCall implements Call {
}
}

private maybefreeMessageBufferEntry(messageIndex: number) {
private getBufferEntry(messageIndex: number): WriteBufferEntry {
return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false};
}

private getNextBufferIndex() {
return this.writeBufferOffset + this.writeBuffer.length;
}

private clearSentMessages() {
if (this.state !== 'COMMITTED') {
return;
}
const bufferEntry = this.writeBuffer[messageIndex];
if (bufferEntry.entryType === 'MESSAGE') {
const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
const bufferEntry = this.getBufferEntry(messageIndex);
if (bufferEntry.allocated) {
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
}
this.writeBuffer[messageIndex] = {
entryType: 'FREED',
allocated: false
};
}
this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
this.writeBufferOffset = earliestNeededMessageIndex;
}

private commitCall(index: number) {
Expand All @@ -272,9 +279,7 @@ export class RetryingCall implements Call {
this.underlyingCalls[i].state = 'COMPLETED';
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
}
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
this.maybefreeMessageBufferEntry(messageIndex);
}
this.clearSentMessages();
}

private commitCallWithMostMessages() {
Expand Down Expand Up @@ -555,8 +560,8 @@ export class RetryingCall implements Call {
private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend;
this.writeBuffer[messageIndex].callback?.();
this.maybefreeMessageBufferEntry(messageIndex);
this.getBufferEntry(messageIndex).callback?.();
this.clearSentMessages();
childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex);
}
Expand All @@ -566,10 +571,10 @@ export class RetryingCall implements Call {
if (childCall.state === 'COMPLETED') {
return;
}
if (this.writeBuffer[childCall.nextMessageToSend]) {
const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
if (this.getBufferEntry(childCall.nextMessageToSend)) {
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
switch (bufferEntry.entryType) {
case 'MESSAGE':
case 'MESSAGE':
childCall.call.sendMessageWithContext({
callback: (error) => {
// Ignore error
Expand All @@ -594,13 +599,13 @@ export class RetryingCall implements Call {
message,
flags: context.flags,
};
const messageIndex = this.writeBuffer.length;
const messageIndex = this.getNextBufferIndex();
const bufferEntry: WriteBufferEntry = {
entryType: 'MESSAGE',
message: writeObj,
allocated: this.bufferTracker.allocate(message.length, this.callNumber)
};
this.writeBuffer[messageIndex] = bufferEntry;
this.writeBuffer.push(bufferEntry);
if (bufferEntry.allocated) {
context.callback?.();
for (const [callIndex, call] of this.underlyingCalls.entries()) {
Expand Down Expand Up @@ -642,11 +647,11 @@ export class RetryingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
const halfCloseIndex = this.writeBuffer.length;
this.writeBuffer[halfCloseIndex] = {
const halfCloseIndex = this.getNextBufferIndex();
this.writeBuffer.push({
entryType: 'HALF_CLOSE',
allocated: false
};
});
for (const call of this.underlyingCalls) {
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
call.nextMessageToSend += 1;
Expand Down