event-message-chunker-stream.js
4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
var util = require('../core').util;
var Transform = require('stream').Transform;
var allocBuffer = util.buffer.alloc;
/** @type {Transform} */
function EventMessageChunkerStream(options) {
Transform.call(this, options);
this.currentMessageTotalLength = 0;
this.currentMessagePendingLength = 0;
/** @type {Buffer} */
this.currentMessage = null;
/** @type {Buffer} */
this.messageLengthBuffer = null;
}
EventMessageChunkerStream.prototype = Object.create(Transform.prototype);
/**
*
* @param {Buffer} chunk
* @param {string} encoding
* @param {*} callback
*/
EventMessageChunkerStream.prototype._transform = function(chunk, encoding, callback) {
var chunkLength = chunk.length;
var currentOffset = 0;
while (currentOffset < chunkLength) {
// create new message if necessary
if (!this.currentMessage) {
// working on a new message, determine total length
var bytesRemaining = chunkLength - currentOffset;
// prevent edge case where total length spans 2 chunks
if (!this.messageLengthBuffer) {
this.messageLengthBuffer = allocBuffer(4);
}
var numBytesForTotal = Math.min(
4 - this.currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer
bytesRemaining // bytes left in chunk
);
chunk.copy(
this.messageLengthBuffer,
this.currentMessagePendingLength,
currentOffset,
currentOffset + numBytesForTotal
);
this.currentMessagePendingLength += numBytesForTotal;
currentOffset += numBytesForTotal;
if (this.currentMessagePendingLength < 4) {
// not enough information to create the current message
break;
}
this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0));
this.messageLengthBuffer = null;
}
// write data into current message
var numBytesToWrite = Math.min(
this.currentMessageTotalLength - this.currentMessagePendingLength, // number of bytes left to complete message
chunkLength - currentOffset // number of bytes left in the original chunk
);
chunk.copy(
this.currentMessage, // target buffer
this.currentMessagePendingLength, // target offset
currentOffset, // chunk offset
currentOffset + numBytesToWrite // chunk end to write
);
this.currentMessagePendingLength += numBytesToWrite;
currentOffset += numBytesToWrite;
// check if a message is ready to be pushed
if (this.currentMessageTotalLength && this.currentMessageTotalLength === this.currentMessagePendingLength) {
// push out the message
this.push(this.currentMessage);
// cleanup
this.currentMessage = null;
this.currentMessageTotalLength = 0;
this.currentMessagePendingLength = 0;
}
}
callback();
};
EventMessageChunkerStream.prototype._flush = function(callback) {
if (this.currentMessageTotalLength) {
if (this.currentMessageTotalLength === this.currentMessagePendingLength) {
callback(null, this.currentMessage);
} else {
callback(new Error('Truncated event message received.'));
}
} else {
callback();
}
};
/**
* @param {number} size Size of the message to be allocated.
* @api private
*/
EventMessageChunkerStream.prototype.allocateMessage = function(size) {
if (typeof size !== 'number') {
throw new Error('Attempted to allocate an event message where size was not a number: ' + size);
}
this.currentMessageTotalLength = size;
this.currentMessagePendingLength = 4;
this.currentMessage = allocBuffer(size);
this.currentMessage.writeUInt32BE(size, 0);
};
/**
* @api private
*/
module.exports = {
EventMessageChunkerStream: EventMessageChunkerStream
};