// ReplaySubject.js (4.84 KB)
"use strict";
/**
 * Prototype-inheritance helper: makes `derived` inherit both the static side
 * and the prototype side of `base`. An already-installed `this.__extends` is
 * reused when one exists.
 */
var __extends = (this && this.__extends) || (function () {
    /**
     * Chooses how the static side is linked, in order of preference:
     * `Object.setPrototypeOf`, assignment to `__proto__` (when object literals
     * honour it), or a plain copy of the base's own enumerable properties.
     */
    function resolveStaticsLinker() {
        if (Object.setPrototypeOf) {
            return Object.setPrototypeOf;
        }
        if ({ __proto__: [] } instanceof Array) {
            return function (target, source) { target.__proto__ = source; };
        }
        return function (target, source) {
            for (var key in source) {
                if (source.hasOwnProperty(key)) {
                    target[key] = source[key];
                }
            }
        };
    }
    // The first call swaps this placeholder for the resolved strategy, so the
    // feature detection above runs at most once.
    var extendStatics = function (derived, base) {
        extendStatics = resolveStaticsLinker();
        return extendStatics(derived, base);
    };
    return function (derived, base) {
        // Intermediate constructor: lets `derived.prototype` inherit from
        // `base.prototype` without running `base` itself.
        function Surrogate() { this.constructor = derived; }
        extendStatics(derived, base);
        if (base === null) {
            derived.prototype = Object.create(base);
            return;
        }
        Surrogate.prototype = base.prototype;
        derived.prototype = new Surrogate();
    };
})();
Object.defineProperty(exports, "__esModule", { value: true });
var Subject_1 = require("./Subject");
var queue_1 = require("./scheduler/queue");
var Subscription_1 = require("./Subscription");
var observeOn_1 = require("./operators/observeOn");
var ObjectUnsubscribedError_1 = require("./util/ObjectUnsubscribedError");
var SubjectSubscription_1 = require("./SubjectSubscription");
/**
 * A Subject that records the values passed to `next` and replays the recorded
 * values to every new subscriber before that subscriber sees live values.
 *
 * The buffer is bounded by a maximum number of values (`bufferSize`) and,
 * optionally, by a maximum age per value (`windowTime`).
 */
