Skip to content

Commit 8f1b976

Browse files
authored
feat(repeatWhen): notifier supports ObservableInput (#7103)
1 parent 1cf9994 commit 8f1b976

File tree

4 files changed

+77
-13
lines changed

4 files changed

+77
-13
lines changed
Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,84 @@
11
import { of } from 'rxjs';
22
import { repeatWhen } from 'rxjs/operators';
3+
import { asInteropObservable } from '../../spec/helpers/interop-helper';
34

45
it('should infer correctly', () => {
5-
const o = of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable<number>
6+
of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable<number>
67
});
78

89
it('should infer correctly when the error observable has a different type', () => {
9-
const o = of(1, 2, 3).pipe(repeatWhen(repeatWhen(errors => of('a', 'b', 'c')))); // $ExpectType Observable<number>
10+
of(1, 2, 3).pipe(repeatWhen(errors => asInteropObservable(of('a', 'b', 'c')))); // $ExpectType Observable<number>
1011
});
1112

1213
it('should enforce types', () => {
13-
const o = of(1, 2, 3).pipe(repeatWhen()); // $ExpectError
14+
of(1, 2, 3).pipe(repeatWhen()); // $ExpectError
15+
});
16+
17+
it('should accept interop observable notifier', () => {
18+
of(1, 2, 3).pipe(repeatWhen(() => asInteropObservable(of(true)))); // $ExpectType Observable<number>
19+
});
20+
21+
it('should accept promise notifier', () => {
22+
of(1, 2, 3).pipe(repeatWhen(() => Promise.resolve(true))); // $ExpectType Observable<number>
23+
});
24+
25+
it('should async iterable notifier', () => {
26+
const asyncRange = {
27+
from: 1,
28+
to: 2,
29+
[Symbol.asyncIterator]() {
30+
return {
31+
current: this.from,
32+
last: this.to,
33+
async next() {
34+
await Promise.resolve();
35+
const done = (this.current > this.last);
36+
return {
37+
done,
38+
value: done ? this.current++ : undefined
39+
};
40+
}
41+
};
42+
}
43+
};
44+
of(1, 2, 3).pipe(repeatWhen(() => asyncRange)); // $ExpectType Observable<number>
45+
});
46+
47+
it('should accept iterable notifier', () => {
48+
const syncRange = {
49+
from: 1,
50+
to: 2,
51+
[Symbol.iterator]() {
52+
return {
53+
current: this.from,
54+
last: this.to,
55+
next() {
56+
const done = (this.current > this.last);
57+
return {
58+
done,
59+
value: done ? this.current++ : undefined
60+
};
61+
}
62+
};
63+
}
64+
};
65+
of(1, 2, 3).pipe(repeatWhen(() => syncRange)); // $ExpectType Observable<number>
66+
});
67+
68+
it('should accept readable stream notifier', () => {
69+
const readableStream = new ReadableStream<string>({
70+
pull(controller) {
71+
controller.enqueue('x');
72+
controller.close();
73+
},
74+
});
75+
of(1, 2, 3).pipe(repeatWhen(() => readableStream)); // $ExpectType Observable<number>
1476
});
1577

1678
it('should enforce types of the notifier', () => {
17-
const o = of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError
79+
of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError
1880
});
1981

2082
it('should be deprecated', () => {
21-
const o = of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation
83+
of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation
2284
});

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ export { publishLast } from './internal/operators/publishLast';
164164
export { publishReplay } from './internal/operators/publishReplay';
165165
export { raceWith } from './internal/operators/raceWith';
166166
export { reduce } from './internal/operators/reduce';
167-
export { repeat } from './internal/operators/repeat';
167+
export { repeat, RepeatConfig } from './internal/operators/repeat';
168168
export { repeatWhen } from './internal/operators/repeatWhen';
169169
export { retry, RetryConfig } from './internal/operators/retry';
170170
export { retryWhen } from './internal/operators/retryWhen';

src/internal/operators/repeatWhen.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { Observable } from '../Observable';
2+
import { innerFrom } from '../observable/innerFrom';
23
import { Subject } from '../Subject';
34
import { Subscription } from '../Subscription';
45

5-
import { MonoTypeOperatorFunction } from '../types';
6+
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
67
import { operate } from '../util/lift';
78
import { createOperatorSubscriber } from './OperatorSubscriber';
89

@@ -33,13 +34,14 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
3334
* @see {@link retry}
3435
* @see {@link retryWhen}
3536
*
36-
* @param {function(notifications: Observable): Observable} notifier - Receives an Observable of notifications with
37+
* @param notifier Function that receives an Observable of notifications with
3738
* which a user can `complete` or `error`, aborting the repetition.
38-
* @return A function that returns an Observable that mirrors the source
39+
* @return A function that returns an `ObservableInput` that mirrors the source
3940
* Observable with the exception of a `complete`.
40-
* @deprecated Will be removed in v9 or v10. Use {@link repeat}'s `delay` option instead.
41+
* @deprecated Will be removed in v9 or v10. Use {@link repeat}'s {@link RepeatConfig#delay delay} option instead.
42+
* Instead of `repeatWhen(() => notify$)`, use: `repeat({ delay: () => notify$ })`.
4143
*/
42-
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T> {
44+
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
4345
return operate((source, subscriber) => {
4446
let innerSub: Subscription | null;
4547
let syncResub = false;
@@ -61,7 +63,7 @@ export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Obs
6163

6264
// If the call to `notifier` throws, it will be caught by the OperatorSubscriber
6365
// In the main subscription -- in `subscribeForRepeatWhen`.
64-
notifier(completions$).subscribe(
66+
innerFrom(notifier(completions$)).subscribe(
6567
createOperatorSubscriber(
6668
subscriber,
6769
() => {

src/operators/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ export { publishReplay } from '../internal/operators/publishReplay';
6868
export { race } from '../internal/operators/race';
6969
export { raceWith } from '../internal/operators/raceWith';
7070
export { reduce } from '../internal/operators/reduce';
71-
export { repeat } from '../internal/operators/repeat';
71+
export { repeat, RepeatConfig } from '../internal/operators/repeat';
7272
export { repeatWhen } from '../internal/operators/repeatWhen';
7373
export { retry, RetryConfig } from '../internal/operators/retry';
7474
export { retryWhen } from '../internal/operators/retryWhen';

0 commit comments

Comments
 (0)