common_functions.js
13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
'use strict';
const applyRetryableWrites = require('../utils').applyRetryableWrites;
const applyWriteConcern = require('../utils').applyWriteConcern;
const decorateWithCollation = require('../utils').decorateWithCollation;
const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
const executeCommand = require('./db_ops').executeCommand;
const formattedOrderClause = require('../utils').formattedOrderClause;
const handleCallback = require('../utils').handleCallback;
const MongoError = require('../core').MongoError;
const ReadPreference = require('../core').ReadPreference;
const toError = require('../utils').toError;
const CursorState = require('../core/cursor').CursorState;
const maxWireVersion = require('../core/utils').maxWireVersion;
/**
 * Assemble the `count` command document for a collection or a cursor.
 *
 * @method
 * @param {Collection|Cursor} collectionOrCursor The collection or cursor the count is issued for.
 *   It is treated as a cursor when `collectionOrCursor.s.numberOfRetries` is truthy.
 * @param {object} query The query for the count; a falsy value is replaced with `{}`.
 * @param {object} options Settings read here: `collectionName`, `skip`, `limit`, `hint` and
 *   `maxTimeMS`. Must be an object, it is dereferenced unconditionally. See
 *   Collection.prototype.count and Cursor.prototype.count for a list of options.
 * @return {object} The command document.
 */
function buildCountCommand(collectionOrCursor, query, options) {
  const { skip, limit, hint: optionsHint, maxTimeMS } = options;

  // Command name first, then the filter
  const cmd = {
    count: options.collectionName,
    query: query || {}
  };

  const isCursor = Boolean(collectionOrCursor.s.numberOfRetries);

  // For a cursor, a hint on the cursor options wins over one on the cursor command,
  // which in turn wins over the one passed in `options`
  const hint = isCursor
    ? collectionOrCursor.options.hint || collectionOrCursor.cmd.hint || optionsHint
    : optionsHint;

  // A cursor supplies its own command as the collation source, a collection uses `options`
  decorateWithCollation(cmd, collectionOrCursor, isCursor ? collectionOrCursor.cmd : options);

  // Only numeric skip, limit and maxTimeMS values are forwarded
  if (typeof skip === 'number') {
    cmd.skip = skip;
  }
  if (typeof limit === 'number') {
    cmd.limit = limit;
  }
  if (typeof maxTimeMS === 'number') {
    cmd.maxTimeMS = maxTimeMS;
  }
  if (hint) {
    cmd.hint = hint;
  }

  // Read concern handling is delegated to the shared helper
  decorateWithReadConcern(cmd, collectionOrCursor);
  return cmd;
}
/**
 * Find and update a document.
 *
 * Builds a `findAndModify` command from the arguments and runs it through `executeCommand`
 * against `coll.s.db` with the read preference forced to `ReadPreference.primary`.
 *
 * The caller's `options` object is never modified: all adjustments are made on a private
 * shallow copy.
 *
 * @method
 * @param {Collection} coll a Collection instance.
 * @param {object} query Query object to locate the object to modify.
 * @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
 * @param {object} doc The fields/vals to be updated. Ignored when `options.remove` is truthy.
 * @param {object} [options] Optional settings. See Collection.prototype.findAndModify for a list of options.
 * @param {Collection~findAndModifyCallback} [callback] The command result callback
 * @throws Rethrows the error raised while decorating the command with a collation when no callback was supplied.
 * @deprecated use findOneAndUpdate, findOneAndReplace or findOneAndDelete instead
 */
