Skip to content

Commit 459946d

Browse files
committed
Added FlatMapSeededProcessor.
1 parent 4392735 commit 459946d

File tree

4 files changed

+191
-98
lines changed

4 files changed

+191
-98
lines changed

Concurrency Utilities/ReactiveCollection.swift

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,14 @@ public final class IteratorSeededPublisher<S, O>: IteratorPublisherClassBase<O>
4242
///
4343
/// - parameters:
4444
/// - qos: Quality of service for the dispatch queue used to sequence items and produce items in the background (default `DispatchQOS.default`).
45-
/// - bufferSize: This is a standard argument to Reactive Stream types, in this case the argument doesn't define the buffer size but rather the number of items produced before testing for subscription cancellation (default `ReactiveStreams.defaultBufferSize`).
4645
/// - initialSeed: The value of the seed at the start of each iteration cycle.
4746
/// - nextItem: A closure that produces the next item, or `nil` to indicate termination, given the seed (which it can modify)
4847
/// - seed: The seed passed to the `nextItem` closure as an `inout` parameter so that the closure can modify the seed.
49-
public init(qos: DispatchQoS = .default, bufferSize: Int = ReactiveStreams.defaultBufferSize, initialSeed: S, nextItem: @escaping (_ seed: inout S) throws -> O?) {
48+
public init(qos: DispatchQoS = .default, initialSeed: S, nextItem: @escaping (_ seed: inout S) throws -> O?) {
5049
self.initialSeed = initialSeed
5150
seed = initialSeed
5251
self.nextItem = nextItem
53-
super.init(qos: qos, bufferSize: bufferSize)
52+
super.init(qos: qos)
5453
}
5554

5655
/// Calls `nextItem(&seed)`.
@@ -76,7 +75,6 @@ public final class IteratorSeededPublisher<S, O>: IteratorPublisherClassBase<O>
7675
///
7776
/// - note:
7877
/// - Each subscriber receives all of the sequence individually, i.e. each subscriber receives the whole sequence (provided that the given sequence supports multiple traversal).
79-
/// - This class does not use buffering, because the buffer is the given sequence.
8078
/// - When created the given sequence is copied, therefore any changes to the sequence made after the publisher is created are *not* reflected in the items produced (the copy of the sequence is made at creation time not subscription time).
8179
///
8280
/// - parameters
@@ -91,10 +89,9 @@ public final class ForEachPublisher<O>: IteratorPublisherClassBase<O> {
9189
/// - parameters:
9290
/// - sequence: The sequence of items produced (one sequence per subscription assuming that the sequence can be traversed multiple times).
9391
/// - qos: Quality of service for the dispatch queue used to sequence items and produce items in the background (default `DispatchQOS.default`).
94-
/// - bufferSize: This is a standard argument to Reactive Stream types, in this case the argument doesn't define the buffer size but rather the number of items produced before testing for subscription cancellation (default `ReactiveStreams.defaultBufferSize`).
95-
public init<S>(sequence: S, qos: DispatchQoS = .default, bufferSize: Int = ReactiveStreams.defaultBufferSize) where S: Sequence, S.SubSequence: Sequence, S.Iterator.Element == O, S.SubSequence.SubSequence == S.SubSequence, S.SubSequence.Iterator.Element == O {
92+
public init<S>(sequence: S, qos: DispatchQoS = .default) where S: Sequence, S.SubSequence: Sequence, S.Iterator.Element == O, S.SubSequence.SubSequence == S.SubSequence, S.SubSequence.Iterator.Element == O {
9693
self.sequence = AnySequence(sequence)
97-
super.init(qos: qos, bufferSize: bufferSize)
94+
super.init(qos: qos)
9895
}
9996

10097
private var iterator: AnyIterator<O>!
@@ -184,7 +181,7 @@ public final class ReduceFutureSubscriber<T, R>: FutureSubscriberClassBase<T, R>
184181

185182
// MARK: `Processors`s.
186183

187-
/// A `Processor` that takes input items from its input subscription maps (a.k.a. processes, a.k.a. transforms) them into output items, using its seed, and outputs them and to its output subscriber (Reactive Stream version of `Sequence.map(_ transform:)`).
184+
/// A `Processor` that takes input items from its input subscription and maps (a.k.a. processes, a.k.a. transforms) them into output items, using its seed (Reactive Stream version of `Sequence.map(_ transform:)`).
188185
///
189186
/// - warning:
190187
/// - `processors`s are not thread safe, since they are an alternative to dealing with thread safety directly and therefore it makes no sense to share them between threads.
@@ -193,37 +190,85 @@ public final class ReduceFutureSubscriber<T, R>: FutureSubscriberClassBase<T, R>
193190
///
194191
/// - parameters
195192
/// - S: The type of the seed accepted by the given transform closure.
196-
/// - T: The type of the elements subscribed to.
197-
/// - R: The result type of the accumulation.
198-
public final class MapSeededProcessor<S, I, O>: MapProcessorClassBase<I, O> {
193+
/// - I: The type of the input elements subscribed to.
194+
/// - O: The output type after the mapping.
195+
public final class MapSeededProcessor<S, I, O>: ProcessorClassBase<I, O> {
199196
private let initialSeed: S
200197

201198
private let transformClosure: (inout S, I) throws -> O
202199

203200
private var seed: S
204201

205-
/// A `Processor` that takes input items from its input subscription maps (a.k.a. processes, a.k.a. transforms) them into output items, using its seed, and outputs them and to its output subscriber (Reactive Stream version of `Sequence.map(_ transform:)`).
202+
/// A `Processor` that takes input items from its input subscription and maps (a.k.a. processes, a.k.a. transforms) them into output items, using its seed (Reactive Stream version of `Sequence.map(_ transform:)`).
206203
///
207204
/// - parameters:
208205
/// - initialSeed: The initial value of the seed at the start of new input and output subscription.
209-
/// - transform: The mapping/processing transform that converts an input item intop an output item.
206+
/// - transform: The mapping/processing transform that converts an input item into an output item.
210207
public init(initialSeed: S, transform: @escaping (_ seed: inout S, _ nextItem: I) throws -> O) {
211208
self.initialSeed = initialSeed
212209
seed = initialSeed
213210
transformClosure = transform
214211
}
215212

216-
/// Calls the transform closure with the seed and the given input item.
213+
/// Calls the transform closure with the seed and the given input item and passes the resulting transformed item onto the output susbcription.
217214
///
218215
/// - parameter inputItem: The input item to be transformed/mapped/processed.
219-
///
220-
/// - returns: The transformed/mapped/processed input item.
221-
public override func _map(_ inputItem: I) throws -> O {
222-
return try transformClosure(&seed, inputItem)
216+
public override func _consumeAndRequest(item: I) throws {
217+
_outputSubscriber.value?.on(next: try transformClosure(&seed, item))
223218
}
224219

225220
/// Resets the seed at the start of each new output subscription.
226221
public override func _resetOutputSubscription() {
227222
seed = initialSeed
228223
}
229224
}
225+
226+
/// A `Processor` that takes input items from its input subscription and maps (a.k.a. processes, a.k.a. transforms) them into *non-`nil`* output items (Reactive Stream version of `Sequence.flatMap(_ transform:)`).
227+
///
228+
/// - warning:
229+
/// - `processors`s are not thread safe, since they are an alternative to dealing with thread safety directly and therefore it makes no sense to share them between threads.
230+
/// - There are *no* `Publisher` methods/properties intended for use by a client (the programmer using an instance of this class), the client *only* passes the instance to the `subscribe` method of a `Publisher`.
231+
/// Passing the instance to the publisher is best accomplished using operator `~~>`, since this emphasizes that the other methods are not for client use.
232+
///
233+
/// - parameters
234+
/// - S: The type of the seed accepted by the given transform closure.
235+
/// - I: The type of the input elements subscribed to.
236+
/// - O: The output type after the mapping.
237+
public final class FlatMapSeededProcessor<S, I, O>: ProcessorClassBase<I, O> {
238+
private let initialSeed: S
239+
240+
private let transformClosure: (inout S, I) throws -> O?
241+
242+
private var seed: S
243+
244+
/// A `Processor` that takes input items from its input subscription and maps (a.k.a. processes, a.k.a. transforms) them into *non-`nil`* output items (Reactive Stream version of `Sequence.flatMap(_ transform:)`).
245+
///
246+
/// - parameters:
247+
/// - initialSeed: The initial value of the seed at the start of new input and output subscription.
248+
/// - transform:
249+
/// The mapping/processing transform that converts an input item into an *optional* output item.
250+
/// If the transformed/mapped/processed item is `nil`, it is disguarded.
251+
public init(initialSeed: S, transform: @escaping (_ seed: inout S, _ nextItem: I) throws -> O?) {
252+
self.initialSeed = initialSeed
253+
seed = initialSeed
254+
transformClosure = transform
255+
}
256+
257+
/// Calls the transform closure with the seed and the given input item and passes the resulting transformed item onto the output susbcription assuming that it isn't `nil`, if it is `nil` it requests an extra input item.
258+
///
259+
/// - parameter inputItem: The input item to be transformed/mapped/processed.
260+
public override func _consumeAndRequest(item: I) throws {
261+
let outputItemOptional = try transformClosure(&seed, item)
262+
guard let outputItem = outputItemOptional else {
263+
_inputSubscription.value?.request(1)
264+
return
265+
}
266+
_outputSubscriber.value?.on(next: outputItem)
267+
}
268+
269+
/// Resets the seed at the start of each new output subscription.
270+
public override func _resetOutputSubscription() {
271+
seed = initialSeed
272+
}
273+
}
274+

0 commit comments

Comments
 (0)