|
|
@@ -15,6 +15,17 @@ const CollectionProgressingStatus = require('../models/vo/collection-progressing
|
|
|
|
|
|
const BULK_IMPORT_SIZE = 100;
|
|
|
|
|
|
+
|
|
|
+class ImportOptions {
|
|
|
+
|
|
|
+ constructor(mode) {
|
|
|
+ this.mode = mode || 'insert';
|
|
|
+ this.jsonFileName = null;
|
|
|
+ this.overwriteParams = null;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
class ImportService {
|
|
|
|
|
|
constructor(crowi) {
|
|
|
@@ -33,6 +44,14 @@ class ImportService {
|
|
|
this.currentProgressingStatus = null;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * generate ImportOptions instance
|
|
|
+ * @param {string} mode bulk operation mode (insert | upsert | flushAndInsert)
|
|
|
+ */
|
|
|
+ generateImportOptions(mode) {
|
|
|
+ return new ImportOptions(mode);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* initialize convert map. set keepOriginal as default
|
|
|
*
|
|
|
@@ -108,18 +127,16 @@ class ImportService {
|
|
|
* import collections from json
|
|
|
*
|
|
|
* @param {string} collections MongoDB collection name
|
|
|
- * @param {object} jsonFileNamesMap key: collection name, value: json file name
|
|
|
- * @param {object} overwriteParamsMap key: collection name, value: overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
|
+ * @param {array} importOptionsMap key: collection name, value: ImportOptions instance
|
|
|
*/
|
|
|
- async import(collections, jsonFileNamesMap, overwriteParamsMap) {
|
|
|
+ async import(collections, importOptionsMap) {
|
|
|
// init status object
|
|
|
this.currentProgressingStatus = new CollectionProgressingStatus(collections);
|
|
|
|
|
|
try {
|
|
|
const promises = collections.map((collectionName) => {
|
|
|
- const jsonFileName = jsonFileNamesMap[collectionName];
|
|
|
- const overwriteParams = overwriteParamsMap[collectionName];
|
|
|
- return this.importCollection(collections, jsonFileName, overwriteParams);
|
|
|
+ const importOptions = importOptionsMap[collectionName];
|
|
|
+ return this.importCollection(collections, importOptions);
|
|
|
});
|
|
|
await Promise.all(promises);
|
|
|
}
|
|
|
@@ -133,17 +150,18 @@ class ImportService {
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
* @param {string} collectionName MongoDB collection name
|
|
|
- * @param {string} jsonFileName json file name
|
|
|
- * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
|
+ * @param {ImportOptions} importOptions
|
|
|
* @return {insertedIds: Array.<string>, failedIds: Array.<string>}
|
|
|
*/
|
|
|
- async importCollection(collectionName, jsonFileName, overwriteParams = {}) {
|
|
|
+ async importCollection(collectionName, importOptions) {
|
|
|
// prepare functions invoked from custom streams
|
|
|
const convertDocuments = this.convertDocuments.bind(this);
|
|
|
+ const bulkOperate = this.bulkOperate.bind(this);
|
|
|
const execUnorderedBulkOpSafely = this.execUnorderedBulkOpSafely.bind(this);
|
|
|
const emitProgressEvent = this.emitProgressEvent.bind(this);
|
|
|
const emitTerminateEvent = this.emitTerminateEvent.bind(this);
|
|
|
|
|
|
+ const { jsonFileName, overwriteParams } = importOptions;
|
|
|
const Model = this.growiBridgeService.getModelFromCollectionName(collectionName);
|
|
|
const jsonFile = this.getFile(jsonFileName);
|
|
|
const collectionProgress = this.currentProgressingStatus.progressMap[collectionName];
|
|
|
@@ -175,13 +193,13 @@ class ImportService {
|
|
|
|
|
|
// documents are not persisted until unorderedBulkOp.execute()
|
|
|
batch.forEach((document) => {
|
|
|
- unorderedBulkOp.insert(document);
|
|
|
+ bulkOperate(unorderedBulkOp, collectionName, document, importOptions);
|
|
|
});
|
|
|
|
|
|
// exec
|
|
|
- const { insertedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Failed: ${errors.length}.`);
|
|
|
- collectionProgress.currentCount += insertedCount;
|
|
|
+ const { insertedCount, modifiedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errors.length}.`);
|
|
|
+ collectionProgress.currentCount += insertedCount + modifiedCount;
|
|
|
|
|
|
emitProgressEvent(errors);
|
|
|
|
|
|
@@ -208,6 +226,26 @@ class ImportService {
|
|
|
fs.unlinkSync(jsonFile);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * process bulk operation
|
|
|
+ * @param {object} bulk MongoDB Bulk instance
|
|
|
+ * @param {string} collectionName collection name
|
|
|
+ * @param {object} document
|
|
|
+ * @param {ImportOptions} importOptions
|
|
|
+ */
|
|
|
+ bulkOperate(bulk, collectionName, document, importOptions) {
|
|
|
+ // insert
|
|
|
+ if (importOptions.mode !== 'upsert') {
|
|
|
+ return bulk.insert(document);
|
|
|
+ }
|
|
|
+
|
|
|
+ // upsert
|
|
|
+ switch (collectionName) {
|
|
|
+ default:
|
|
|
+ return bulk.find({ _id: document._id }).upsert().replaceOne(document);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* emit progress event
|
|
|
* @param {object} appendedErrors key: collection name, value: array of error object
|
|
|
@@ -272,23 +310,27 @@ class ImportService {
|
|
|
* @return {object} e.g. { insertedCount: 10, errors: [...] }
|
|
|
*/
|
|
|
async execUnorderedBulkOpSafely(unorderedBulkOp) {
|
|
|
- let insertedCount = 0;
|
|
|
let errors = [];
|
|
|
+ let result = null;
|
|
|
|
|
|
- // try catch to skip errors
|
|
|
try {
|
|
|
const log = await unorderedBulkOp.execute();
|
|
|
- insertedCount = log.result.insertedIds.length;
|
|
|
+ result = log.result;
|
|
|
}
|
|
|
catch (err) {
|
|
|
+ result = err.result;
|
|
|
errors = err.writeErrors.map((err) => {
|
|
|
const moreDetailErr = err.err;
|
|
|
return { _id: moreDetailErr.op._id, message: err.errmsg };
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ const insertedCount = result.nInserted + result.nUpserted;
|
|
|
+ const modifiedCount = result.nModified;
|
|
|
+
|
|
|
return {
|
|
|
insertedCount,
|
|
|
+ modifiedCount,
|
|
|
errors,
|
|
|
};
|
|
|
}
|