Skip to content

Commit 6377c1d

Browse files
authored
fix: Reinitialize streams in an unbounded executor (#1027)
Stream initializations can sometimes be stalled, which starves tasks using the alarm executor. This performs reinitialization in an unbounded (futures) executor.
1 parent 3a50664 commit 6377c1d

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -199,17 +199,22 @@ public final void onError(Throwable t) {
199199
backoffTime, streamDescription());
200200
ScheduledFuture<?> retry =
201201
SystemExecutors.getAlarmExecutor()
202-
.schedule(
203-
() -> {
204-
try {
205-
observer.triggerReinitialize(statusOr.get());
206-
} catch (Throwable t2) {
207-
logger.atWarning().withCause(t2).log("Error occurred in triggerReinitialize.");
208-
onError(t2);
209-
}
210-
},
211-
backoffTime,
212-
MILLISECONDS);
202+
.schedule(() -> triggerReinitialize(statusOr.get()), backoffTime, MILLISECONDS);
203+
}
204+
205+
private void triggerReinitialize(CheckedApiException streamError) {
206+
// Reinitialize in an unbounded executor to avoid starving tasks using the bounded alarm
207+
// executor.
208+
SystemExecutors.getFuturesExecutor()
209+
.execute(
210+
() -> {
211+
try {
212+
observer.triggerReinitialize(streamError);
213+
} catch (Throwable t) {
214+
logger.atWarning().withCause(t).log("Error occurred in triggerReinitialize.");
215+
onError(t);
216+
}
217+
});
213218
}
214219

215220
@Override

0 commit comments

Comments
 (0)