@@ -10,6 +10,8 @@ const unzipper = require('unzipper');
const { ObjectId } = require('mongoose').Types;

const { createBatchStream } = require('../util/batch-stream');
+const CollectionProgressingStatus = require('../models/vo/collection-progressing-status');
+

const BULK_IMPORT_SIZE = 100;
@@ -22,6 +24,8 @@ class ImportService {
    this.baseDir = path.join(crowi.tmpDir, 'imports');
    this.keepOriginal = this.keepOriginal.bind(this);
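+    // event emitter for admin-side notifications (progress/terminate)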
+    this.adminEvent = crowi.event('admin');
+
    // { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
    this.convertMap = {};
    this.initConvertMap(crowi.models);
@@ -100,24 +104,57 @@ class ImportService {
    };
  }

+  /**
+   * import collections from json
+   *
+   * @param {Array.<string>} collections MongoDB collection names
+   * @param {object} jsonFileNamesMap key: collection name, value: json file name
+   * @param {object} overwriteParamsMap key: collection name, value: params to overwrite in each document. e.g. { creator: req.user }
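+   * @example
+   *   // hypothetical usage (argument values depend on the caller):
+   *   // await importService.import(['pages'], { pages: 'pages.json' }, { pages: { creator: req.user } });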
+   */
+  async import(collections, jsonFileNamesMap, overwriteParamsMap) {
+    // init status object
+    this.currentProgressingStatus = new CollectionProgressingStatus(collections);
+
+    try {
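+      // import every collection in parallel; Promise.all rejects on the first failure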
+      const promises = collections.map((collectionName) => {
+        const jsonFileName = jsonFileNamesMap[collectionName];
+        const overwriteParams = overwriteParamsMap[collectionName];
+        return this.importCollection(collectionName, jsonFileName, overwriteParams);
+      });
+      await Promise.all(promises);
+    }
+    finally {
+      this.currentProgressingStatus = null;
+    }
+  }
+
  /**
   * import a collection from json
   *
   * @memberOf ImportService
-  * @param {object} Model instance of mongoose model
-  * @param {string} jsonFile absolute path to the jsonFile being imported
+  * @param {string} collectionName MongoDB collection name
+  * @param {string} jsonFileName json file name
   * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
   * @return {insertedIds: Array.<string>, failedIds: Array.<string>}
   */
-  async import(Model, jsonFile, overwriteParams = {}) {
+  async importCollection(collectionName, jsonFileName, overwriteParams = {}) {
    // prepare functions invoked from custom streams
    const convertDocuments = this.convertDocuments.bind(this);
    const execUnorderedBulkOpSafely = this.execUnorderedBulkOpSafely.bind(this);
+    const emitProgressEvent = this.emitProgressEvent.bind(this);
+    const emitTerminateEvent = this.emitTerminateEvent.bind(this);
+
+    const Model = this.growiBridgeService.getModelFromCollectionName(collectionName);
+    const jsonFile = this.getFile(jsonFileName);
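+    // progress entry for this collection within the global status object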
+    const collectionProgress = this.currentProgressingStatus.progressMap[collectionName];
+
+    // stream 1: read the json file
    const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });

+    // stream 2: parse the content as a JSON array and emit each element
    const jsonStream = JSONStream.parse('*');

+    // stream 3: convert each document (convertDocuments)
    const convertStream = new Transform({
      objectMode: true,
      transform(doc, encoding, callback) {
@@ -127,8 +164,10 @@ class ImportService {
      },
    });

+    // stream 4: group documents into batches of BULK_IMPORT_SIZE
    const batchStream = createBatchStream(BULK_IMPORT_SIZE);

+    // stream 5: bulk-insert each batch and report progress
    const writeStream = new Writable({
      objectMode: true,
      async write(batch, encoding, callback) {
@@ -140,16 +179,18 @@ class ImportService {
        });

        // exec
-        // eslint-disable-next-line no-unused-vars
-        const { insertedIds, failedIds } = await execUnorderedBulkOpSafely(unorderedBulkOp);
+        const { insertedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
+        logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Failed: ${errors.length}.`);
+        collectionProgress.currentCount += insertedCount;

-        // TODO: emit event
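+        // notify listeners (e.g. the admin page) of the progress so far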
+        emitProgressEvent(errors);

        callback();
      },
      final(callback) {
        // TODO: logger.info
-        // TODO: emit event
+
+        emitTerminateEvent();

        callback();
      },
@@ -167,6 +208,30 @@ class ImportService {
    fs.unlinkSync(jsonFile);
  }

+  /**
+   * emit progress event
+   * @param {Array.<object>} appendedErrors errors generated by the latest batch
+   */
+  emitProgressEvent(appendedErrors) {
+    const { currentCount, totalCount, progressList } = this.currentProgressingStatus;
+    const data = {
+      currentCount,
+      totalCount,
+      progressList,
+      appendedErrors,
+    };
+
+    // emit a global event while the import is in progress
+    this.adminEvent.emit('onProgressForImport', data);
+  }
+
+  /**
+   * emit terminate event
+   */
+  emitTerminateEvent() {
+    this.adminEvent.emit('onTerminateForImport');
+  }
+
  /**
   * extract a zip file
   *
@@ -204,38 +269,27 @@ class ImportService {
   *
   * @memberOf ImportService
   * @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
-  * @return {{nInserted: number, failed: Array.<string>}} number of docuemnts inserted and failed
+  * @return {object} e.g. { insertedCount: 10, errors: [...] }
   */
  async execUnorderedBulkOpSafely(unorderedBulkOp) {
-    // keep the number of documents inserted and failed for logger
-    let insertedIds = [];
-    let failedIds = [];
+    let insertedCount = 0;
+    let errors = [];

    // try catch to skip errors
    try {
      const log = await unorderedBulkOp.execute();
-      const _insertedIds = log.result.insertedIds.map(op => op._id);
-      insertedIds = [...insertedIds, ..._insertedIds];
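+      // execute() resolves only when the batch had no write errors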
+      insertedCount = log.result.insertedIds.length;
    }
    catch (err) {
-      const collectionName = unorderedBulkOp.s.namespace;
-
-      for (const error of err.result.result.writeErrors) {
-        logger.error(`${collectionName}: ${error.errmsg}`);
-      }
-
-      const _failedIds = err.result.result.writeErrors.map(err => err.err.op._id);
-      const _insertedIds = err.result.result.insertedIds.filter(op => !_failedIds.includes(op._id)).map(op => op._id);
-
-      failedIds = [...failedIds, ..._failedIds];
-      insertedIds = [...insertedIds, ..._insertedIds];
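+      // a BulkWriteError exposes per-document failures via err.writeErrors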
+      errors = err.writeErrors.map((writeError) => {
+        const moreDetailErr = writeError.err;
+        return { _id: moreDetailErr.op._id, message: writeError.errmsg };
+      });
    }

-    logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${insertedIds.length}. Failed: ${failedIds.length}.`);
-
    return {
-      insertedIds,
-      failedIds,
+      insertedCount,
+      errors,
    };
  }