RedisConnection.coffee
2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
parser = require "./parser"
Events = require "./Events"
Scripts = require "./Scripts"
class RedisConnection
datastore: "redis"
defaults:
Redis: null
clientOptions: {}
client: null
Promise: Promise
Events: null
constructor: (options={}) ->
parser.load options, @defaults, @
@Redis ?= eval("require")("redis") # Obfuscated or else Webpack/Angular will try to inline the optional redis module. To override this behavior: pass the redis module to Bottleneck as the 'Redis' option.
@Events ?= new Events @
@terminated = false
@client ?= @Redis.createClient @clientOptions
@subscriber = @client.duplicate()
@limiters = {}
@shas = {}
@ready = @Promise.all [@_setup(@client, false), @_setup(@subscriber, true)]
.then => @_loadScripts()
.then => { @client, @subscriber }
_setup: (client, sub) ->
client.setMaxListeners 0
new @Promise (resolve, reject) =>
client.on "error", (e) => @Events.trigger "error", e
if sub
client.on "message", (channel, message) =>
@limiters[channel]?._store.onMessage channel, message
if client.ready then resolve()
else client.once "ready", resolve
_loadScript: (name) ->
new @Promise (resolve, reject) =>
payload = Scripts.payload name
@client.multi([["script", "load", payload]]).exec (err, replies) =>
if err? then return reject err
@shas[name] = replies[0]
resolve replies[0]
_loadScripts: -> @Promise.all(Scripts.names.map (k) => @_loadScript k)
__runCommand__: (cmd) ->
await @ready
new @Promise (resolve, reject) =>
@client.multi([cmd]).exec_atomic (err, replies) ->
if err? then reject(err) else resolve(replies[0])
__addLimiter__: (instance) ->
@Promise.all [instance.channel(), instance.channel_client()].map (channel) =>
new @Promise (resolve, reject) =>
handler = (chan) =>
if chan == channel
@subscriber.removeListener "subscribe", handler
@limiters[channel] = instance
resolve()
@subscriber.on "subscribe", handler
@subscriber.subscribe channel
__removeLimiter__: (instance) ->
@Promise.all [instance.channel(), instance.channel_client()].map (channel) =>
unless @terminated
await new @Promise (resolve, reject) =>
@subscriber.unsubscribe channel, (err, chan) ->
if err? then return reject err
if chan == channel then return resolve()
delete @limiters[channel]
__scriptArgs__: (name, id, args, cb) ->
keys = Scripts.keys name, id
[@shas[name], keys.length].concat keys, args, cb
__scriptFn__: (name) ->
@client.evalsha.bind(@client)
disconnect: (flush=true) ->
clearInterval(@limiters[k]._store.heartbeat) for k in Object.keys @limiters
@limiters = {}
@terminated = true
@client.end flush
@subscriber.end flush
@Promise.resolve()
module.exports = RedisConnection