Bottleneck.coffee
10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
NUM_PRIORITIES = 10
DEFAULT_PRIORITY = 5
parser = require "./parser"
Queues = require "./Queues"
Job = require "./Job"
LocalDatastore = require "./LocalDatastore"
RedisDatastore = require "./RedisDatastore"
Events = require "./Events"
States = require "./States"
Sync = require "./Sync"
class Bottleneck
Bottleneck.default = Bottleneck
Bottleneck.Events = Events
Bottleneck.version = Bottleneck::version = require("./version.json").version
Bottleneck.strategy = Bottleneck::strategy = { LEAK:1, OVERFLOW:2, OVERFLOW_PRIORITY:4, BLOCK:3 }
Bottleneck.BottleneckError = Bottleneck::BottleneckError = require "./BottleneckError"
Bottleneck.Group = Bottleneck::Group = require "./Group"
Bottleneck.RedisConnection = Bottleneck::RedisConnection = require "./RedisConnection"
Bottleneck.IORedisConnection = Bottleneck::IORedisConnection = require "./IORedisConnection"
Bottleneck.Batcher = Bottleneck::Batcher = require "./Batcher"
jobDefaults:
priority: DEFAULT_PRIORITY
weight: 1
expiration: null
id: "<no-id>"
storeDefaults:
maxConcurrent: null
minTime: 0
highWater: null
strategy: Bottleneck::strategy.LEAK
penalty: null
reservoir: null
reservoirRefreshInterval: null
reservoirRefreshAmount: null
reservoirIncreaseInterval: null
reservoirIncreaseAmount: null
reservoirIncreaseMaximum: null
localStoreDefaults:
Promise: Promise
timeout: null
heartbeatInterval: 250
redisStoreDefaults:
Promise: Promise
timeout: null
heartbeatInterval: 5000
clientTimeout: 10000
Redis: null
clientOptions: {}
clusterNodes: null
clearDatastore: false
connection: null
instanceDefaults:
datastore: "local"
connection: null
id: "<no-id>"
rejectOnDrop: true
trackDoneStatus: false
Promise: Promise
stopDefaults:
enqueueErrorMessage: "This limiter has been stopped and cannot accept new jobs."
dropWaitingJobs: true
dropErrorMessage: "This limiter has been stopped."
constructor: (options={}, invalid...) ->
@_validateOptions options, invalid
parser.load options, @instanceDefaults, @
@_queues = new Queues NUM_PRIORITIES
@_scheduled = {}
@_states = new States ["RECEIVED", "QUEUED", "RUNNING", "EXECUTING"].concat(if @trackDoneStatus then ["DONE"] else [])
@_limiter = null
@Events = new Events @
@_submitLock = new Sync "submit", @Promise
@_registerLock = new Sync "register", @Promise
storeOptions = parser.load options, @storeDefaults, {}
@_store = if @datastore == "redis" or @datastore == "ioredis" or @connection?
storeInstanceOptions = parser.load options, @redisStoreDefaults, {}
new RedisDatastore @, storeOptions, storeInstanceOptions
else if @datastore == "local"
storeInstanceOptions = parser.load options, @localStoreDefaults, {}
new LocalDatastore @, storeOptions, storeInstanceOptions
else
throw new Bottleneck::BottleneckError "Invalid datastore type: #{@datastore}"
@_queues.on "leftzero", => @_store.heartbeat?.ref?()
@_queues.on "zero", => @_store.heartbeat?.unref?()
_validateOptions: (options, invalid) ->
unless options? and typeof options == "object" and invalid.length == 0
throw new Bottleneck::BottleneckError "Bottleneck v2 takes a single object argument. Refer to https://github.com/SGrondin/bottleneck#upgrading-to-v2 if you're upgrading from Bottleneck v1."
ready: -> @_store.ready
clients: -> @_store.clients
channel: -> "b_#{@id}"
channel_client: -> "b_#{@id}_#{@_store.clientId}"
publish: (message) -> @_store.__publish__ message
disconnect: (flush=true) -> @_store.__disconnect__ flush
chain: (@_limiter) -> @
queued: (priority) -> @_queues.queued priority
clusterQueued: -> @_store.__queued__()
empty: -> @queued() == 0 and @_submitLock.isEmpty()
running: -> @_store.__running__()
done: -> @_store.__done__()
jobStatus: (id) -> @_states.jobStatus id
jobs: (status) -> @_states.statusJobs status
counts: -> @_states.statusCounts()
_randomIndex: -> Math.random().toString(36).slice(2)
check: (weight=1) -> @_store.__check__ weight
_clearGlobalState: (index) ->
if @_scheduled[index]?
clearTimeout @_scheduled[index].expiration
delete @_scheduled[index]
true
else false
_free: (index, job, options, eventInfo) ->
try
{ running } = await @_store.__free__ index, options.weight
@Events.trigger "debug", "Freed #{options.id}", eventInfo
if running == 0 and @empty() then @Events.trigger "idle"
catch e
@Events.trigger "error", e
_run: (index, job, wait) ->
job.doRun()
clearGlobalState = @_clearGlobalState.bind @, index
run = @_run.bind @, index, job
free = @_free.bind @, index, job
@_scheduled[index] =
timeout: setTimeout =>
job.doExecute @_limiter, clearGlobalState, run, free
, wait
expiration: if job.options.expiration? then setTimeout ->
job.doExpire clearGlobalState, run, free
, wait + job.options.expiration
job: job
_drainOne: (capacity) ->
@_registerLock.schedule =>
if @queued() == 0 then return @Promise.resolve null
queue = @_queues.getFirst()
{ options, args } = next = queue.first()
if capacity? and options.weight > capacity then return @Promise.resolve null
@Events.trigger "debug", "Draining #{options.id}", { args, options }
index = @_randomIndex()
@_store.__register__ index, options.weight, options.expiration
.then ({ success, wait, reservoir }) =>
@Events.trigger "debug", "Drained #{options.id}", { success, args, options }
if success
queue.shift()
empty = @empty()
if empty then @Events.trigger "empty"
if reservoir == 0 then @Events.trigger "depleted", empty
@_run index, next, wait
@Promise.resolve options.weight
else
@Promise.resolve null
_drainAll: (capacity, total=0) ->
@_drainOne(capacity)
.then (drained) =>
if drained?
newCapacity = if capacity? then capacity - drained else capacity
@_drainAll(newCapacity, total + drained)
else @Promise.resolve total
.catch (e) => @Events.trigger "error", e
_dropAllQueued: (message) -> @_queues.shiftAll (job) -> job.doDrop { message }
stop: (options={}) ->
options = parser.load options, @stopDefaults
waitForExecuting = (at) =>
finished = =>
counts = @_states.counts
(counts[0] + counts[1] + counts[2] + counts[3]) == at
new @Promise (resolve, reject) =>
if finished() then resolve()
else
@on "done", =>
if finished()
@removeAllListeners "done"
resolve()
done = if options.dropWaitingJobs
@_run = (index, next) -> next.doDrop { message: options.dropErrorMessage }
@_drainOne = => @Promise.resolve null
@_registerLock.schedule => @_submitLock.schedule =>
for k, v of @_scheduled
if @jobStatus(v.job.options.id) == "RUNNING"
clearTimeout v.timeout
clearTimeout v.expiration
v.job.doDrop { message: options.dropErrorMessage }
@_dropAllQueued options.dropErrorMessage
waitForExecuting(0)
else
@schedule { priority: NUM_PRIORITIES - 1, weight: 0 }, => waitForExecuting(1)
@_receive = (job) -> job._reject new Bottleneck::BottleneckError options.enqueueErrorMessage
@stop = => @Promise.reject new Bottleneck::BottleneckError "stop() has already been called"
done
_addToQueue: (job) =>
{ args, options } = job
try
{ reachedHWM, blocked, strategy } = await @_store.__submit__ @queued(), options.weight
catch error
@Events.trigger "debug", "Could not queue #{options.id}", { args, options, error }
job.doDrop { error }
return false
if blocked
job.doDrop()
return true
else if reachedHWM
shifted = if strategy == Bottleneck::strategy.LEAK then @_queues.shiftLastFrom(options.priority)
else if strategy == Bottleneck::strategy.OVERFLOW_PRIORITY then @_queues.shiftLastFrom(options.priority + 1)
else if strategy == Bottleneck::strategy.OVERFLOW then job
if shifted? then shifted.doDrop()
if not shifted? or strategy == Bottleneck::strategy.OVERFLOW
if not shifted? then job.doDrop()
return reachedHWM
job.doQueue reachedHWM, blocked
@_queues.push job
await @_drainAll()
reachedHWM
_receive: (job) ->
if @_states.jobStatus(job.options.id)?
job._reject new Bottleneck::BottleneckError "A job with the same id already exists (id=#{job.options.id})"
false
else
job.doReceive()
@_submitLock.schedule @_addToQueue, job
submit: (args...) ->
if typeof args[0] == "function"
[fn, args..., cb] = args
options = parser.load {}, @jobDefaults
else
[options, fn, args..., cb] = args
options = parser.load options, @jobDefaults
task = (args...) =>
new @Promise (resolve, reject) ->
fn args..., (args...) ->
(if args[0]? then reject else resolve) args
job = new Job task, args, options, @jobDefaults, @rejectOnDrop, @Events, @_states, @Promise
job.promise
.then (args) -> cb? args...
.catch (args) -> if Array.isArray args then cb? args... else cb? args
@_receive job
schedule: (args...) ->
if typeof args[0] == "function"
[task, args...] = args
options = {}
else
[options, task, args...] = args
job = new Job task, args, options, @jobDefaults, @rejectOnDrop, @Events, @_states, @Promise
@_receive job
job.promise
wrap: (fn) ->
schedule = @schedule.bind @
wrapped = (args...) -> schedule fn.bind(@), args...
wrapped.withOptions = (options, args...) -> schedule options, fn, args...
wrapped
updateSettings: (options={}) ->
await @_store.__updateSettings__ parser.overwrite options, @storeDefaults
parser.overwrite options, @instanceDefaults, @
@
currentReservoir: -> @_store.__currentReservoir__()
incrementReservoir: (incr=0) -> @_store.__incrementReservoir__ incr
module.exports = Bottleneck