node.js
6.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
var AWS = require('../core');
var Stream = AWS.util.stream.Stream;
var TransformStream = AWS.util.stream.Transform;
var ReadableStream = AWS.util.stream.Readable;
require('../http');
var CONNECTION_REUSE_ENV_NAME = 'AWS_NODEJS_CONNECTION_REUSE_ENABLED';
/**
* @api private
*/
AWS.NodeHttpClient = AWS.util.inherit({
handleRequest: function handleRequest(httpRequest, httpOptions, callback, errCallback) {
var self = this;
var endpoint = httpRequest.endpoint;
var pathPrefix = '';
if (!httpOptions) httpOptions = {};
if (httpOptions.proxy) {
pathPrefix = endpoint.protocol + '//' + endpoint.hostname;
if (endpoint.port !== 80 && endpoint.port !== 443) {
pathPrefix += ':' + endpoint.port;
}
endpoint = new AWS.Endpoint(httpOptions.proxy);
}
var useSSL = endpoint.protocol === 'https:';
var http = useSSL ? require('https') : require('http');
var options = {
host: endpoint.hostname,
port: endpoint.port,
method: httpRequest.method,
headers: httpRequest.headers,
path: pathPrefix + httpRequest.path
};
if (!httpOptions.agent) {
options.agent = this.getAgent(useSSL, {
keepAlive: process.env[CONNECTION_REUSE_ENV_NAME] === '1' ? true : false
});
}
AWS.util.update(options, httpOptions);
delete options.proxy; // proxy isn't an HTTP option
delete options.timeout; // timeout isn't an HTTP option
var stream = http.request(options, function (httpResp) {
if (stream.didCallback) return;
callback(httpResp);
httpResp.emit(
'headers',
httpResp.statusCode,
httpResp.headers,
httpResp.statusMessage
);
});
httpRequest.stream = stream; // attach stream to httpRequest
stream.didCallback = false;
// connection timeout support
if (httpOptions.connectTimeout) {
var connectTimeoutId;
stream.on('socket', function(socket) {
if (socket.connecting) {
connectTimeoutId = setTimeout(function connectTimeout() {
if (stream.didCallback) return; stream.didCallback = true;
stream.abort();
errCallback(AWS.util.error(
new Error('Socket timed out without establishing a connection'),
{code: 'TimeoutError'}
));
}, httpOptions.connectTimeout);
socket.on('connect', function() {
clearTimeout(connectTimeoutId);
connectTimeoutId = null;
});
}
});
}
// timeout support
stream.setTimeout(httpOptions.timeout || 0, function() {
if (stream.didCallback) return; stream.didCallback = true;
var msg = 'Connection timed out after ' + httpOptions.timeout + 'ms';
errCallback(AWS.util.error(new Error(msg), {code: 'TimeoutError'}));
stream.abort();
});
stream.on('error', function() {
if (connectTimeoutId) {
clearTimeout(connectTimeoutId);
connectTimeoutId = null;
}
if (stream.didCallback) return; stream.didCallback = true;
errCallback.apply(stream, arguments);
});
var expect = httpRequest.headers.Expect || httpRequest.headers.expect;
if (expect === '100-continue') {
stream.on('continue', function() {
self.writeBody(stream, httpRequest);
});
} else {
this.writeBody(stream, httpRequest);
}
return stream;
},
writeBody: function writeBody(stream, httpRequest) {
var body = httpRequest.body;
var totalBytes = parseInt(httpRequest.headers['Content-Length'], 10);
if (body instanceof Stream) {
// For progress support of streaming content -
// pipe the data through a transform stream to emit 'sendProgress' events
var progressStream = this.progressStream(stream, totalBytes);
if (progressStream) {
body.pipe(progressStream).pipe(stream);
} else {
body.pipe(stream);
}
} else if (body) {
// The provided body is a buffer/string and is already fully available in memory -
// For performance it's best to send it as a whole by calling stream.end(body),
// Callers expect a 'sendProgress' event which is best emitted once
// the http request stream has been fully written and all data flushed.
// The use of totalBytes is important over body.length for strings where
// length is char length and not byte length.
stream.once('finish', function() {
stream.emit('sendProgress', {
loaded: totalBytes,
total: totalBytes
});
});
stream.end(body);
} else {
// no request body
stream.end();
}
},
/**
* Create the https.Agent or http.Agent according to the request schema.
*/
getAgent: function getAgent(useSSL, agentOptions) {
var http = useSSL ? require('https') : require('http');
if (useSSL) {
if (!AWS.NodeHttpClient.sslAgent) {
AWS.NodeHttpClient.sslAgent = new http.Agent(AWS.util.merge({
rejectUnauthorized: true
}, agentOptions || {}));
AWS.NodeHttpClient.sslAgent.setMaxListeners(0);
// delegate maxSockets to globalAgent, set a default limit of 50 if current value is Infinity.
// Users can bypass this default by supplying their own Agent as part of SDK configuration.
Object.defineProperty(AWS.NodeHttpClient.sslAgent, 'maxSockets', {
enumerable: true,
get: function() {
var defaultMaxSockets = 50;
var globalAgent = http.globalAgent;
if (globalAgent && globalAgent.maxSockets !== Infinity && typeof globalAgent.maxSockets === 'number') {
return globalAgent.maxSockets;
}
return defaultMaxSockets;
}
});
}
return AWS.NodeHttpClient.sslAgent;
} else {
if (!AWS.NodeHttpClient.agent) {
AWS.NodeHttpClient.agent = new http.Agent(agentOptions);
}
return AWS.NodeHttpClient.agent;
}
},
progressStream: function progressStream(stream, totalBytes) {
if (typeof TransformStream === 'undefined') {
// for node 0.8 there is no streaming progress
return;
}
var loadedBytes = 0;
var reporter = new TransformStream();
reporter._transform = function(chunk, encoding, callback) {
if (chunk) {
loadedBytes += chunk.length;
stream.emit('sendProgress', {
loaded: loadedBytes,
total: totalBytes
});
}
callback(null, chunk);
};
return reporter;
},
emitter: null
});
/**
* @!ignore
*/
/**
* @api private
*/
AWS.HttpClient.prototype = AWS.NodeHttpClient.prototype;
/**
* @api private
*/
AWS.HttpClient.streamsApiVersion = ReadableStream ? 2 : 1;