bufferToggle.ts
5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { subscribeToResult } from '../util/subscribeToResult';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { OperatorFunction, SubscribableOrPromise } from '../types';
/**
* Buffers the source Observable values starting from an emission from
* `openings` and ending when the output of `closingSelector` emits.
*
* <span class="informal">Collects values from the past as an array. Starts
* collecting only when `openings` emits, and calls the `closingSelector`
* function to get an Observable that tells when to close the buffer.</span>
*
* ![](bufferToggle.png)
*
* Buffers values from the source by opening the buffer via signals from an
* Observable provided to `openings`, and closing and sending the buffers when
* a Subscribable or Promise returned by the `closingSelector` function emits.
*
* ## Example
*
* Every other second, emit the click events from the next 500ms
*
* ```ts
* import { fromEvent, interval, EMPTY } from 'rxjs';
* import { bufferToggle } from 'rxjs/operators';
*
* const clicks = fromEvent(document, 'click');
* const openings = interval(1000);
* const buffered = clicks.pipe(bufferToggle(openings, i =>
* i % 2 ? interval(500) : EMPTY
* ));
* buffered.subscribe(x => console.log(x));
* ```
*
* @see {@link buffer}
* @see {@link bufferCount}
* @see {@link bufferTime}
* @see {@link bufferWhen}
* @see {@link windowToggle}
*
* @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
* buffers.
* @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
* the value emitted by the `openings` observable and returns a Subscribable or Promise,
* which, when it emits, signals that the associated buffer should be emitted
* and cleared.
* @return {Observable<T[]>} An observable of arrays of buffered values.
* @method bufferToggle
* @owner Observable
*/
export function bufferToggle<T, O>(
  openings: SubscribableOrPromise<O>,
  closingSelector: (value: O) => SubscribableOrPromise<any>
): OperatorFunction<T, T[]> {
  // The returned function is what gets applied to a source Observable; every
  // application lifts that source with a freshly created BufferToggleOperator
  // that captures `openings` and `closingSelector`.
  const applyToSource = function bufferToggleOperatorFunction(source: Observable<T>) {
    const operator = new BufferToggleOperator<T, O>(openings, closingSelector);
    return source.lift(operator);
  };
  return applyToSource;
}
/**
 * Operator object handed to `lift`. It holds on to the `openings` notifier and
 * the `closingSelector` until `call` is invoked, at which point it subscribes a
 * new BufferToggleSubscriber to `source`.
 */
class BufferToggleOperator<T, O> implements Operator<T, T[]> {
  private openings: SubscribableOrPromise<O>;
  private closingSelector: (value: O) => SubscribableOrPromise<any>;

  constructor(openings: SubscribableOrPromise<O>,
              closingSelector: (value: O) => SubscribableOrPromise<any>) {
    this.openings = openings;
    this.closingSelector = closingSelector;
  }

  /**
   * Subscribes a BufferToggleSubscriber (wrapping `subscriber`) to `source`
   * and returns whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<T[]>, source: any): any {
    const { openings, closingSelector } = this;
    return source.subscribe(new BufferToggleSubscriber(subscriber, openings, closingSelector));
  }
}
/**
 * Bookkeeping record for a single open buffer.
 */
interface BufferContext<T> {
  /** Subscription created together with the buffer; unsubscribed when the buffer is closed. */
  subscription: Subscription;
  /** Values pushed into this buffer while it is open. */
  buffer: Array<T>;
}
/**
 * Subscriber that implements `bufferToggle`.
 *
 * It subscribes to `openings` on construction. Each notification from
 * `openings` calls `closingSelector` and, when a notifier is returned, opens a
 * new buffer (a {@link BufferContext}). Every source value is appended to all
 * currently open buffers. A buffer is emitted downstream and discarded when
 * its closing notifier emits or completes, or when the source completes.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
  /**
   * Currently open buffers, oldest first. Set to `null` once `_error` or
   * `_complete` has run, so every reader must tolerate `null`.
   */
  private contexts: Array<BufferContext<T>> = [];

  /**
   * @param destination Subscriber that receives the emitted buffers.
   * @param openings Notifier whose emissions open new buffers.
   * @param closingSelector Called with each `openings` value; a falsy return
   * value means no buffer is opened for that notification.
   */
  constructor(destination: Subscriber<T[]>,
              openings: SubscribableOrPromise<O>,
              private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
    super(destination);
    this.add(subscribeToResult(this, openings));
  }