function findAndModify(coll, query, sort, doc, options, callback) {
  // Private shallow copy: keeps the caller's object intact and tolerates omitted options
  const opts = Object.assign({}, options);

  // Create findAndModify command object
  const queryObject = {
    findAndModify: coll.collectionName,
    query: query
  };

  sort = formattedOrderClause(sort);
  if (sort) {
    queryObject.sort = sort;
  }

  // The command always carries explicit booleans for these three flags
  queryObject.new = opts.new ? true : false;
  queryObject.remove = opts.remove ? true : false;
  queryObject.upsert = opts.upsert ? true : false;

  // `projection` takes precedence over `fields`
  const projection = opts.projection || opts.fields;
  if (projection) {
    queryObject.fields = projection;
  }

  // arrayFilters belongs on the command itself, so it is removed from the execution options
  if (opts.arrayFilters) {
    queryObject.arrayFilters = opts.arrayFilters;
    delete opts.arrayFilters;
  }

  if (doc && !opts.remove) {
    queryObject.update = doc;
  }

  if (opts.maxTimeMS) queryObject.maxTimeMS = opts.maxTimeMS;

  // Either use override on the function, or go back to default on either the collection
  // level or db
  opts.serializeFunctions = opts.serializeFunctions || coll.s.serializeFunctions;

  // No check on the documents
  opts.checkKeys = false;

  // Final options for retryable writes and write concern
  let finalOptions = Object.assign({}, opts);
  finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
  finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, opts);

  // Decorate the findAndModify command with the write Concern
  if (finalOptions.writeConcern) {
    queryObject.writeConcern = finalOptions.writeConcern;
  }

  // Have we specified bypassDocumentValidation
  if (finalOptions.bypassDocumentValidation === true) {
    queryObject.bypassDocumentValidation = finalOptions.bypassDocumentValidation;
  }

  finalOptions.readPreference = ReadPreference.primary;

  // Have we specified collation
  try {
    decorateWithCollation(queryObject, coll, finalOptions);
  } catch (err) {
    // Without a callback, surface the real error instead of a "callback is not a function" TypeError
    if (typeof callback === 'function') return callback(err, null);
    throw err;
  }

  // Execute the command
  executeCommand(coll.s.db, queryObject, finalOptions, (err, result) => {
    if (err) return handleCallback(callback, err, null);
    return handleCallback(callback, null, result);
  });
}
/**
 * Retrieves this collections index info.
 *
 * Lists the indexes of collection `name` through `db.collection(name).listIndexes(options)`.
 * With a truthy `options.full` the raw index documents are handed to the callback; otherwise
 * they are condensed into an object that maps each index name to an array of
 * `[field, value]` pairs taken from the index `key`. A non-array listing yields `[]`.
 *
 * @method
 * @param {Db} db The Db instance on which to retrieve the index info.
 * @param {string} name The name of the collection.
 * @param {object} options Settings. Must be an object, it is dereferenced unconditionally. See Db.prototype.indexInformation for a list of options.
 * @param {Db~resultCallback} callback The command result callback
 */
function indexInformation(db, name, options, callback) {
  // Raw index documents are only returned on request
  const full = options.full == null ? false : options.full;

  // Refuse to run against a destroyed topology
  if (db.serverConfig && db.serverConfig.isDestroyed()) {
    return callback(new MongoError('topology was destroyed'));
  }

  // Condense the index documents into { indexName: [[field, value], ...] }
  const summarize = indexes => {
    const summary = {};
    for (const index of indexes) {
      const pairs = [];
      for (const field in index.key) {
        pairs.push([field, index.key[field]]);
      }
      summary[index.name] = pairs;
    }
    return summary;
  };

  // Get the list of indexes of the specified collection
  const listing = db.collection(name).listIndexes(options);
  listing.toArray((err, indexes) => {
    if (err) {
      return callback(toError(err));
    }
    if (!Array.isArray(indexes)) {
      return handleCallback(callback, null, []);
    }
    if (full) {
      return handleCallback(callback, null, indexes);
    }
    handleCallback(callback, null, summarize(indexes));
  });
}
/**
 * Give every document that lacks an `_id` one generated by `coll.s.pkFactory.createPk()`,
 * unless `forceServerObjectId` is `true`.
 *
 * A boolean `options.forceServerObjectId` takes precedence; otherwise the value from
 * `coll.s.db.options.forceServerObjectId` is used.
 *
 * @param {Collection} coll a Collection instance.
 * @param {object[]} docs The documents about to be inserted. They are modified in place.
 * @param {object} options Settings; only `forceServerObjectId` is read.
 * @return {object[]} `docs` itself when `forceServerObjectId` is `true`, otherwise a new array
 *   holding the same (possibly updated) document objects.
 */
function prepareDocs(coll, docs, options) {
  let skipClientIds;
  if (typeof options.forceServerObjectId === 'boolean') {
    skipClientIds = options.forceServerObjectId;
  } else {
    skipClientIds = coll.s.db.options.forceServerObjectId;
  }

  // Nothing to do: the documents go out without client-side ids
  if (skipClientIds === true) {
    return docs;
  }

  return docs.map(doc => {
    // `== null` covers both a missing and an explicitly null _id
    if (doc._id == null) {
      doc._id = coll.s.pkFactory.createPk();
    }
    return doc;
  });
}
/**
 * Get the next available document from the cursor.
 *
 * Reports a 'Cursor is closed' error when the cursor state is `CursorState.CLOSED` or
 * `cursor.isDead()` returns a truthy value. While the cursor is still in the
 * `CursorState.INIT` state, a sort on its command is normalised with `formattedOrderClause`.
 * The cursor state is set to `CursorState.OPEN` as soon as `cursor._next` calls back,
 * whether or not it reported an error.
 *
 * @param {Cursor} cursor The cursor to advance.
 * @param {function} callback Receives `(err)` on failure or `(null, doc)` with whatever
 *   `cursor._next` produced.
 */
