onErrorResumeNext.ts
8.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import { Observable } from '../Observable';
import { from } from '../observable/from';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { isArray } from '../util/isArray';
import { ObservableInput, OperatorFunction } from '../types';
import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
/* tslint:disable:max-line-length */
export function onErrorResumeNext<T>(): OperatorFunction<T, T>;
export function onErrorResumeNext<T, T2>(v: ObservableInput<T2>): OperatorFunction<T, T | T2>;
export function onErrorResumeNext<T, T2, T3>(v: ObservableInput<T2>, v2: ObservableInput<T3>): OperatorFunction<T, T | T2 | T3>;
export function onErrorResumeNext<T, T2, T3, T4>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>): OperatorFunction<T, T | T2 | T3 | T4>;
export function onErrorResumeNext<T, T2, T3, T4, T5>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>): OperatorFunction<T, T | T2 | T3 | T4 | T5>;
export function onErrorResumeNext<T, T2, T3, T4, T5, T6>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>, v5: ObservableInput<T6>): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>;
export function onErrorResumeNext<T, T2, T3, T4, T5, T6, T7>(v: ObservableInput<T2>, v2: ObservableInput<T3>, v3: ObservableInput<T4>, v4: ObservableInput<T5>, v5: ObservableInput<T6>, v6: ObservableInput<T7>): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6 | T7>;
export function onErrorResumeNext<T, R>(...observables: Array<ObservableInput<any>>): OperatorFunction<T, T | R>;
export function onErrorResumeNext<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, T | R>;
/* tslint:enable:max-line-length */
/**
* When any of the provided Observable emits an complete or error notification, it immediately subscribes to the next one
* that was passed.
*
* <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
*
* ![](onErrorResumeNext.png)
*
* `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
* arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
* as the source.
*
* `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
* When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
* will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
* its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
* Observable in provided series, no matter if previous Observable completed or ended with an error. This will
* be happening until there is no more Observables left in the series, at which point returned Observable will
* complete - even if the last subscribed stream ended with an error.
*
* `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
* when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
* in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
* an error.
*
* Note that you do not get any access to errors emitted by the Observables. In particular do not
* expect these errors to appear in error callback passed to {@link Observable#subscribe}. If you want to take
* specific actions based on what error was emitted by an Observable, you should try out {@link catchError} instead.
*
*
* ## Example
* Subscribe to the next Observable after map fails
* ```ts
* import { of } from 'rxjs';
* import { onErrorResumeNext, map } from 'rxjs/operators';
*
* of(1, 2, 3, 0).pipe(
* map(x => {
* if (x === 0) { throw Error(); }
* return 10 / x;
* }),
* onErrorResumeNext(of(1, 2, 3)),
* )
* .subscribe(
* val => console.log(val),
* err => console.log(err), // Will never be called.
* () => console.log('that\'s it!')
* );
*
* // Logs:
* // 10
* // 5
* // 3.3333333333333335
* // 1
* // 2
* // 3
* // "that's it!"
* ```
*
* @see {@link concat}
* @see {@link catchError}
*
* @param {...ObservableInput} observables Observables passed either directly or as an array.
* @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
* to the next passed Observable and so on, until it completes or runs out of Observables.
* @method onErrorResumeNext
* @owner Observable
*/
export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>>>): OperatorFunction<T, R> {
if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<Observable<any>>>nextSources[0];
}
return (source: Observable<T>) => source.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}
/* tslint:disable:max-line-length */
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
/* tslint:enable:max-line-length */
export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let source: ObservableInput<any>|undefined = undefined;
if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = nextSources[0] as ObservableInput<any>[];
}
// TODO: resolve issue with passing no arguments.
source = nextSources.shift()!;
return from(source).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}
class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
constructor(private nextSources: Array<ObservableInput<any>>) {
}
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
}
}
class OnErrorResumeNextSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
constructor(protected destination: Subscriber<T>,
private nextSources: Array<ObservableInput<any>>) {
super(destination);
}
notifyError(): void {
this.subscribeToNextSource();
}
notifyComplete(): void {
this.subscribeToNextSource();
}
protected _error(err: any): void {
this.subscribeToNextSource();
this.unsubscribe();
}
protected _complete(): void {
this.subscribeToNextSource();
this.unsubscribe();
}
private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (!!next) {
const innerSubscriber = new SimpleInnerSubscriber(this);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
const innerSubscription = innerSubscribe(next, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
} else {
this.destination.complete();
}
}
}