Skip to content

Commit 6c9aa15

Browse files
authored
Finishing clusterd (#1165)
1 parent e88823b commit 6c9aa15

File tree

8 files changed

+302
-42
lines changed

8 files changed

+302
-42
lines changed

Package.swift

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,14 @@ var targets: [PackageDescription.Target] = [
121121
.product(name: "ArgumentParser", package: "swift-argument-parser"),
122122
]
123123
),
124-
124+
.executableTarget(
125+
name: "swift-clusterd",
126+
dependencies: [
127+
"DistributedCluster",
128+
.product(name: "ArgumentParser", package: "swift-argument-parser"),
129+
],
130+
path: "Sources/Clusterd"
131+
),
125132
// ==== ------------------------------------------------------------------------------------------------------------
126133
// MARK: Multi Node Tests
127134

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2020-2024 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import ArgumentParser
16+
import DistributedCluster
17+
import Logging
18+
19+
@main
20+
struct ClusterDBoot: AsyncParsableCommand {
21+
@Option(name: .shortAndLong, help: "The port to bind the cluster daemon on.")
22+
var port: Int = ClusterDaemon.defaultEndpoint.port
23+
24+
@Option(help: "The host address to bind the cluster daemon on.")
25+
var host: String = ClusterDaemon.defaultEndpoint.host
26+
27+
func run() async throws {
28+
let daemon = await ClusterSystem.startClusterDaemon(configuredWith: self.configure)
29+
30+
#if DEBUG
31+
daemon.system.log.warning("RUNNING ClusterD DEBUG binary, operation is likely to be negatively affected. Please build/run the ClusterD process using '-c release' configuration!")
32+
#endif
33+
34+
try daemon.system.park()
35+
}
36+
37+
func configure(_ settings: inout ClusterSystemSettings) {
38+
// other nodes will be discovering us, not the opposite
39+
settings.discovery = .init(static: [])
40+
41+
settings.endpoint = Cluster.Endpoint(
42+
systemName: "clusterd",
43+
host: self.host,
44+
port: self.port
45+
)
46+
}
47+
}

