|
@@ -26,6 +26,16 @@ class ImportOptions {
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+class ImportingCollectionError extends Error {
|
|
|
|
|
+
|
|
|
|
|
+ constructor(collectionProgress, error) {
|
|
|
|
|
+ super(error);
|
|
|
|
|
+ this.collectionProgress = collectionProgress;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
class ImportService {
|
|
class ImportService {
|
|
|
|
|
|
|
|
constructor(crowi) {
|
|
constructor(crowi) {
|
|
@@ -140,6 +150,12 @@ class ImportService {
|
|
|
});
|
|
});
|
|
|
await Promise.all(promises);
|
|
await Promise.all(promises);
|
|
|
}
|
|
}
|
|
|
|
|
+ // catch ImportingCollectionError
|
|
|
|
|
+ catch (err) {
|
|
|
|
|
+ const { collectionProgress } = err;
|
|
|
|
|
+ logger.error(`failed to import to ${collectionProgress.collectionName}`, err);
|
|
|
|
|
+ this.emitProgressEvent(collectionProgress, { message: err.message });
|
|
|
|
|
+ }
|
|
|
finally {
|
|
finally {
|
|
|
this.currentProgressingStatus = null;
|
|
this.currentProgressingStatus = null;
|
|
|
this.emitTerminateEvent();
|
|
this.emitTerminateEvent();
|
|
@@ -166,71 +182,97 @@ class ImportService {
|
|
|
const jsonFile = this.getFile(jsonFileName);
|
|
const jsonFile = this.getFile(jsonFileName);
|
|
|
const collectionProgress = this.currentProgressingStatus.progressMap[collectionName];
|
|
const collectionProgress = this.currentProgressingStatus.progressMap[collectionName];
|
|
|
|
|
|
|
|
- // flush
|
|
|
|
|
- if (mode === 'flushAndInsert') {
|
|
|
|
|
- await Model.remove({});
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ // validate options
|
|
|
|
|
+ this.validateImportOptions(collectionName, importOptions);
|
|
|
|
|
|
|
|
- // stream 1
|
|
|
|
|
- const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
|
|
|
|
+ // flush
|
|
|
|
|
+ if (mode === 'flushAndInsert') {
|
|
|
|
|
+ await Model.remove({});
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // stream 2
|
|
|
|
|
- const jsonStream = JSONStream.parse('*');
|
|
|
|
|
|
|
+ // stream 1
|
|
|
|
|
+ const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
|
|
|
|
|
- // stream 3
|
|
|
|
|
- const convertStream = new Transform({
|
|
|
|
|
- objectMode: true,
|
|
|
|
|
- transform(doc, encoding, callback) {
|
|
|
|
|
- const converted = convertDocuments(Model, doc, overwriteParams);
|
|
|
|
|
- this.push(converted);
|
|
|
|
|
- callback();
|
|
|
|
|
- },
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // stream 2
|
|
|
|
|
+ const jsonStream = JSONStream.parse('*');
|
|
|
|
|
|
|
|
- // stream 4
|
|
|
|
|
- const batchStream = createBatchStream(BULK_IMPORT_SIZE);
|
|
|
|
|
-
|
|
|
|
|
- // stream 5
|
|
|
|
|
- const writeStream = new Writable({
|
|
|
|
|
- objectMode: true,
|
|
|
|
|
- async write(batch, encoding, callback) {
|
|
|
|
|
- const unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
|
|
-
|
|
|
|
|
- // documents are not persisted until unorderedBulkOp.execute()
|
|
|
|
|
- batch.forEach((document) => {
|
|
|
|
|
- bulkOperate(unorderedBulkOp, collectionName, document, importOptions);
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- // exec
|
|
|
|
|
- const { insertedCount, modifiedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
|
|
- logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errors.length}.`);
|
|
|
|
|
-
|
|
|
|
|
- const increment = insertedCount + modifiedCount + errors.length;
|
|
|
|
|
- collectionProgress.currentCount += increment;
|
|
|
|
|
- collectionProgress.totalCount += increment;
|
|
|
|
|
- collectionProgress.insertedCount += insertedCount;
|
|
|
|
|
- collectionProgress.modifiedCount += modifiedCount;
|
|
|
|
|
-
|
|
|
|
|
- emitProgressEvent(collectionName, collectionProgress, errors);
|
|
|
|
|
-
|
|
|
|
|
- callback();
|
|
|
|
|
- },
|
|
|
|
|
- final(callback) {
|
|
|
|
|
- logger.info(`Importing ${collectionName} has terminated.`);
|
|
|
|
|
- callback();
|
|
|
|
|
- },
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // stream 3
|
|
|
|
|
+ const convertStream = new Transform({
|
|
|
|
|
+ objectMode: true,
|
|
|
|
|
+ transform(doc, encoding, callback) {
|
|
|
|
|
+ const converted = convertDocuments(Model, doc, overwriteParams);
|
|
|
|
|
+ this.push(converted);
|
|
|
|
|
+ callback();
|
|
|
|
|
+ },
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // stream 4
|
|
|
|
|
+ const batchStream = createBatchStream(BULK_IMPORT_SIZE);
|
|
|
|
|
+
|
|
|
|
|
+ // stream 5
|
|
|
|
|
+ const writeStream = new Writable({
|
|
|
|
|
+ objectMode: true,
|
|
|
|
|
+ async write(batch, encoding, callback) {
|
|
|
|
|
+ const unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
|
|
+
|
|
|
|
|
+ // documents are not persisted until unorderedBulkOp.execute()
|
|
|
|
|
+ batch.forEach((document) => {
|
|
|
|
|
+ bulkOperate(unorderedBulkOp, collectionName, document, importOptions);
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // exec
|
|
|
|
|
+ const { insertedCount, modifiedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errors.length}.`);
|
|
|
|
|
+
|
|
|
|
|
+ const increment = insertedCount + modifiedCount + errors.length;
|
|
|
|
|
+ collectionProgress.currentCount += increment;
|
|
|
|
|
+ collectionProgress.totalCount += increment;
|
|
|
|
|
+ collectionProgress.insertedCount += insertedCount;
|
|
|
|
|
+ collectionProgress.modifiedCount += modifiedCount;
|
|
|
|
|
+
|
|
|
|
|
+ emitProgressEvent(collectionProgress, errors);
|
|
|
|
|
+
|
|
|
|
|
+ callback();
|
|
|
|
|
+ },
|
|
|
|
|
+ final(callback) {
|
|
|
|
|
+ logger.info(`Importing ${collectionName} has terminated.`);
|
|
|
|
|
+ callback();
|
|
|
|
|
+ },
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ readStream
|
|
|
|
|
+ .pipe(jsonStream)
|
|
|
|
|
+ .pipe(convertStream)
|
|
|
|
|
+ .pipe(batchStream)
|
|
|
|
|
+ .pipe(writeStream);
|
|
|
|
|
|
|
|
- readStream
|
|
|
|
|
- .pipe(jsonStream)
|
|
|
|
|
- .pipe(convertStream)
|
|
|
|
|
- .pipe(batchStream)
|
|
|
|
|
- .pipe(writeStream);
|
|
|
|
|
|
|
+ await streamToPromise(writeStream);
|
|
|
|
|
|
|
|
- await streamToPromise(writeStream);
|
|
|
|
|
|
|
+ // clean up tmp directory
|
|
|
|
|
+ fs.unlinkSync(jsonFile);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (err) {
|
|
|
|
|
+ throw new ImportingCollectionError(collectionProgress, err);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // clean up tmp directory
|
|
|
|
|
- fs.unlinkSync(jsonFile);
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param {string} collectionName
|
|
|
|
|
+ * @param {ImportOptions} importOptions
|
|
|
|
|
+ */
|
|
|
|
|
+ validateImportOptions(collectionName, importOptions) {
|
|
|
|
|
+ const { mode } = importOptions;
|
|
|
|
|
+
|
|
|
|
|
+ switch (collectionName) {
|
|
|
|
|
+ case 'configs':
|
|
|
|
|
+ if (mode !== 'flushAndInsert') {
|
|
|
|
|
+ throw new Error(`The specified mode '${mode}' is not allowed when importing to 'configs' collection.`);
|
|
|
|
|
+ }
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -255,9 +297,12 @@ class ImportService {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* emit progress event
|
|
* emit progress event
|
|
|
|
|
+ * @param {CollectionProgress} collectionProgress
|
|
|
* @param {object} appendedErrors key: collection name, value: array of error object
|
|
* @param {object} appendedErrors key: collection name, value: array of error object
|
|
|
*/
|
|
*/
|
|
|
- emitProgressEvent(collectionName, collectionProgress, appendedErrors) {
|
|
|
|
|
|
|
+ emitProgressEvent(collectionProgress, appendedErrors) {
|
|
|
|
|
+ const { collectionName } = collectionProgress;
|
|
|
|
|
+
|
|
|
// send event (in progress in global)
|
|
// send event (in progress in global)
|
|
|
this.adminEvent.emit('onProgressForImport', { collectionName, collectionProgress, appendedErrors });
|
|
this.adminEvent.emit('onProgressForImport', { collectionName, collectionProgress, appendedErrors });
|
|
|
}
|
|
}
|