Skip to content

Commit 7d64e35

Browse files
authored
fix: unblock local puts without downstream ack (#2089)
1 parent 2819cd8 commit 7d64e35

File tree

1 file changed

+38
-9
lines changed
  • crates/core/src/operations

1 file changed

+38
-9
lines changed

crates/core/src/operations/put.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -295,15 +295,44 @@ impl Operation for PutOp {
295295
htl: *htl,
296296
});
297297

298-
// Transition to AwaitingResponse state to handle future SuccessfulPut messages
299-
new_state = Some(PutState::AwaitingResponse {
300-
key,
301-
upstream: Some(prev_sender.clone()),
302-
contract: contract.clone(),
303-
state: modified_value,
304-
subscribe,
305-
origin: origin.clone(),
306-
});
298+
// When we're the origin node we already seeded the contract locally.
299+
// Treat downstream SuccessfulPut messages as best-effort so River is unblocked.
300+
if origin.peer == own_location.peer {
301+
tracing::debug!(
302+
tx = %id,
303+
%key,
304+
"Origin node finishing PUT without waiting for SuccessfulPut ack"
305+
);
306+
307+
if subscribe {
308+
if !op_manager.failed_parents().contains(id) {
309+
let child_tx =
310+
super::start_subscription_request(op_manager, *id, key);
311+
tracing::debug!(
312+
tx = %id,
313+
%child_tx,
314+
"started subscription as child operation"
315+
);
316+
} else {
317+
tracing::warn!(
318+
tx = %id,
319+
"not starting subscription for failed parent operation"
320+
);
321+
}
322+
}
323+
324+
new_state = Some(PutState::Finished { key });
325+
} else {
326+
// Transition to AwaitingResponse state to handle future SuccessfulPut messages
327+
new_state = Some(PutState::AwaitingResponse {
328+
key,
329+
upstream: Some(prev_sender.clone()),
330+
contract: contract.clone(),
331+
state: modified_value,
332+
subscribe,
333+
origin: origin.clone(),
334+
});
335+
}
307336
} else {
308337
// No other peers to forward to - we're the final destination
309338
tracing::warn!(

0 commit comments

Comments
 (0)