Sources/DistributedCluster/Cluster/DiscoveryShell.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ final class DiscoveryShell {
3434

3535
var behavior: _Behavior<Message> {
3636
.setup { context in
37+
// FIXME: should have a behavior to bridge the async world...
38+
context.log.info("Initializing discovery: \(self.settings.implementation)")
39+
// Try to initialize clusterd if needed
40+
self.settings.initializeClusterd(context.system)
41+
context.log.info("Initializing discovery, done.")
42+
3743
self.subscription = self.settings.subscribe(
3844
onNext: { result in
3945
switch result {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Distributed Actors open source project
4+
//
5+
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of Swift Distributed Actors project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Atomics
16+
import Backtrace
17+
import CDistributedActorsMailbox
18+
import Dispatch
19+
import Distributed
20+
import DistributedActorsConcurrencyHelpers
21+
import Foundation // for UUID
22+
import Logging
23+
import NIO
24+
25+
extension ClusterSystem {
26+
public static func startClusterDaemon(configuredWith configureSettings: (inout ClusterSystemSettings) -> Void = { _ in () }) async -> ClusterDaemon {
27+
let system = await ClusterSystem("clusterd") { settings in
28+
settings.endpoint = ClusterDaemon.defaultEndpoint
29+
configureSettings(&settings)
30+
}
31+
32+
return ClusterDaemon(system: system)
33+
}
34+
}
35+
36+
public struct ClusterDaemon {
37+
public let system: ClusterSystem
38+
public var settings: ClusterSystemSettings {
39+
self.system.settings
40+
}
41+
42+
public init(system: ClusterSystem) {
43+
self.system = system
44+
}
45+
}
46+
47+
extension ClusterDaemon {
48+
/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown()``.
49+
public var terminated: Void {
50+
get async throws {
51+
try await self.system.terminated
52+
}
53+
}
54+
55+
/// Returns `true` if the system was already successfully terminated (i.e. awaiting ``terminated`` would resume immediately).
56+
public var isTerminated: Bool {
57+
self.system.isTerminated
58+
}
59+
60+
/// Forcefully stops this actor system and all actors that live within it.
61+
/// This is an asynchronous operation and will be executed on a separate thread.
62+
///
63+
/// You can use `shutdown().wait()` to synchronously await on the system's termination,
64+
/// or provide a callback to be executed after the system has completed its shutdown.
65+
///
66+
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
67+
@discardableResult
68+
public func shutdown() throws -> ClusterSystem.Shutdown {
69+
try self.system.shutdown()
70+
}
71+
}
72+
73+
extension ClusterDaemon {
74+
/// The default endpoint (host and port) on which the clusterd daemon is expected to listen.
75+
public static let defaultEndpoint = Cluster.Endpoint(host: "127.0.0.1", port: 3137)
76+
}
77+
78+
internal distributed actor ClusterDaemonServant {
79+
typealias ActorSystem = ClusterSystem
80+
81+
@ActorID.Metadata(\.wellKnown)
82+
public var wellKnownName: String
83+
84+
init(system: ClusterSystem) async {
85+
self.actorSystem = system
86+
self.wellKnownName = "$cluster-daemon-servant"
87+
}
88+
}

Sources/DistributedCluster/ClusterSystemSettings.swift

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -405,18 +405,19 @@ protocol ClusterSystemInstrumentationProvider {
405405
/// all the nodes of an existing cluster.
406406
public struct ServiceDiscoverySettings {
407407
let implementation: ServiceDiscoveryImplementation
408-
private let _subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?
409408

410409
public init<Discovery, S>(_ implementation: Discovery, service: S)
411410
where
412411
Discovery: ServiceDiscovery,
413412
Discovery.Instance == Cluster.Endpoint,
414413
S == Discovery.Service
415414
{
416-
self.implementation = .dynamic(AnyServiceDiscovery(implementation))
417-
self._subscribe = { onNext, onComplete in
418-
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
419-
}
415+
self.implementation = .dynamic(
416+
serviceDiscovery: AnyServiceDiscovery(implementation),
417+
subscribe: { onNext, onComplete in
418+
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
419+
}
420+
)
420421
}
421422

422423
public init<Discovery, S>(_ implementation: Discovery, service: S, mapInstanceToNode transformer: @escaping (Discovery.Instance) throws -> Cluster.Endpoint)
@@ -425,33 +426,78 @@ public struct ServiceDiscoverySettings {
425426
S == Discovery.Service
426427
{
427428
let mappedDiscovery: MapInstanceServiceDiscovery<Discovery, Cluster.Endpoint> = implementation.mapInstance(transformer)
428-
self.implementation = .dynamic(AnyServiceDiscovery(mappedDiscovery))
429-
self._subscribe = { onNext, onComplete in
430-
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
429+
self.implementation = .dynamic(
430+
serviceDiscovery: AnyServiceDiscovery(mappedDiscovery),
431+
subscribe: { onNext, onComplete in
432+
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
433+
}
434+
)
435+
}
436+
437+
init(clusterdEndpoint: Cluster.Endpoint) {
438+
self.implementation = .clusterDaemon(
439+
endpoint: clusterdEndpoint,
440+
initialize: { system in
441+
system.log.info("Joining [clusterd] at \(clusterdEndpoint)")
442+
system.cluster.join(endpoint: clusterdEndpoint)
443+
}
444+
)
445+
}
446+
447+
/// Locate the default `ClusterD` process and use it for discovering cluster nodes.
448+
public static var clusterd: Self {
449+
get {
450+
Self.clusterd(endpoint: nil)
431451
}
432452
}
433453

454+
public static func clusterd(endpoint: Cluster.Endpoint?) -> Self {
455+
ServiceDiscoverySettings(clusterdEndpoint: endpoint ?? ClusterDaemon.defaultEndpoint)
456+
}
457+
458+
public static func seed(nodes: Set<Cluster.Endpoint>) -> Self {
459+
.init(static: nodes)
460+
}
461+
434462
public init(static nodes: Set<Cluster.Endpoint>) {
435-
self.implementation = .static(nodes)
436-
self._subscribe = { onNext, _ in
437-
// Call onNext once and never again since the list of nodes doesn't change
438-
onNext(.success(Array(nodes)))
439-
// Ignore onComplete because static service discovery never terminates
440-
441-
// No cancellation token
442-
return nil
443-
}
463+
self.implementation = .static(
464+
endpoints: nodes,
465+
subscribe: { onNext, _ in
466+
// Call onNext once and never again since the list of nodes doesn't change
467+
onNext(.success(Array(nodes)))
468+
// Ignore onComplete because static service discovery never terminates
469+
470+
// No cancellation token
471+
return nil
472+
}
473+
)
444474
}
445475

446476
/// Similar to `ServiceDiscovery.subscribe` however it allows the handling of the listings to be generic and handled by the cluster system.
447477
/// This function is only intended for internal use by the `DiscoveryShell`.
448478
func subscribe(onNext nextResultHandler: @escaping (Result<[Cluster.Endpoint], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? {
449-
self._subscribe(nextResultHandler, completionHandler)
479+
switch self.implementation {
480+
case .static(_, let subscribe),
481+
.dynamic(_, let subscribe):
482+
subscribe(nextResultHandler, completionHandler)
483+
case .clusterDaemon:
484+
.none
485+
}
486+
}
487+
488+
func initializeClusterd(_ system: ClusterSystem) {
489+
switch self.implementation {
490+
case .clusterDaemon(_, let initialize):
491+
initialize(system)
492+
default:
493+
break
494+
}
450495
}
451496

452497
enum ServiceDiscoveryImplementation {
453-
case `static`(Set<Cluster.Endpoint>)
454-
case dynamic(AnyServiceDiscovery)
498+
case `static`(endpoints: Set<Cluster.Endpoint>, subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?)
499+
case dynamic(serviceDiscovery: AnyServiceDiscovery, subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?)
500+
case clusterDaemon(endpoint: Cluster.Endpoint, initialize: (ClusterSystem) -> Void)
455501
}
456502
}
457503

Sources/MultiNodeTestKit/MultiNodeTestKit.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public struct MultiNodeTest {
3636
public let crashRegex: String?
3737
public let runTest: (any MultiNodeTestControlProtocol) async throws -> Void
3838
public let configureActorSystem: (inout ClusterSystemSettings) -> Void
39+
public let startNode: (ClusterSystemSettings) async throws -> ClusterSystem
3940
public let configureMultiNodeTest: (inout MultiNodeTestSettings) -> Void
4041
public let makeControl: (String) -> any MultiNodeTestControlProtocol
4142

@@ -51,6 +52,7 @@ public struct MultiNodeTest {
5152
}
5253

5354
self.configureActorSystem = TestSuite.configureActorSystem
55+
self.startNode = TestSuite.startNode
5456
self.configureMultiNodeTest = TestSuite.configureMultiNodeTest
5557

5658
self.makeControl = { nodeName -> Control<TestSuite.Nodes> in
@@ -80,6 +82,7 @@ public protocol MultiNodeTestSuite {
8082
init()
8183
associatedtype Nodes: MultiNodeNodes
8284
static func configureActorSystem(settings: inout ClusterSystemSettings)
85+
static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem
8386
static func configureMultiNodeTest(settings: inout MultiNodeTestSettings)
8487
}
8588

@@ -88,8 +91,8 @@ extension MultiNodeTestSuite {
8891
"\(Self.self)".split(separator: ".").last.map(String.init) ?? ""
8992
}
9093

91-
public func configureActorSystem(settings: inout ClusterSystemSettings) {
92-
// do nothing by default
94+
public static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem {
95+
await ClusterSystem(settings: settings)
9396
}
9497

9598
var nodeNames: [String] {

Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Exec.swift

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,34 @@ extension MultiNodeTestKitRunnerBoot {
5050
)
5151
}
5252

53-
let actorSystem = await ClusterSystem(nodeName) { settings in
54-
settings.bindHost = myNode.host
55-
settings.bindPort = myNode.port
56-
57-
/// Configure a nicer logger, that pretty prints metadata and also includes source location of logs
58-
if multiNodeSettings.installPrettyLogger {
59-
settings.logging.baseLogger = Logger(
60-
label: nodeName,
61-
factory: { label in
62-
PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture)
63-
}
64-
)
65-
}
53+
var settings = ClusterSystemSettings(name: nodeName)
54+
settings.bindHost = myNode.host
55+
settings.bindPort = myNode.port
56+
57+
/// Configure a nicer logger, that pretty prints metadata and also includes source location of logs
58+
if multiNodeSettings.installPrettyLogger {
59+
settings.logging.baseLogger = Logger(
60+
label: nodeName,
61+
factory: { label in
62+
PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture)
63+
}
64+
)
65+
}
6666

67-
// we use the singleton to implement a simple Coordinator
68-
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
69-
// there's a few ways to solve this... but for now this is good enough.
70-
settings += ClusterSingletonPlugin()
67+
// we use the singleton to implement a simple Coordinator
68+
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
69+
// there's a few ways to solve this... but for now this is good enough.
70+
settings += ClusterSingletonPlugin()
7171

72-
multiNodeTest.configureActorSystem(&settings)
73-
}
72+
multiNodeTest.configureActorSystem(&settings)
73+
74+
// we use the singleton to implement a simple Coordinator
75+
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
76+
// there's a few ways to solve this... but for now this is good enough.
77+
settings += ClusterSingletonPlugin()
78+
multiNodeTest.configureActorSystem(&settings)
79+
80+
let actorSystem = try await multiNodeTest.startNode(settings)
7481
control._actorSystem = actorSystem
7582

7683
let signalQueue = DispatchQueue(label: "multi.node.\(multiNodeTest.testSuiteName).\(multiNodeTest.testName).\(nodeName).SignalHandlerQueue")

0 commit comments

Comments
 (0)