function nextObject(cursor, callback) {
  const isClosed = cursor.s.state === CursorState.CLOSED;
  if (isClosed || (cursor.isDead && cursor.isDead())) {
    const closedError = MongoError.create({ message: 'Cursor is closed', driver: true });
    return handleCallback(callback, closedError);
  }

  const hasPendingSort = cursor.s.state === CursorState.INIT && cursor.cmd && cursor.cmd.sort;
  if (hasPendingSort) {
    try {
      cursor.cmd.sort = formattedOrderClause(cursor.cmd.sort);
    } catch (err) {
      return handleCallback(callback, err);
    }
  }

  // Get the next object
  cursor._next((err, doc) => {
    cursor.s.state = CursorState.OPEN;
    if (err) {
      return handleCallback(callback, err);
    }
    handleCallback(callback, null, doc);
  });
}
/**
 * Insert one or more documents through `coll.s.topology.insert`.
 *
 * @param {Collection} coll a Collection instance.
 * @param {object|object[]} docs A single document or an array of documents.
 * @param {object} [options] Optional settings; may be omitted, in which case the third
 *   argument is taken as the callback.
 * @param {function} [callback] Receives `(err)` on failure, `(null, null)` when the topology
 *   returned no result, otherwise `(null, result)` with `result.ops` set to the documents
 *   that were sent. Nothing is reported when no callback is given.
 */
function insertDocuments(coll, docs, options, callback) {
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  // Always hand an array of documents to the topology
  if (!Array.isArray(docs)) {
    docs = [docs];
  }

  // Final options for retryable writes and write concern
  const finalOptions = applyWriteConcern(
    applyRetryableWrites(Object.assign({}, options), coll.s.db),
    { db: coll.s.db, collection: coll },
    options
  );

  // keepGoing is expressed to the topology as an unordered insert
  if (finalOptions.keepGoing === true) {
    finalOptions.ordered = false;
  }
  finalOptions.serializeFunctions = options.serializeFunctions || coll.s.serializeFunctions;

  docs = prepareDocs(coll, docs, options);

  const onInserted = (err, result) => {
    if (callback == null) {
      return;
    }
    if (err) {
      return handleCallback(callback, err);
    }
    if (result == null) {
      return handleCallback(callback, null, null);
    }

    const reply = result.result;
    if (reply.code) {
      return handleCallback(callback, toError(reply));
    }
    if (reply.writeErrors) {
      // Only the first write error is reported
      return handleCallback(callback, toError(reply.writeErrors[0]));
    }

    // Add docs to the list
    result.ops = docs;
    handleCallback(callback, null, result);
  };

  coll.s.topology.insert(coll.s.namespace, docs, finalOptions, onInserted);
}
/**
 * Remove the documents matching `selector` through `coll.s.topology.remove`.
 *
 * Both `selector` and `options` may be omitted; the callback is then taken from whichever
 * position holds a function.
 *
 * @param {Collection} coll a Collection instance.
 * @param {object} [selector] The filter for the remove; `null`/`undefined` becomes `{}`.
 * @param {object} [options] Optional settings. Read here: `single` (limit the remove to one
 *   document), `hint`, `explain`, plus whatever the write-concern, retryable-writes and
 *   collation helpers consume.
 * @param {function} [callback] Receives `(err)` on failure, `(null, null)` when the topology
 *   returned no result, otherwise `(null, result)`.
 * @throws Rethrows the error raised while decorating the options with a collation when no
 *   callback was supplied.
 */