var ReplaySubject = (function (_super) {
    __extends(ReplaySubject, _super);
    /**
     * @param {number} [bufferSize=Infinity] Maximum number of values kept for
     *   replay. Values below 1 are raised to 1.
     * @param {number} [windowTime=Infinity] Maximum age of a replayed value,
     *   measured with the `now()` of `scheduler` (or of the queue scheduler
     *   when none is given). Values below 1 are raised to 1.
     * @param {Object} [scheduler] Optional scheduler. It supplies `now()` for
     *   timestamps and, when present, subscribers receive their notifications
     *   through an ObserveOnSubscriber bound to it.
     */
    function ReplaySubject(bufferSize, windowTime, scheduler) {
        if (bufferSize === void 0) { bufferSize = Number.POSITIVE_INFINITY; }
        if (windowTime === void 0) { windowTime = Number.POSITIVE_INFINITY; }
        var _this = _super.call(this) || this;
        _this.scheduler = scheduler;
        // Holds raw values when the time window is infinite, ReplayEvent
        // instances (timestamp + value) otherwise.
        _this._events = [];
        _this._infiniteTimeWindow = false;
        _this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
        _this._windowTime = windowTime < 1 ? 1 : windowTime;
        // Select the `next` implementation once, per instance, so the
        // infinite-window path never allocates or timestamps ReplayEvents.
        if (windowTime === Number.POSITIVE_INFINITY) {
            _this._infiniteTimeWindow = true;
            _this.next = _this.nextInfiniteTimeWindow;
        }
        else {
            _this.next = _this.nextTimeWindow;
        }
        return _this;
    }
    /**
     * `next` implementation used when no time window applies: buffers the raw
     * value, drops the oldest one once `_bufferSize` is exceeded, then forwards
     * the value to the base Subject's `next`.
     *
     * @param {*} value Value to record and emit.
     */
    ReplaySubject.prototype.nextInfiniteTimeWindow = function (value) {
        if (!this.isStopped) {
            var _events = this._events;
            _events.push(value);
            // At most one value is added per call, so a single shift is enough.
            if (_events.length > this._bufferSize) {
                _events.shift();
            }
        }
        _super.prototype.next.call(this, value);
    };
    /**
     * `next` implementation used when a finite time window applies: buffers the
     * value together with the current time, trims the buffer, then forwards the
     * value to the base Subject's `next`.
     *
     * @param {*} value Value to record and emit.
     */
    ReplaySubject.prototype.nextTimeWindow = function (value) {
        if (!this.isStopped) {
            this._events.push(new ReplayEvent(this._getNow(), value));
            this._trimBufferThenGetEvents();
        }
        _super.prototype.next.call(this, value);
    };
    /**
     * Registers `subscriber`, replays the buffered values to it, and then
     * delivers the terminal notification if this subject has already errored
     * or stopped.
     *
     * @param {Object} subscriber Subscriber to register and replay to.
     * @returns {Object} `Subscription.EMPTY` when this subject is stopped or
     *   errored, otherwise a SubjectSubscription for the subscriber.
     * @throws {ObjectUnsubscribedError} When this subject is closed.
     */
    ReplaySubject.prototype._subscribe = function (subscriber) {
        var _infiniteTimeWindow = this._infiniteTimeWindow;
        // Replay from a snapshot rather than the live buffer. The subscriber is
        // added to `observers` before the replay loop runs, so a subscriber
        // that calls `next()` on this subject while being replayed to mutates
        // `this._events` (push/shift/splice). Indexing the live array with a
        // pre-computed length then skips or repeats values, and in time-window
        // mode can read past the end and throw on `.value`.
        var _events = (_infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents()).slice();
        var scheduler = this.scheduler;
        var len = _events.length;
        var subscription;
        if (this.closed) {
            throw new ObjectUnsubscribedError_1.ObjectUnsubscribedError();
        }
        else if (this.isStopped || this.hasError) {
            subscription = Subscription_1.Subscription.EMPTY;
        }
        else {
            this.observers.push(subscriber);
            subscription = new SubjectSubscription_1.SubjectSubscription(this, subscriber);
        }
        if (scheduler) {
            // From here on `subscriber` is the ObserveOnSubscriber wrapper; it
            // is also added to the original subscriber.
            subscriber.add(subscriber = new observeOn_1.ObserveOnSubscriber(subscriber, scheduler));
        }
        if (_infiniteTimeWindow) {
            for (var i = 0; i < len && !subscriber.closed; i++) {
                subscriber.next(_events[i]);
            }
        }
        else {
            // Time-window entries are ReplayEvents; unwrap the stored value.
            for (var i = 0; i < len && !subscriber.closed; i++) {
                subscriber.next(_events[i].value);
            }
        }
        if (this.hasError) {
            subscriber.error(this.thrownError);
        }
        else if (this.isStopped) {
            subscriber.complete();
        }
        return subscription;
    };
    /**
     * @returns {number} Current time according to this subject's scheduler, or
     *   to the queue scheduler when no scheduler was supplied.
     */
    ReplaySubject.prototype._getNow = function () {
        return (this.scheduler || queue_1.queue).now();
    };
    /**
     * Removes, in place, buffered events that are older than `_windowTime` or
     * that exceed `_bufferSize`, always dropping from the oldest end.
     *
     * @returns {Array} The live (trimmed) `_events` array, not a copy.
     */
    ReplaySubject.prototype._trimBufferThenGetEvents = function () {
        var now = this._getNow();
        var _bufferSize = this._bufferSize;
        var _windowTime = this._windowTime;
        var _events = this._events;
        var eventsCount = _events.length;
        var spliceCount = 0;
        // Count leading events that have aged out; the scan stops at the first
        // event that is still inside the window.
        while (spliceCount < eventsCount) {
            if ((now - _events[spliceCount].time) < _windowTime) {
                break;
            }
            spliceCount++;
        }
        // Drop whichever is larger: the expired prefix or the size overflow.
        if (eventsCount > _bufferSize) {
            spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
        }
        if (spliceCount > 0) {
            _events.splice(0, spliceCount);
        }
        return _events;
    };
    return ReplaySubject;
}(Subject_1.Subject));
exports.ReplaySubject = ReplaySubject;
/**
 * Buffer entry pairing a value with the time at which it was recorded.
 *
 * @param {number} time Timestamp taken when the value was buffered.
 * @param {*} value The buffered value.
 */
var ReplayEvent = function ReplayEvent(time, value) {
    this.time = time;
    this.value = value;
};
//# sourceMappingURL=ReplaySubject.js.map