Skip to content

Commit 9a8a674

Browse files
akbashevktoso
andauthored
Fixed WorkerPool to handle remote actors properly (#1164)
Co-authored-by: Konrad `ktoso` Malawski <konrad_malawski@apple.com>
1 parent 33a2ef3 commit 9a8a674

File tree

8 files changed

+457
-120
lines changed

8 files changed

+457
-120
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ var dependencies: [Package.Dependency] = [
193193

194194
// ~~~ Swift libraries ~~~
195195
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0-beta"),
196-
.package(url: "https://github.com/apple/swift-collections", from: "1.0.5"),
196+
.package(url: "https://github.com/apple/swift-collections", from: "1.1.0"),
197197

198198
// ~~~ Observability ~~~
199199
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),

Sources/DistributedCluster/ClusterSystem.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1070,7 +1070,7 @@ extension ClusterSystem {
10701070
}
10711071

10721072
// Spawn a behavior actor for it:
1073-
let behavior = InvocationBehavior.behavior(instance: Weak(actor))
1073+
let behavior = InvocationBehavior.behavior(instance: WeakLocalRef(actor))
10741074
let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id)
10751075

10761076
// Store references
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import DistributedActorsConcurrencyHelpers
16+
import NIOConcurrencyHelpers
17+
18+
/// A checked continuation that offers easier APIs for working with cancellation,
19+
/// as well as has its unique identity.
20+
internal final class ClusterCancellableCheckedContinuation<Success>: Hashable, @unchecked Sendable where Success: Sendable {
21+
private struct _State: Sendable {
22+
var cancelled: Bool = false
23+
var onCancel: (@Sendable (ClusterCancellableCheckedContinuation<Success>) -> Void)?
24+
var continuation: CheckedContinuation<Success, any Error>?
25+
}
26+
27+
private let state: NIOLockedValueBox<_State> = .init(_State())
28+
29+
fileprivate init() {}
30+
31+
func setContinuation(_ continuation: CheckedContinuation<Success, any Error>) -> Bool {
32+
var alreadyCancelled = false
33+
self.state.withLockedValue { state in
34+
if state.cancelled {
35+
alreadyCancelled = true
36+
} else {
37+
state.continuation = continuation
38+
}
39+
}
40+
if alreadyCancelled {
41+
continuation.resume(throwing: CancellationError())
42+
}
43+
return !alreadyCancelled
44+
}
45+
46+
/// Register a cancellation handler, or call it immediately if the continuation was already cancelled.
47+
@Sendable
48+
func onCancel(handler: @Sendable @escaping (ClusterCancellableCheckedContinuation<Success>) -> Void) {
49+
var alreadyCancelled: Bool = self.state.withLockedValue { state in
50+
if state.cancelled {
51+
return true
52+
}
53+
54+
state.onCancel = handler
55+
return false
56+
}
57+
if alreadyCancelled {
58+
handler(self)
59+
}
60+
}
61+
62+
private func withContinuation(cancelled: Bool = false, _ operation: (CheckedContinuation<Success, any Error>) -> Void) {
63+
var safeContinuation: CheckedContinuation<Success, any Error>?
64+
var safeOnCancel: (@Sendable (ClusterCancellableCheckedContinuation<Success>) -> Void)?
65+
self.state.withLockedValue { (state: inout _State) -> Void in
66+
state.cancelled = state.cancelled || cancelled
67+
safeContinuation = state.continuation
68+
safeOnCancel = state.onCancel
69+
state.continuation = nil
70+
state.onCancel = nil
71+
}
72+
if let safeContinuation {
73+
operation(safeContinuation)
74+
}
75+
if cancelled {
76+
safeOnCancel?(self)
77+
}
78+
}
79+
80+
func resume(returning value: Success) {
81+
self.withContinuation {
82+
$0.resume(returning: value)
83+
}
84+
}
85+
86+
func resume(throwing error: any Error) {
87+
self.withContinuation {
88+
$0.resume(throwing: error)
89+
}
90+
}
91+
92+
var isCancelled: Bool {
93+
self.state.withLockedValue { $0.cancelled }
94+
}
95+
96+
func cancel() {
97+
self.withContinuation(cancelled: true) {
98+
$0.resume(throwing: CancellationError())
99+
}
100+
}
101+
}
102+
103+
extension ClusterCancellableCheckedContinuation where Success == Void {
104+
func resume() {
105+
self.resume(returning: ())
106+
}
107+
}
108+
109+
extension ClusterCancellableCheckedContinuation {
110+
static func == (lhs: ClusterCancellableCheckedContinuation, rhs: ClusterCancellableCheckedContinuation) -> Bool {
111+
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
112+
}
113+
114+
func hash(into hasher: inout Hasher) {
115+
hasher.combine(ObjectIdentifier(self))
116+
}
117+
}
118+
119+
func _withClusterCancellableCheckedContinuation<Success>(
120+
of successType: Success.Type = Success.self,
121+
_ body: @escaping (ClusterCancellableCheckedContinuation<Success>) -> Void,
122+
function: String = #function
123+
) async throws -> Success where Success: Sendable {
124+
let cccc = ClusterCancellableCheckedContinuation<Success>()
125+
return try await withTaskCancellationHandler {
126+
try await withCheckedThrowingContinuation(function: function) { continuation in
127+
if cccc.setContinuation(continuation) {
128+
body(cccc)
129+
}
130+
}
131+
} onCancel: {
132+
cccc.cancel()
133+
}
134+
}

Sources/DistributedCluster/InvocationBehavior.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible {
3636

3737
// FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957)
3838
enum InvocationBehavior {
39-
static func behavior(instance weakInstance: Weak<some DistributedActor>) -> _Behavior<InvocationMessage> {
39+
static func behavior(instance weakInstance: WeakLocalRef<some DistributedActor>) -> _Behavior<InvocationMessage> {
4040
_Behavior.setup { context in
4141
._receiveMessageAsync { (message) async throws -> _Behavior<InvocationMessage> in
4242
guard let _ = weakInstance.actor else {

0 commit comments

Comments
 (0)