function removeDocuments(coll, selector, options, callback) {
  if (typeof options === 'function') {
    (callback = options), (options = {});
  } else if (typeof selector === 'function') {
    callback = selector;
    options = {};
    selector = {};
  }

  // Create an empty options object if the provided one is null
  options = options || {};

  // Final options for retryable writes and write concern
  let finalOptions = Object.assign({}, options);
  finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
  finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

  // If selector is null set empty
  if (selector == null) selector = {};

  // Build the op; limit 0 is used unless a single-document remove was requested
  const op = { q: selector, limit: 0 };
  if (options.single) {
    op.limit = 1;
  } else if (finalOptions.retryWrites) {
    // retryWrites is switched off for removes that are not limited to one document
    finalOptions.retryWrites = false;
  }

  if (options.hint) {
    op.hint = options.hint;
  }

  // Have we specified collation
  try {
    decorateWithCollation(finalOptions, coll, options);
  } catch (err) {
    // The callback is optional everywhere else in this function; without one, surface the
    // real error instead of a "callback is not a function" TypeError
    if (typeof callback === 'function') return callback(err, null);
    throw err;
  }

  if (options.explain !== undefined && maxWireVersion(coll.s.topology) < 3) {
    return callback
      ? callback(new MongoError(`server does not support explain on remove`))
      : undefined;
  }

  // Execute the remove
  coll.s.topology.remove(coll.s.namespace, [op], finalOptions, (err, result) => {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    if (result == null) return handleCallback(callback, null, null);
    if (result.result.code) return handleCallback(callback, toError(result.result));
    if (result.result.writeErrors) {
      // Only the first write error is reported
      return handleCallback(callback, toError(result.result.writeErrors[0]));
    }

    // Return the results
    handleCallback(callback, null, result);
  });
}
/**
 * Update the documents matching `selector` through `coll.s.topology.update`.
 *
 * @param {Collection} coll a Collection instance.
 * @param {object} selector The filter for the update. Must be a non-null object.
 * @param {object} document The update to apply. Must be a non-null object.
 * @param {object} [options] Optional settings; may be omitted, in which case the fourth
 *   argument is taken as the callback. Read here: `upsert`, `multi`, `hint`, `arrayFilters`,
 *   `serializeFunctions`, `explain`, plus whatever the write-concern, retryable-writes and
 *   collation helpers consume.
 * @param {function} [callback] Receives `(err)` on failure, `(null, null)` when the topology
 *   returned no result, otherwise `(null, result)`.
 * @throws When no callback was supplied: the validation error for an invalid `selector` or
 *   `document`, or the error raised while decorating the options with a collation.
 */
function updateDocuments(coll, selector, document, options, callback) {
  if ('function' === typeof options) (callback = options), (options = null);
  if (options == null) options = {};
  if (!('function' === typeof callback)) callback = null;

  // An invalid selector or document is reported through the callback when there is one and
  // thrown otherwise; calling the normalised (null) callback would only raise a TypeError
  if (selector == null || typeof selector !== 'object') {
    const err = toError('selector must be a valid JavaScript object');
    if (callback) return callback(err);
    throw err;
  }
  if (document == null || typeof document !== 'object') {
    const err = toError('document must be a valid JavaScript object');
    if (callback) return callback(err);
    throw err;
  }

  // Final options for retryable writes and write concern
  let finalOptions = Object.assign({}, options);
  finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
  finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

  // Either use override on the function, or go back to default on either the collection
  // level or db
  finalOptions.serializeFunctions = options.serializeFunctions || coll.s.serializeFunctions;

  // Build the op; upsert and multi are always sent as explicit booleans
  const op = { q: selector, u: document };
  op.upsert = options.upsert !== void 0 ? !!options.upsert : false;
  op.multi = options.multi !== void 0 ? !!options.multi : false;

  if (options.hint) {
    op.hint = options.hint;
  }

  // arrayFilters travels on the op, not in the execution options
  if (finalOptions.arrayFilters) {
    op.arrayFilters = finalOptions.arrayFilters;
    delete finalOptions.arrayFilters;
  }

  // retryWrites is switched off for multi-document updates
  if (finalOptions.retryWrites && op.multi) {
    finalOptions.retryWrites = false;
  }

  // Have we specified collation
  try {
    decorateWithCollation(finalOptions, coll, options);
  } catch (err) {
    if (callback) return callback(err, null);
    throw err;
  }

  if (options.explain !== undefined && maxWireVersion(coll.s.topology) < 3) {
    return callback
      ? callback(new MongoError(`server does not support explain on update`))
      : undefined;
  }

  // Execute the update
  coll.s.topology.update(coll.s.namespace, [op], finalOptions, (err, result) => {
    if (callback == null) return;
    if (err) return handleCallback(callback, err, null);
    if (result == null) return handleCallback(callback, null, null);
    if (result.result.code) return handleCallback(callback, toError(result.result));
    if (result.result.writeErrors)
      // Only the first write error is reported
      return handleCallback(callback, toError(result.result.writeErrors[0]));

    // Return the results
    handleCallback(callback, null, result);
  });
}
module.exports = {
buildCountCommand,
findAndModify,
indexInformation,
nextObject,
prepareDocs,
insertDocuments,
removeDocuments,
updateDocuments
};