bulk_write.js
2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
'use strict';
const applyRetryableWrites = require('../utils').applyRetryableWrites;
const applyWriteConcern = require('../utils').applyWriteConcern;
const MongoError = require('../core').MongoError;
const OperationBase = require('./operation').OperationBase;
/**
 * Operation that feeds a list of raw write operations into a bulk builder
 * obtained from the collection, executes it, and reshapes the result before
 * handing it to the caller.
 */
class BulkWriteOperation extends OperationBase {
  /**
   * @param {object} collection Collection the bulk write runs against.
   * @param {object[]} operations Write operations; each one is passed to the bulk builder's `raw()`.
   * @param {object} [options] Options forwarded to `OperationBase`.
   */
  constructor(collection, operations, options) {
    super(options);

    this.collection = collection;
    this.operations = operations;
  }

  /**
   * Builds the bulk operation, executes it and reports the outcome.
   *
   * @param {Function} callback Invoked as `callback(err, null)` when queuing an
   *   operation throws or when execution yields an error without a result, as
   *   `callback(err)` when a collation is requested but not supported, and as
   *   `callback(null, result)` otherwise.
   * @returns {*} The callback's return value on the two synchronous failure
   *   paths, `undefined` otherwise.
   */
  execute(callback) {
    const collection = this.collection;
    const writeOps = this.operations;

    // Copy the options only when ignoreUndefined has to be added, so the
    // operation's own options object is never modified.
    const bulkOptions = collection.s.options.ignoreUndefined
      ? Object.assign({}, this.options, {
          ignoreUndefined: collection.s.options.ignoreUndefined
        })
      : this.options;

    // A null/undefined `ordered` is treated the same as `ordered: true`.
    let bulk;
    if (bulkOptions.ordered === true || bulkOptions.ordered == null) {
      bulk = collection.initializeOrderedBulkOp(bulkOptions);
    } else {
      bulk = collection.initializeUnorderedBulkOp(bulkOptions);
    }

    // Queue every operation, remembering whether any of them carries a collation.
    let usesCollation = false;
    try {
      for (let position = 0; position < writeOps.length; position += 1) {
        const op = writeOps[position];
        // The first key of an operation is its type; the value holds its arguments.
        const opType = Object.keys(op)[0];
        if (op[opType].collation) {
          usesCollation = true;
        }
        bulk.raw(op);
      }
    } catch (queueError) {
      return callback(queueError, null);
    }

    // Layer retryable-writes and write-concern settings onto a copy of the options.
    const retryableOptions = applyRetryableWrites(Object.assign({}, bulkOptions), collection.s.db);
    const finalOptions = applyWriteConcern(
      retryableOptions,
      { db: collection.s.db, collection: collection },
      bulkOptions
    );
    const writeConcern = finalOptions.writeConcern || {};

    // Reject up front when a collation was requested but the topology reports
    // that commands do not take one.
    const capabilities = collection.s.topology.capabilities();
    const collationUnsupported =
      usesCollation && capabilities && !capabilities.commandsTakeCollation;
    if (collationUnsupported) {
      return callback(new MongoError('server/primary/mongos does not support collation'));
    }

    bulk.execute(writeConcern, finalOptions, (err, result) => {
      // An error that arrives without a result is passed straight through.
      // When a result is present it is delivered with a null error.
      if (err && !result) {
        return callback(err, null);
      }

      // Copies each { index, _id } entry into `target`, keyed by its index.
      const recordIds = (entries, target) => {
        for (const entry of entries) {
          target[entry.index] = entry._id;
        }
      };

      result.n = result.insertedCount;
      recordIds(result.getInsertedIds(), result.insertedIds);
      recordIds(result.getUpsertedIds(), result.upsertedIds);

      callback(null, result);
    });
  }
}
// Expose the operation class as this module's export.
module.exports = BulkWriteOperation;