build-message.js
5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
var util = require('../core').util;
var crypto = util.crypto;
var Int64 = require('./int64').Int64;
var toBuffer = util.buffer.toBuffer;
var allocBuffer = util.buffer.alloc;
var Buffer = util.Buffer;
/**
* @api private
*/
function buildMessage(message) {
var formattedHeaders = buildHeaders(message.headers);
var headerLengthBytes = allocBuffer(4);
headerLengthBytes.writeUInt32BE(formattedHeaders.length, 0);
var totalLengthBytes = allocBuffer(4);
totalLengthBytes.writeUInt32BE(
totalLengthBytes.length + // size of this buffer
headerLengthBytes.length + // size of header length buffer
4 + // prelude crc32
formattedHeaders.length + // total size of headers
message.body.length + // total size of payload
4, // size of crc32 of the total message
0
);
var prelude = Buffer.concat([
totalLengthBytes,
headerLengthBytes
], totalLengthBytes.length + headerLengthBytes.length);
var preludeCrc32 = crc32(prelude);
var totalSansCrc32 = Buffer.concat([
prelude, preludeCrc32, formattedHeaders, message.body
], prelude.length + preludeCrc32.length + formattedHeaders.length + message.body.length);
var totalCrc32 = crc32(totalSansCrc32);
return Buffer.concat([totalSansCrc32, totalCrc32]);
}
function buildHeaders(headers) {
/** @type {Buffer[]} */
var chunks = [];
var totalSize = 0;
var headerNames = Object.keys(headers);
for (var i = 0; i < headerNames.length; i++) {
var headerName = headerNames[i];
var bytes = toBuffer(headerName);
var headerValue = buildHeaderValue(headers[headerName]);
var nameLength = allocBuffer(1);
nameLength[0] = headerName.length;
chunks.push(
nameLength,
bytes,
headerValue
);
totalSize += nameLength.length + bytes.length + headerValue.length;
}
var out = allocBuffer(totalSize);
var position = 0;
for (var j = 0; j < chunks.length; j++) {
var chunk = chunks[j];
for (var k = 0; k < chunk.length; k++) {
out[position] = chunk[k];
position++;
}
}
return out;
}
/**
* @param {object} header
* @param {'boolean'|'byte'|'short'|'integer'|'long'|'binary'|'string'|'timestamp'|'uuid'} header.type
* @param {*} header.value
* @returns {Buffer}
*/
function buildHeaderValue(header) {
switch (header.type) {
case 'binary':
var binBytes = allocBuffer(3);
binBytes.writeUInt8(HEADER_VALUE_TYPE.byteArray, 0);
binBytes.writeUInt16BE(header.value.length, 1);
return Buffer.concat([
binBytes, header.value
], binBytes.length + header.value.length);
case 'boolean':
var boolByte = allocBuffer(1);
boolByte[0] = header.value ? HEADER_VALUE_TYPE.boolTrue : HEADER_VALUE_TYPE.boolFalse;
return boolByte;
case 'byte':
var singleByte = allocBuffer(2);
singleByte[0] = HEADER_VALUE_TYPE.byte;
singleByte[1] = header.value;
return singleByte;
case 'integer':
var intBytes = allocBuffer(5);
intBytes.writeUInt8(HEADER_VALUE_TYPE.integer, 0);
intBytes.writeInt32BE(header.value, 1);
return intBytes;
case 'long':
var longBytes = allocBuffer(1);
longBytes[0] = HEADER_VALUE_TYPE.long;
return Buffer.concat([
longBytes, header.value.bytes
], 9);
case 'short':
var shortBytes = allocBuffer(3);
shortBytes.writeUInt8(HEADER_VALUE_TYPE.short, 0);
shortBytes.writeInt16BE(header.value, 1);
return shortBytes;
case 'string':
var utf8Bytes = toBuffer(header.value);
var strBytes = allocBuffer(3);
strBytes.writeUInt8(HEADER_VALUE_TYPE.string, 0);
strBytes.writeUInt16BE(utf8Bytes.length, 1);
return Buffer.concat([
strBytes, utf8Bytes
], strBytes.length + utf8Bytes.length);
case 'timestamp':
var tsBytes = allocBuffer(1);
tsBytes[0] = HEADER_VALUE_TYPE.timestamp;
return Buffer.concat([
tsBytes, Int64.fromNumber(header.value.valueOf()).bytes
], 9);
case 'uuid':
if (!UUID_PATTERN.test(header.value)) {
throw new Error('Invalid UUID received: ' + header.value);
}
var uuidBytes = allocBuffer(1);
uuidBytes[0] = HEADER_VALUE_TYPE.uuid;
return Buffer.concat([
uuidBytes, toBuffer(header.value.replace(/\-/g, ''), 'hex')
], 17);
}
}
function crc32(buffer) {
var crc32 = crypto.crc32(buffer);
var crc32Buffer = allocBuffer(4);
crc32Buffer.writeUInt32BE(crc32, 0);
return crc32Buffer;
}
/**
* @api private
*/
var HEADER_VALUE_TYPE = {
boolTrue: 0,
boolFalse: 1,
byte: 2,
short: 3,
integer: 4,
long: 5,
byteArray: 6,
string: 7,
timestamp: 8,
uuid: 9,
};
var UUID_PATTERN = /^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$/;
/**
* @api private
*/
module.exports = {
buildMessage: buildMessage
};