exhaustMap.ts
5.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { map } from './map';
import { from } from '../observable/from';
import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
/* tslint:disable:max-line-length */
export function exhaustMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O): OperatorFunction<T, ObservedValueOf<O>>;
/** @deprecated resultSelector is no longer supported. Use inner map instead. */
export function exhaustMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction<T, ObservedValueOf<O>>;
/** @deprecated resultSelector is no longer supported. Use inner map instead. */
export function exhaustMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */
/**
* Projects each source value to an Observable which is merged in the output
* Observable only if the previous projected Observable has completed.
*
* <span class="informal">Maps each value to an Observable, then flattens all of
* these inner Observables using {@link exhaust}.</span>
*
* ![](exhaustMap.png)
*
* Returns an Observable that emits items based on applying a function that you
* supply to each item emitted by the source Observable, where that function
* returns an (so-called "inner") Observable. When it projects a source value to
* an Observable, the output Observable begins emitting the items emitted by
* that projected Observable. However, `exhaustMap` ignores every new projected
* Observable if the previous projected Observable has not yet completed. Once
* that one completes, it will accept and flatten the next projected Observable
* and repeat this process.
*
* ## Example
* Run a finite timer for each click, only if there is no currently active timer
* ```ts
* import { fromEvent, interval } from 'rxjs';
* import { exhaustMap, take } from 'rxjs/operators';
*
* const clicks = fromEvent(document, 'click');
* const result = clicks.pipe(
* exhaustMap(ev => interval(1000).pipe(take(5)))
* );
* result.subscribe(x => console.log(x));
* ```
*
* @see {@link concatMap}
* @see {@link exhaust}
* @see {@link mergeMap}
* @see {@link switchMap}
*
* @param {function(value: T, ?index: number): ObservableInput} project A function
* that, when applied to an item emitted by the source Observable, returns an
* Observable.
* @return {Observable} An Observable containing projected Observables
* of each item of the source, ignoring projected Observables that start before
* their preceding Observable has completed.
* @method exhaustMap
* @owner Observable
*/
export function exhaustMap<T, R, O extends ObservableInput<any>>(
project: (value: T, index: number) => O,
resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R,
): OperatorFunction<T, ObservedValueOf<O>|R> {
if (resultSelector) {
// DEPRECATED PATH
return (source: Observable<T>) => source.pipe(
exhaustMap((a, i) => from(project(a, i)).pipe(
map((b: any, ii: any) => resultSelector(a, b, i, ii)),
)),
);
}
return (source: Observable<T>) =>
source.lift(new ExhaustMapOperator(project));
}
class ExhaustMapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => ObservableInput<R>) {
}
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new ExhaustMapSubscriber(subscriber, this.project));
}
}
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class ExhaustMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
private hasSubscription = false;
private hasCompleted = false;
private index = 0;
constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => ObservableInput<R>) {
super(destination);
}
protected _next(value: T): void {
if (!this.hasSubscription) {
this.tryNext(value);
}
}
private tryNext(value: T): void {
let result: ObservableInput<R>;
const index = this.index++;
try {
result = this.project(value, index);
} catch (err) {
this.destination.error!(err);
return;
}
this.hasSubscription = true;
this._innerSub(result);
}
private _innerSub(result: ObservableInput<R>): void {
const innerSubscriber = new SimpleInnerSubscriber(this);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
const innerSubscription = innerSubscribe(result, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}
protected _complete(): void {
this.hasCompleted = true;
if (!this.hasSubscription) {
this.destination.complete!();
}
this.unsubscribe();
}
notifyNext(innerValue: R): void {
this.destination.next!(innerValue);
}
notifyError(err: any): void {
this.destination.error!(err);
}
notifyComplete(): void {
this.hasSubscription = false;
if (this.hasCompleted) {
this.destination.complete!();
}
}
}