  /** Appends a source value to every buffer that is currently open. */
  protected _next(value: T): void {
    const contexts = this.contexts;
    if (!contexts) {
      // Already torn down by _error/_complete: nothing left to buffer into.
      return;
    }
    const len = contexts.length;
    for (let i = 0; i < len; i++) {
      contexts[i].buffer.push(value);
    }
  }

  /** Discards all open buffers without emitting them, then forwards the error. */
  protected _error(err: any): void {
    this.releaseContexts(false);
    super._error(err);
  }

  /** Emits every open buffer (oldest first), releases them, then completes. */
  protected _complete(): void {
    this.releaseContexts(true);
    super._complete();
  }

  /**
   * Inner notification handler. `openings` is subscribed without an outer
   * value (see the constructor) while closing notifiers are subscribed with
   * their BufferContext as outer value (see `trySubscribe`), so a truthy
   * `outerValue` is treated as "close this buffer" and anything else as
   * "open a new buffer".
   */
  notifyNext(outerValue: any, innerValue: O): void {
    if (outerValue) {
      this.closeBuffer(outerValue);
    } else {
      this.openBuffer(innerValue);
    }
  }

  /**
   * Inner completion handler. Closes the buffer attached to `innerSub` (the
   * `context` property assigned in `trySubscribe`); inner subscribers without
   * one are ignored by `closeBuffer`.
   */
  notifyComplete(innerSub: InnerSubscriber<T, O>): void {
    this.closeBuffer((<any> innerSub).context);
  }

  /**
   * Asks `closingSelector` for a closing notifier and, if one is returned,
   * opens a buffer tied to it. Anything thrown along the way is routed to
   * `_error`.
   */
  private openBuffer(value: O): void {
    if (!this.contexts) {
      // Torn down already; opening a buffer now could only fail.
      return;
    }
    try {
      const closingSelector = this.closingSelector;
      const closingNotifier = closingSelector.call(this, value);
      if (closingNotifier) {
        this.trySubscribe(closingNotifier);
      }
    } catch (err) {
      this._error(err);
    }
  }

  /**
   * Emits `context.buffer` downstream and releases the context.
   *
   * Does nothing when the subscriber is torn down, when `context` is falsy, or
   * when `context` is no longer tracked. The last case matters because a
   * closing notifier can close its buffer from `notifyNext` and `trySubscribe`
   * may then ask to close it again; without the membership check the buffer
   * would be emitted twice and `splice(-1, 1)` would drop an unrelated buffer.
   */
  private closeBuffer(context: BufferContext<T>): void {
    const contexts = this.contexts;
    if (!contexts || !context || contexts.indexOf(context) === -1) {
      return;
    }
    const { buffer, subscription } = context;
    this.destination.next!(buffer);
    // Look the position up again after emitting: the downstream handler runs
    // arbitrary code and may have changed `contexts` in the meantime.
    const index = contexts.indexOf(context);
    if (index !== -1) {
      contexts.splice(index, 1);
    }
    this.remove(subscription);
    subscription.unsubscribe();
  }

  /**
   * Registers a new open buffer and subscribes to its closing notifier, passing
   * the context as the outer value so notifications can be mapped back to it.
   */
  private trySubscribe(closingNotifier: any): void {
    const contexts = this.contexts;
    const buffer: Array<T> = [];
    const subscription = new Subscription();
    const context: BufferContext<T> = { buffer, subscription };
    // Track the buffer before subscribing so notifications delivered during
    // the subscribe call can already find it.
    contexts.push(context);
    const innerSubscription = subscribeToResult(this, closingNotifier, context as any);
    if (!innerSubscription || innerSubscription.closed) {
      // No live inner subscription came back (presumably the notifier already
      // finished), so the buffer is closed right away. closeBuffer ignores the
      // call if the buffer was already closed during the subscribe call.
      this.closeBuffer(context);
    } else {
      (innerSubscription as any).context = context;
      this.add(innerSubscription);
      subscription.add(innerSubscription);
    }
  }

  /**
   * Drains `contexts`, optionally emitting each buffer first, and marks the
   * subscriber as torn down by setting `contexts` to `null`. Safe to call more
   * than once.
   */
  private releaseContexts(emitBuffers: boolean): void {
    const contexts = this.contexts;
    if (contexts) {
      while (contexts.length > 0) {
        const context = contexts.shift()!;
        if (emitBuffers) {
          this.destination.next!(context.buffer);
        }
        context.subscription.unsubscribe();
        context.buffer = null!;
        context.subscription = null!;
      }
    }
    this.contexts = null!;
  }
}