Skip to content

Commit 4392735

Browse files
committed
Added MapSeededProcessor and Pi estimator example.
1 parent 55d634a commit 4392735

File tree

8 files changed

+682
-187
lines changed

8 files changed

+682
-187
lines changed

Concurrency Utilities/Atomic.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,24 @@
99
import Foundation
1010

1111
/// Gives a unique number.
12-
/// Useful for inique identifiers.
1312
///
14-
/// - note: An `enum` is used as a namespace since that is the nearest available in Swift.
13+
/// - note:
14+
/// - An `enum` is used as a namespace since that is the nearest available in Swift.
15+
/// - Useful for inique identifiers.
16+
/// - Is thread safe.
1517
public enum UniqueNumber {
18+
private static var queue = DispatchQueue(label: "UniqueNumber Serial Queue", qos: DispatchQoS.userInitiated)
19+
1620
private static var uniqueNumber = Int.min
1721

1822
/// The next unique number.
1923
public static var next: Int {
20-
defer {
24+
var result = 0
25+
queue.sync {
2126
uniqueNumber += 1
27+
result = uniqueNumber
2228
}
23-
return uniqueNumber
29+
return result
2430
}
2531
}
2632

Concurrency Utilities/Future.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ extension Thread {
2020
result = closure()
2121
} else {
2222
DispatchQueue.main.sync {
23-
result = closure()
23+
result = closure() // Can't think how to test this.
2424
}
2525
}
2626
return result!
@@ -273,7 +273,7 @@ public final class AsynchronousFuture<T>: Future<T> {
273273
case .running:
274274
switch group.wait(timeout: timeoutTime) { // Wait for calculation completion.
275275
case .success:
276-
break // Loop round and test status again to extract result
276+
break // Loop round and test status again to extract result
277277
case .timedOut:
278278
terminateFuture.value = .timedOut
279279
return nil

Concurrency Utilities/ReactiveCollection.swift

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import Foundation
2929
/// - parameters
3030
/// - S: The type of the seed used by given closure `nextItem`.
3131
/// - O: The type of the output items produced by given closure `nextItem`.
32-
public final class IteratorPublisherSeeded<S, O>: IteratorPublisherClassBase<O> {
32+
public final class IteratorSeededPublisher<S, O>: IteratorPublisherClassBase<O> {
3333
private let initialSeed: S
3434

3535
private let nextItem: (inout S) throws -> O?
@@ -110,7 +110,15 @@ public final class ForEachPublisher<O>: IteratorPublisherClassBase<O> {
110110
}
111111
}
112112

113-
// MARK: `Subscribers`s.
113+
// MARK: `Subscriber`s.
114+
115+
/// Signals, as opossed to errors, that `Subscriber`s can send by throwing the signals as errors (they are intercepted and do not result in stream errors).
116+
/// There is no standard way of a subscriber asking for actions, e.g. completion, in the Reactive Stream Specification and therefore throwing these signals must only be used within the Reactive Collection Library.
117+
public enum SubscriberSignal: Error {
118+
/// Indicates that the subscribers input subscription should be cancelled and the subscriber's `onComplete` method should be called to mark the end of items from the subscription.
119+
/// This is useful when a subscriber, or a processor which is a subscriber, needs to signal successful completion, rather than signal an error.
120+
case cancelInputSubscriptionAndComplete
121+
}
114122

115123
/// A `Subscriber` that is also a `Future` that takes items from its subscription and passes them to the given `updateAccumulatingResult` which combines them with the given `initialResult` and when finished returns via `get` the now modified `initialResult` (Reactive Stream version of `Sequence.reduce(into:_:)`).
116124
///
@@ -128,7 +136,7 @@ public final class ForEachPublisher<O>: IteratorPublisherClassBase<O> {
128136
/// - parameters
129137
/// - T: The type of the elements subscribed to.
130138
/// - R: The result type of the accumulation.
131-
public final class ReduceSubscriberFuture<T, R>: SubscriberFutureClassBase<T, R> {
139+
public final class ReduceFutureSubscriber<T, R>: FutureSubscriberClassBase<T, R> {
132140
private let initialResult: R
133141

134142
/// A `Subscriber` that is also a future that takes items from its subscription and passes them to the given `updateAccumulatingResult` which combines them with the given `initialResult` and when finished returns via `get` the now modified `initialResult` (Reactive Stream version of `Sequence.reduce(into:_:)`).
@@ -150,14 +158,14 @@ public final class ReduceSubscriberFuture<T, R>: SubscriberFutureClassBase<T, R>
150158
/// - updateAccumulatingResult: A closure that accepts the given `into` as an `inout` parameter and an item from a subscription and combines them into `into`.
151159
/// - accumulator: The running accumulator (this is the given `into` and is the value returned via `get`).
152160
/// - next: The next item to be accumulated.
153-
public init(timeout: DispatchTimeInterval = Futures.defaultTimeout, bufferSize: Int = ReactiveStreams.defaultBufferSize, into initialResult: R, updateAccumulatingResult: @escaping ( _ accumulator: inout R, _ next: T) throws -> ()) {
161+
public init(timeout: DispatchTimeInterval = Futures.defaultTimeout, bufferSize: Int = ReactiveStreams.defaultBufferSize, into initialResult: R, updateAccumulatingResult: @escaping (_ accumulator: inout R, _ next: T) throws -> ()) {
154162
self.initialResult = initialResult
155163
result = initialResult
156164
self.updateAccumulatingResult = updateAccumulatingResult
157165
super.init(timeout: timeout, bufferSize: bufferSize)
158166
}
159167

160-
private let updateAccumulatingResult: ( _ accumulator: inout R, _ next: T) throws -> ()
168+
private let updateAccumulatingResult: (_ accumulator: inout R, _ next: T) throws -> ()
161169

162170
public override func _consume(item: T) throws {
163171
try updateAccumulatingResult(&result, item)
@@ -175,3 +183,47 @@ public final class ReduceSubscriberFuture<T, R>: SubscriberFutureClassBase<T, R>
175183
}
176184

177185
// MARK: `Processors`s.
186+
187+
/// A `Processor` that takes input items from its input subscription maps (a.k.a. processes, a.k.a. transforms) them into output items, using its seed, and outputs them and to its output subscriber (Reactive Stream version of `Sequence.map(_ transform:)`).
188+
///
189+
/// - warning:
190+
/// - `processors`s are not thread safe, since they are an alternative to dealing with thread safety directly and therefore it makes no sense to share them between threads.
191+
/// - There are *no* `Publisher` methods/properties intended for use by a client (the programmer using an instance of this class), the client *only* passes the instance to the `subscribe` method of a `Publisher`.
192+
/// Passing the instance to the publisher is best accomplished using operator `~~>`, since this emphasizes that the other methods are not for client use.
193+
///
194+
/// - parameters
195+
/// - S: The type of the seed accepted by the given transform closure.
196+
/// - T: The type of the elements subscribed to.
197+
/// - R: The result type of the accumulation.
198+
public final class MapSeededProcessor<S, I, O>: MapProcessorClassBase<I, O> {
199+
private let initialSeed: S
200+
201+
private let transformClosure: (inout S, I) throws -> O
202+
203+
private var seed: S
204+
205+
/// A `Processor` that takes input items from its input subscription maps (a.k.a. processes, a.k.a. transforms) them into output items, using its seed, and outputs them and to its output subscriber (Reactive Stream version of `Sequence.map(_ transform:)`).
206+
///
207+
/// - parameters:
208+
/// - initialSeed: The initial value of the seed at the start of new input and output subscription.
209+
/// - transform: The mapping/processing transform that converts an input item intop an output item.
210+
public init(initialSeed: S, transform: @escaping (_ seed: inout S, _ nextItem: I) throws -> O) {
211+
self.initialSeed = initialSeed
212+
seed = initialSeed
213+
transformClosure = transform
214+
}
215+
216+
/// Calls the transform closure with the seed and the given input item.
217+
///
218+
/// - parameter inputItem: The input item to be transformed/mapped/processed.
219+
///
220+
/// - returns: The transformed/mapped/processed input item.
221+
public override func _map(_ inputItem: I) throws -> O {
222+
return try transformClosure(&seed, inputItem)
223+
}
224+
225+
/// Resets the seed at the start of each new output subscription.
226+
public override func _resetOutputSubscription() {
227+
seed = initialSeed
228+
}
229+
}

0 commit comments

Comments
 (0)