mjpeg.js
5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import _ from 'lodash';
import log from './logger';
import B from 'bluebird';
import { getJimpImage, MIME_PNG } from './image-util';
import { Writable } from 'stream';
import { requirePackage } from './node';
import axios from 'axios';
// lazy load this, as it might not be available
let MJpegConsumer = null;
/**
* @throws {Error} If `mjpeg-consumer` module is not installed or cannot be loaded
*/
async function initMJpegConsumer () {
if (!MJpegConsumer) {
try {
MJpegConsumer = await requirePackage('mjpeg-consumer');
} catch (ign) {}
}
if (!MJpegConsumer) {
throw new Error('mjpeg-consumer module is required to use MJPEG-over-HTTP features. ' +
'Please install it first (npm i -g mjpeg-consumer) and restart Appium.');
}
}
// amount of time to wait for the first image in the stream
const MJPEG_SERVER_TIMEOUT_MS = 10000;
/** Class which stores the last bit of data streamed into it */
class MJpegStream extends Writable {
/**
* Create an MJpegStream
* @param {string} mJpegUrl - URL of MJPEG-over-HTTP stream
* @param {function} [errorHandler=noop] - additional function that will be
* called in the case of any errors.
* @param {object} [options={}] - Options to pass to the Writable constructor
*/
constructor (mJpegUrl, errorHandler = _.noop, options = {}) {
super(options);
this.errorHandler = errorHandler;
this.url = mJpegUrl;
this.clear();
}
/**
* Get the base64-encoded version of the JPEG
*
* @returns {?string} base64-encoded JPEG image data
* or `null` if no image can be parsed
*/
get lastChunkBase64 () {
return !_.isEmpty(this.lastChunk) && _.isBuffer(this.lastChunk)
? this.lastChunk.toString('base64')
: null;
}
/**
* Get the PNG version of the JPEG buffer
*
* @returns {?Buffer} PNG image data or `null` if no PNG
* image can be parsed
*/
async lastChunkPNG () {
if (_.isEmpty(this.lastChunk) || !_.isBuffer(this.lastChunk)) {
return null;
}
try {
const jpg = await getJimpImage(this.lastChunk);
return await jpg.getBuffer(MIME_PNG);
} catch (e) {
return null;
}
}
/**
* Get the base64-encoded version of the PNG
*
* @returns {?string} base64-encoded PNG image data
* or `null` if no image can be parsed
*/
async lastChunkPNGBase64 () {
const png = await this.lastChunkPNG();
return png ? png.toString('base64') : null;
}
/**
* Reset internal state
*/
clear () {
this.registerStartSuccess = null;
this.registerStartFailure = null;
this.responseStream = null;
this.consumer = null;
this.lastChunk = null;
this.updateCount = 0;
}
/**
* Start reading the MJpeg stream and storing the last image
*/
async start (serverTimeout = MJPEG_SERVER_TIMEOUT_MS) {
// ensure we're not started already
this.stop();
await initMJpegConsumer();
this.consumer = new MJpegConsumer();
// use the deferred pattern so we can wait for the start of the stream
// based on what comes in from an external pipe
const startPromise = new B((res, rej) => {
this.registerStartSuccess = res;
this.registerStartFailure = rej;
})
// start a timeout so that if the server does not return data, we don't
// block forever.
.timeout(serverTimeout,
`Waited ${serverTimeout}ms but the MJPEG server never sent any images`);
const url = this.url;
const onErr = (err) => {
// Make sure we don't get an outdated screenshot if there was an error
this.lastChunk = null;
log.error(`Error getting MJpeg screenshot chunk: ${err.message}`);
this.errorHandler(err);
if (this.registerStartFailure) {
this.registerStartFailure(err);
}
};
const onClose = () => {
log.debug(`The connection to MJPEG server at ${url} has been closed`);
this.lastChunk = null;
};
try {
this.responseStream = (await axios({
url,
responseType: 'stream',
timeout: serverTimeout,
})).data;
} catch (e) {
return onErr(e);
}
this.responseStream
.once('close', onClose)
.on('error', onErr) // ensure we do something with errors
.pipe(this.consumer) // allow chunking and transforming of jpeg data
.pipe(this); // send the actual jpegs to ourself
await startPromise;
}
/**
* Stop reading the MJpeg stream. Ensure we disconnect all the pipes and stop
* the HTTP request itself. Then reset the state.
*/
stop () {
if (!this.consumer) {
return;
}
this.responseStream.unpipe(this.consumer);
this.consumer.unpipe(this);
this.responseStream.destroy();
this.clear();
}
/**
* Override the Writable write() method in order to save the last image and
* log the number of images we have received
* @override
* @param {Buffer} data - binary data streamed from the MJpeg consumer
*/
write (data) {
this.lastChunk = data;
this.updateCount++;
if (this.registerStartSuccess) {
this.registerStartSuccess();
this.registerStartSuccess = null;
}
}
}
export { MJpegStream };