|
@@ -67,61 +67,66 @@ class ImportService {
|
|
|
* @param {object} Model instance of mongoose model
|
|
* @param {object} Model instance of mongoose model
|
|
|
* @param {string} jsonFile absolute path to the jsonFile being imported
|
|
* @param {string} jsonFile absolute path to the jsonFile being imported
|
|
|
* @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
* @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
|
|
|
+ * @return {Promise<{insertedIds: Array.<string>, failedIds: Array.<string>}>} ids of the documents that were inserted and of those that failed
|
|
|
*/
|
|
*/
|
|
|
async import(Model, jsonFile, overwriteParams = {}) {
|
|
async import(Model, jsonFile, overwriteParams = {}) {
|
|
|
- const { collectionName } = Model.collection;
|
|
|
|
|
-
|
|
|
|
|
- let counter = 0;
|
|
|
|
|
- let nInsertedTotal = 0;
|
|
|
|
|
-
|
|
|
|
|
- let failedIds = [];
|
|
|
|
|
- let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
|
|
-
|
|
|
|
|
- const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
|
|
- const jsonStream = readStream.pipe(JSONStream.parse('*'));
|
|
|
|
|
-
|
|
|
|
|
- jsonStream.on('data', async(document) => {
|
|
|
|
|
- // documents are not persisted until unorderedBulkOp.execute()
|
|
|
|
|
- unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
|
|
|
|
|
-
|
|
|
|
|
- counter++;
|
|
|
|
|
-
|
|
|
|
|
- if (counter % this.per === 0) {
|
|
|
|
|
- // puase jsonStream to prevent more items to be added to unorderedBulkOp
|
|
|
|
|
- jsonStream.pause();
|
|
|
|
|
-
|
|
|
|
|
- const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
|
|
- nInsertedTotal += nInserted;
|
|
|
|
|
- failedIds = [...failedIds, ...failed];
|
|
|
|
|
-
|
|
|
|
|
- // reset initializeUnorderedBulkOp
|
|
|
|
|
- unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
|
|
-
|
|
|
|
|
- // resume jsonStream
|
|
|
|
|
- jsonStream.resume();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // streamToPromise(jsonStream) throws an error, use new Promise instead
|
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
|
+ const { collectionName } = Model.collection;
|
|
|
|
|
+
|
|
|
|
|
+ let counter = 0;
|
|
|
|
|
+ let insertedIds = [];
|
|
|
|
|
+ let failedIds = [];
|
|
|
|
|
+ let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
|
|
+
|
|
|
|
|
+ const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
|
|
+ const jsonStream = readStream.pipe(JSONStream.parse('*'));
|
|
|
|
|
+
|
|
|
|
|
+ jsonStream.on('data', async(document) => {
|
|
|
|
|
+ // documents are not persisted until unorderedBulkOp.execute()
|
|
|
|
|
+ unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
|
|
|
|
|
+
|
|
|
|
|
+ counter++;
|
|
|
|
|
+
|
|
|
|
|
+ if (counter % this.per === 0) {
|
|
|
|
|
+ // pause jsonStream to prevent more items from being added to unorderedBulkOp
|
|
|
|
|
+ jsonStream.pause();
|
|
|
|
|
+
|
|
|
|
|
+ const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
|
|
+ insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
|
|
+ failedIds = [...failedIds, ..._failedIds];
|
|
|
|
|
+
|
|
|
|
|
+ // replace the executed unorderedBulkOp with a fresh one
|
|
|
|
|
+ unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
|
|
+
|
|
|
|
|
+ // resume jsonStream
|
|
|
|
|
+ jsonStream.resume();
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ jsonStream.on('end', async(data) => {
|
|
|
|
|
+ // insert the rest. avoid errors when unorderedBulkOp has no items
|
|
|
|
|
+ if (unorderedBulkOp.s.currentBatch !== null) {
|
|
|
|
|
+ const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
|
|
+ insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
|
|
+ failedIds = [...failedIds, ..._failedIds];
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(`Done. Inserted ${insertedIds.length} ${collectionName}.`);
|
|
|
|
|
+
|
|
|
|
|
+ if (failedIds.length > 0) {
|
|
|
|
|
+ logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // clean up: delete the imported json file
|
|
|
|
|
+ fs.unlinkSync(jsonFile);
|
|
|
|
|
+
|
|
|
|
|
+ return resolve({
|
|
|
|
|
+ insertedIds,
|
|
|
|
|
+ failedIds,
|
|
|
|
|
+ });
|
|
|
|
|
+ });
|
|
|
});
|
|
});
|
|
|
-
|
|
|
|
|
- jsonStream.on('end', async(data) => {
|
|
|
|
|
- // insert the rest. avoid errors when unorderedBulkOp has no items
|
|
|
|
|
- if (unorderedBulkOp.s.currentBatch !== null) {
|
|
|
|
|
- const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
|
|
- nInsertedTotal += nInserted;
|
|
|
|
|
- failedIds = [...failedIds, ...failed];
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- logger.info(`Done. Inserted ${nInsertedTotal} ${collectionName}.`);
|
|
|
|
|
-
|
|
|
|
|
- if (failedIds.length > 0) {
|
|
|
|
|
- logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- // streamToPromise(jsonStream) throws error, so await readStream instead
|
|
|
|
|
- await streamToPromise(readStream);
|
|
|
|
|
-
|
|
|
|
|
- // clean up tmp directory
|
|
|
|
|
- fs.unlinkSync(jsonFile);
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -165,28 +170,34 @@ class ImportService {
|
|
|
*/
|
|
*/
|
|
|
async execUnorderedBulkOpSafely(unorderedBulkOp) {
|
|
async execUnorderedBulkOpSafely(unorderedBulkOp) {
|
|
|
// keep the number of documents inserted and failed for logger
|
|
// keep the number of documents inserted and failed for logger
|
|
|
- let nInserted = 0;
|
|
|
|
|
- const failed = [];
|
|
|
|
|
|
|
+ let insertedIds = [];
|
|
|
|
|
+ let failedIds = [];
|
|
|
|
|
|
|
|
// try catch to skip errors
|
|
// try catch to skip errors
|
|
|
try {
|
|
try {
|
|
|
const log = await unorderedBulkOp.execute();
|
|
const log = await unorderedBulkOp.execute();
|
|
|
- nInserted = log.result.nInserted;
|
|
|
|
|
|
|
+ const _insertedIds = log.result.insertedIds.map(op => op._id);
|
|
|
|
|
+ insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
}
|
|
}
|
|
|
catch (err) {
|
|
catch (err) {
|
|
|
|
|
+ const collectionName = unorderedBulkOp.s.namespace;
|
|
|
|
|
+
|
|
|
for (const error of err.result.result.writeErrors) {
|
|
for (const error of err.result.result.writeErrors) {
|
|
|
- logger.error(error.errmsg);
|
|
|
|
|
- failed.push(error.err.op._id);
|
|
|
|
|
|
|
+ logger.error(`${collectionName}: ${error.errmsg}`);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- nInserted = err.result.result.nInserted;
|
|
|
|
|
|
|
+ const _failedIds = err.result.result.writeErrors.map(err => err.err.op._id);
|
|
|
|
|
+ const _insertedIds = err.result.result.insertedIds.filter(op => !_failedIds.includes(op._id)).map(op => op._id);
|
|
|
|
|
+
|
|
|
|
|
+ failedIds = [...failedIds, ..._failedIds];
|
|
|
|
|
+ insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${nInserted}. Failed: ${failed.length}.`);
|
|
|
|
|
|
|
+ logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${insertedIds.length}. Failed: ${failedIds.length}.`);
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
|
- nInserted,
|
|
|
|
|
- failed,
|
|
|
|
|
|
|
+ insertedIds,
|
|
|
|
|
+ failedIds,
|
|
|
};
|
|
};
|
|
|
}
|
|
}
|
|
|
|
|
|