eachAsync.js
4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
'use strict';
/*!
* Module dependencies.
*/
const immediate = require('../immediate');
const promiseOrCallback = require('../promiseOrCallback');
/**
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*
* @param {Function} next the thunk to call to get the next document
* @param {Function} fn
* @param {Object} options
* @param {Function} [callback] executed when all docs have been processed
* @return {Promise}
* @api public
* @method eachAsync
*/
module.exports = function eachAsync(next, fn, options, callback) {
const parallel = options.parallel || 1;
const batchSize = options.batchSize;
const enqueue = asyncQueue();
return promiseOrCallback(callback, cb => {
if (batchSize != null) {
if (typeof batchSize !== 'number') {
throw new TypeError('batchSize must be a number');
}
if (batchSize < 1) {
throw new TypeError('batchSize must be at least 1');
}
if (batchSize !== Math.floor(batchSize)) {
throw new TypeError('batchSize must be a positive integer');
}
}
iterate(cb);
});
function iterate(finalCallback) {
let drained = false;
let handleResultsInProgress = 0;
let currentDocumentIndex = 0;
let documentsBatch = [];
let error = null;
for (let i = 0; i < parallel; ++i) {
enqueue(fetch);
}
function fetch(done) {
if (drained || error) {
return done();
}
next(function(err, doc) {
if (drained || error != null) {
return done();
}
if (err != null) {
error = err;
finalCallback(err);
return done();
}
if (doc == null) {
drained = true;
if (handleResultsInProgress <= 0) {
finalCallback(null);
} else if (batchSize != null && documentsBatch.length) {
handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
}
return done();
}
++handleResultsInProgress;
// Kick off the subsequent `next()` before handling the result, but
// make sure we know that we still have a result to handle re: #8422
immediate(() => done());
if (batchSize != null) {
documentsBatch.push(doc);
}
// If the current documents size is less than the provided patch size don't process the documents yet
if (batchSize != null && documentsBatch.length !== batchSize) {
setTimeout(() => enqueue(fetch), 0);
return;
}
const docsToProcess = batchSize != null ? documentsBatch : doc;
function handleNextResultCallBack(err) {
if (batchSize != null) {
handleResultsInProgress -= documentsBatch.length;
documentsBatch = [];
} else {
--handleResultsInProgress;
}
if (err != null) {
error = err;
return finalCallback(err);
}
if (drained && handleResultsInProgress <= 0) {
return finalCallback(null);
}
setTimeout(() => enqueue(fetch), 0);
}
handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
});
}
}
function handleNextResult(doc, i, callback) {
const promise = fn(doc, i);
if (promise && typeof promise.then === 'function') {
promise.then(
function() { callback(null); },
function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
} else {
callback(null);
}
}
};
// `next()` can only execute one at a time, so make sure we always execute
// `next()` in series, while still allowing multiple `fn()` instances to run
// in parallel.
function asyncQueue() {
const _queue = [];
let inProgress = null;
let id = 0;
return function enqueue(fn) {
if (_queue.length === 0 && inProgress == null) {
inProgress = id++;
return fn(_step);
}
_queue.push(fn);
};
function _step() {
inProgress = null;
if (_queue.length > 0) {
inProgress = id++;
const fn = _queue.shift();
fn(_step);
}
}
}