Skip to content

Commit 5004a18

Browse files
authored
Fix race conditions in PrimitiveSequence+Concurrency (#2641)
This caused continuations to be resumed more than once, crashing clients due to a violation of the Swift Concurrency contract
1 parent 7157523 commit 5004a18

File tree

3 files changed

+161
-25
lines changed

3 files changed

+161
-25
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ All notable changes to this project will be documented in this file.
1212
* Provides `Infallible` versions of `CombineLatest+Collection` helpers.
1313
* Explicitly declare `APPLICATION_EXTENSION_API_ONLY` for CocoaPods
1414

15+
### Anomalies
16+
17+
* Fixes a crash that could occur when awaiting a `Single`, `Maybe`, or `Completable` that was disposed.
18+
1519
## 6.5.0
1620

1721
You can now use `await` on `Observable`-conforming objects (as well as `Driver`, `Signal`, `Infallible`, `Single`, `Completable`) using the following syntax:

RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,29 @@ public extension PrimitiveSequenceType where Trait == SingleTrait {
6262
operation: {
6363
try await withCheckedThrowingContinuation { continuation in
6464
var didResume = false
65+
let lock = RecursiveLock()
6566
disposable.setDisposable(
6667
self.subscribe(
67-
onSuccess: {
68-
didResume = true
69-
continuation.resume(returning: $0)
68+
onSuccess: { value in
69+
lock.withLock {
70+
guard !didResume else { return }
71+
didResume = true
72+
continuation.resume(returning: value)
73+
}
7074
},
71-
onFailure: {
72-
didResume = true
73-
continuation.resume(throwing: $0)
75+
onFailure: { error in
76+
lock.withLock {
77+
guard !didResume else { return }
78+
didResume = true
79+
continuation.resume(throwing: error)
80+
}
7481
},
7582
onDisposed: {
76-
guard !didResume else { return }
77-
continuation.resume(throwing: CancellationError())
83+
lock.withLock {
84+
guard !didResume else { return }
85+
didResume = true
86+
continuation.resume(throwing: CancellationError())
87+
}
7888
}
7989
)
8090
)
@@ -111,27 +121,37 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait {
111121
return try await withTaskCancellationHandler(
112122
operation: {
113123
try await withCheckedThrowingContinuation { continuation in
114-
var didEmit = false
115124
var didResume = false
125+
let lock = RecursiveLock()
116126
disposable.setDisposable(
117127
self.subscribe(
118128
onSuccess: { value in
119-
didEmit = true
120-
didResume = true
121-
continuation.resume(returning: value)
129+
lock.withLock {
130+
guard !didResume else { return }
131+
didResume = true
132+
continuation.resume(returning: value)
133+
}
122134
},
123135
onError: { error in
124-
didResume = true
125-
continuation.resume(throwing: error)
136+
lock.withLock {
137+
guard !didResume else { return }
138+
didResume = true
139+
continuation.resume(throwing: error)
140+
}
126141
},
127142
onCompleted: {
128-
guard !didEmit else { return }
129-
didResume = true
130-
continuation.resume(returning: nil)
143+
lock.withLock {
144+
guard !didResume else { return }
145+
didResume = true
146+
continuation.resume(returning: nil)
147+
}
131148
},
132149
onDisposed: {
133-
guard !didResume else { return }
134-
continuation.resume(throwing: CancellationError())
150+
lock.withLock {
151+
guard !didResume else { return }
152+
didResume = true
153+
continuation.resume(throwing: CancellationError())
154+
}
135155
}
136156
)
137157
)
@@ -168,19 +188,29 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element
168188
operation: {
169189
try await withCheckedThrowingContinuation { continuation in
170190
var didResume = false
191+
let lock = RecursiveLock()
171192
disposable.setDisposable(
172193
self.subscribe(
173194
onCompleted: {
174-
didResume = true
175-
continuation.resume()
195+
lock.withLock {
196+
guard !didResume else { return }
197+
didResume = true
198+
continuation.resume()
199+
}
176200
},
177201
onError: { error in
178-
didResume = true
179-
continuation.resume(throwing: error)
202+
lock.withLock {
203+
guard !didResume else { return }
204+
didResume = true
205+
continuation.resume(throwing: error)
206+
}
180207
},
181208
onDisposed: {
182-
guard !didResume else { return }
183-
continuation.resume(throwing: CancellationError())
209+
lock.withLock {
210+
guard !didResume else { return }
211+
didResume = true
212+
continuation.resume(throwing: CancellationError())
213+
}
184214
}
185215
)
186216
)

Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,40 @@ extension PrimitiveSequenceConcurrencyTests {
8686
[randomResult]
8787
)
8888
}
89+
90+
/// A previous implementation of the `Single` to swift concurrency bridge had a bug where it would sometimes call the continuation twice.
91+
/// The current number of iterations is a sweet spot to not make the tests too slow while still catching the bug in most runs.
92+
/// If you are debugging this issue you might want to increase the iterations and/or run this test repeatedly.
93+
func testSingleContinuationIsNotResumedTwice() {
94+
let expectation = XCTestExpectation()
95+
let iterations = 10000
96+
for i in 0 ..< iterations {
97+
DispatchQueue.global(qos: .userInitiated).async {
98+
let single = Single<Int>.create { observer in
99+
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
100+
observer(.success(42))
101+
}
102+
return Disposables.create()
103+
}
104+
105+
let task = Task {
106+
_ = try await single.value
107+
}
108+
109+
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
110+
task.cancel()
111+
}
112+
113+
self.sleep(Double.random(in: 0.004...0.006))
114+
115+
if i == iterations - 1 {
116+
expectation.fulfill()
117+
}
118+
}
119+
}
120+
121+
wait(for: [expectation], timeout: 10)
122+
}
89123
}
90124

91125
// MARK: - Maybe
@@ -167,6 +201,40 @@ extension PrimitiveSequenceConcurrencyTests {
167201
try await Task.sleep(nanoseconds: 1_000_000)
168202
task.cancel()
169203
}
204+
205+
/// A previous implementation of the `Single` to swift concurrency bridge had a bug where it would sometimes call the continuation twice.
206+
/// The current number of iterations is a sweet spot to not make the tests too slow while still catching the bug in most runs.
207+
/// If you are debugging this issue you might want to increase the iterations and/or run this test repeatedly.
208+
func testMaybeContinuationIsNotResumedTwice() {
209+
let expectation = XCTestExpectation()
210+
let iterations = 10000
211+
for i in 0 ..< iterations {
212+
DispatchQueue.global(qos: .userInitiated).async {
213+
let maybe = Maybe<Bool>.create { observer in
214+
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
215+
observer(.success(true))
216+
}
217+
return Disposables.create()
218+
}
219+
220+
let task = Task {
221+
_ = try await maybe.value
222+
}
223+
224+
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
225+
task.cancel()
226+
}
227+
228+
self.sleep(Double.random(in: 0.004...0.006))
229+
230+
if i == iterations - 1 {
231+
expectation.fulfill()
232+
}
233+
}
234+
}
235+
236+
wait(for: [expectation], timeout: 10)
237+
}
170238
}
171239

172240
// MARK: - Completable
@@ -220,6 +288,40 @@ extension PrimitiveSequenceConcurrencyTests {
220288
}
221289
}.cancel()
222290
}
291+
292+
/// A previous implementation of the `Single` to swift concurrency bridge had a bug where it would sometimes call the continuation twice.
293+
/// The current number of iterations is a sweet spot to not make the tests too slow while still catching the bug in most runs.
294+
/// If you are debugging this issue you might want to increase the iterations and/or run this test repeatedly.
295+
func testCompletableContinuationIsNotResumedTwice() {
296+
let expectation = XCTestExpectation()
297+
let iterations = 10000
298+
for i in 0 ..< iterations {
299+
DispatchQueue.global(qos: .userInitiated).async {
300+
let completable = Completable.create { observer in
301+
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
302+
observer(.completed)
303+
}
304+
return Disposables.create()
305+
}
306+
307+
let task = Task {
308+
_ = try await completable.value
309+
}
310+
311+
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
312+
task.cancel()
313+
}
314+
315+
self.sleep(Double.random(in: 0.004...0.006))
316+
317+
if i == iterations - 1 {
318+
expectation.fulfill()
319+
}
320+
}
321+
}
322+
323+
wait(for: [expectation], timeout: 10)
324+
}
223325
}
224326
#endif
225327

0 commit comments

Comments
 (0)