Skip to content

Commit 193b00f

Browse files
authored
Merge pull request #5166 from cloudflare/felix/092325-exception-handling
EW-9453 Implement proper STW exception propagation
2 parents edb686a + 687a17d commit 193b00f

File tree

1 file changed

+30
-26
lines changed

1 file changed

+30
-26
lines changed

src/workerd/io/trace-stream.c++

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@
1212
namespace workerd::tracing {
1313
namespace {
1414

15-
// Uniquely identifies js tail session failures
16-
constexpr kj::Exception::DetailTypeId TAIL_STREAM_JS_FAILURE = 0xcde53d65a46183f7;
17-
1815
#define STRS(V) \
1916
V(ALARM, "alarm") \
2017
V(ATTRIBUTES, "attributes") \
@@ -606,10 +603,9 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
606603

607604
auto ownReportContext = capnp::CallContextHook::from(reportContext).addRef();
608605

609-
auto promise =
610-
ioContext
611-
.run([this, &ioContext, reportContext, ownReportContext = kj::mv(ownReportContext)](
612-
Worker::Lock& lock) mutable -> kj::Promise<void> {
606+
auto promise = ioContext.run(
607+
[this, &ioContext, reportContext, ownReportContext = kj::mv(ownReportContext)](
608+
Worker::Lock& lock) mutable -> kj::Promise<void> {
613609
auto params = reportContext.getParams();
614610
KJ_ASSERT(params.hasEvents(), "Events are required.");
615611
auto eventReaders = params.getEvents();
@@ -638,19 +634,26 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
638634
} else {
639635
return kj::mv(result);
640636
}
641-
}).catch_([](kj::Exception&& e) {
637+
});
638+
639+
auto paf = kj::newPromiseAndFulfiller<void>();
640+
promise = promise.then([&fulfiller = *paf.fulfiller]() { fulfiller.fulfill(); },
641+
[&, &fulfiller = *paf.fulfiller](kj::Exception&& e) {
642+
// This is the top level exception catcher for tail events being delivered. We do not want to
643+
// propagate JS exceptions to the client side here, all exceptions should stay within this
644+
// customEvent. Instead, we propagate the exception to the doneFulfiller, where it is used to
645+
// set the right outcome code and re-thrown if appropriate. By rejecting the doneFulfiller, we
646+
// also ensure that no more tail events get delivered.
642647
if (jsg::isTunneledException(e.getDescription())) {
643648
auto description = jsg::stripRemoteExceptionPrefix(e.getDescription());
644649
if (!description.startsWith("remote.")) {
645650
e.setDescription(kj::str("remote.", description));
646651
}
647652
}
648-
kj::throwFatalException(kj::mv(e));
653+
// We still fulfill this fulfiller to disarm the cancellation check below
654+
fulfiller.fulfill();
655+
doneFulfiller->reject(kj::mv(e));
649656
});
650-
651-
auto paf = kj::newPromiseAndFulfiller<void>();
652-
promise = promise.then([&fulfiller = *paf.fulfiller]() { fulfiller.fulfill(); },
653-
[&fulfiller = *paf.fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); });
654657
promise = promise.attach(kj::defer([fulfiller = kj::mv(paf.fulfiller)]() mutable {
655658
if (fulfiller->isWaiting()) {
656659
fulfiller->reject(JSG_KJ_EXCEPTION(FAILED, Error,
@@ -764,10 +767,11 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
764767
results->get().setStop(true);
765768
doneFulfiller->fulfill();
766769
}),
767-
ioContext.addFunctor([this, results = sharedResults.addRef()](
768-
jsg::Lock& js, jsg::Value&& error) mutable {
770+
ioContext.addFunctor(
771+
[results = sharedResults.addRef()](jsg::Lock& js, jsg::Value&& error) mutable {
772+
// Received a JS error. Do not reject doneFulfiller yet, this will be handled when we catch
773+
// the exception later.
769774
results->get().setStop(true);
770-
doneFulfiller->fulfill();
771775
js.throwException(kj::mv(error));
772776
})));
773777
} catch (...) {
@@ -860,15 +864,11 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
860864
// the doneFulfiller afterwards, indicating that TailStreamTarget has received all events over
861865
// the stream and has done all its work, that the stream self-evidently did not get canceled
862866
// prematurely. This applies even if promises were rejected.
867+
// No need to catch exceptions here: They will be handled in report() alongside exceptions
868+
// from the onset event etc. JSG knows how JS exceptions look like, so we don't need an
869+
// identifier for them.
863870
if (doFulfill) {
864-
p = p.then(js, [&](jsg::Lock& js) { doneFulfiller->fulfill(); },
865-
[&](jsg::Lock& js, jsg::Value&& value) {
866-
// Convert the JS exception to a KJ exception, preserving all details
867-
kj::Exception exception = js.exceptionToKj(kj::mv(value));
868-
// Mark this as a tail stream failure for proper classification
869-
exception.setDetail(TAIL_STREAM_JS_FAILURE, kj::heapArray<kj::byte>(0));
870-
doneFulfiller->reject(kj::mv(exception));
871-
});
871+
p = p.then(js, [&](jsg::Lock& js) { doneFulfiller->fulfill(); });
872872
}
873873
return ioContext.awaitJs(js, kj::mv(p));
874874
}
@@ -916,8 +916,12 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEventImpl::run
916916

917917
auto eventOutcome = co_await donePromise.exclusiveJoin(ioContext.onAbort()).then([&]() {
918918
return ioContext.waitUntilStatus();
919-
}, [](kj::Exception&& e) {
920-
if (e.getDetail(TAIL_STREAM_JS_FAILURE) != kj::none) {
919+
}, [&incomingRequest](kj::Exception&& e) {
920+
// If we have a JSG exception, just set the appropriate return code – this will already have
921+
// been logged and we do not need to treat it like a KJ exception. Otherwise, re-throw the
922+
// exception.
923+
if (jsg::isTunneledException(e.getDescription())) {
924+
incomingRequest->getMetrics().reportFailure(e);
921925
return EventOutcome::EXCEPTION;
922926
}
923927
kj::throwRecoverableException(kj::mv(e));

0 commit comments

Comments
 (0)