|
|
@@ -1,11 +1,41 @@
|
|
|
const logger = require('@alias/logger')('growi:services:ImportService'); // eslint-disable-line no-unused-vars
|
|
|
const fs = require('fs');
|
|
|
const path = require('path');
|
|
|
+
|
|
|
+const { Writable, Transform } = require('stream');
|
|
|
const JSONStream = require('JSONStream');
|
|
|
const streamToPromise = require('stream-to-promise');
|
|
|
const unzipper = require('unzipper');
|
|
|
+
|
|
|
const { ObjectId } = require('mongoose').Types;
|
|
|
|
|
|
+const { createBatchStream } = require('../util/batch-stream');
|
|
|
+const CollectionProgressingStatus = require('../models/vo/collection-progressing-status');
|
|
|
+
|
|
|
+
|
|
|
+const BULK_IMPORT_SIZE = 100;
|
|
|
+
|
|
|
+
|
|
|
+class ImportSettings {
|
|
|
+
|
|
|
+ constructor(mode) {
|
|
|
+ this.mode = mode || 'insert';
|
|
|
+ this.jsonFileName = null;
|
|
|
+ this.overwriteParams = null;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+class ImportingCollectionError extends Error {
|
|
|
+
|
|
|
+ constructor(collectionProgress, error) {
|
|
|
+ super(error);
|
|
|
+ this.collectionProgress = collectionProgress;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
class ImportService {
|
|
|
|
|
|
constructor(crowi) {
|
|
|
@@ -13,12 +43,23 @@ class ImportService {
|
|
|
this.growiBridgeService = crowi.growiBridgeService;
|
|
|
this.getFile = this.growiBridgeService.getFile.bind(this);
|
|
|
this.baseDir = path.join(crowi.tmpDir, 'imports');
|
|
|
- this.per = 100;
|
|
|
this.keepOriginal = this.keepOriginal.bind(this);
|
|
|
|
|
|
+ this.adminEvent = crowi.event('admin');
|
|
|
+
|
|
|
// { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
|
|
|
this.convertMap = {};
|
|
|
this.initConvertMap(crowi.models);
|
|
|
+
|
|
|
+ this.currentProgressingStatus = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * generate ImportSettings instance
|
|
|
+ * @param {string} mode bulk operation mode (insert | upsert | flushAndInsert)
|
|
|
+ */
|
|
|
+ generateImportSettings(mode) {
|
|
|
+ return new ImportSettings(mode);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -48,89 +89,229 @@ class ImportService {
|
|
|
* automatically convert ObjectId
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {any} _value value from imported document
|
|
|
- * @param {{ _document: object, schema: object, key: string }}
|
|
|
+ * @param {any} value value from imported document
|
|
|
+ * @param {{ document: object, schema: object, key: string }}
|
|
|
* @return {any} new value for the document
|
|
|
*/
|
|
|
- keepOriginal(_value, { _document, schema, key }) {
|
|
|
- let value;
|
|
|
- if (schema[key].instance === 'ObjectID' && ObjectId.isValid(_value)) {
|
|
|
- value = ObjectId(_value);
|
|
|
+ keepOriginal(value, { document, schema, propertyName }) {
|
|
|
+ let _value;
|
|
|
+ if (schema[propertyName].instance === 'ObjectID' && ObjectId.isValid(value)) {
|
|
|
+ _value = ObjectId(value);
|
|
|
}
|
|
|
else {
|
|
|
- value = _value;
|
|
|
+ _value = value;
|
|
|
}
|
|
|
|
|
|
- return value;
|
|
|
+ return _value;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * parse all zip files in downloads dir
|
|
|
+ *
|
|
|
+ * @memberOf ImportService
|
|
|
+ * @return {object} info for zip files and whether currentProgressingStatus exists
|
|
|
+ */
|
|
|
+ async getStatus() {
|
|
|
+ const zipFiles = fs.readdirSync(this.baseDir).filter(file => path.extname(file) === '.zip');
|
|
|
+ const zipFileStats = await Promise.all(zipFiles.map((file) => {
|
|
|
+ const zipFile = this.getFile(file);
|
|
|
+ return this.growiBridgeService.parseZipFile(zipFile);
|
|
|
+ }));
|
|
|
+
|
|
|
+ // filter null object (broken zip)
|
|
|
+ const filtered = zipFileStats
|
|
|
+ .filter(zipFileStat => zipFileStat != null);
|
|
|
+ // sort with ctime("Change Time" - Time when file status was last changed (inode data modification).)
|
|
|
+ filtered.sort((a, b) => { return a.fileStat.ctime - b.fileStat.ctime });
|
|
|
+
|
|
|
+ const isImporting = this.currentProgressingStatus != null;
|
|
|
+
|
|
|
+ return {
|
|
|
+ zipFileStat: filtered.pop(),
|
|
|
+ isImporting,
|
|
|
+ progressList: isImporting ? this.currentProgressingStatus.progressList : null,
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * import collections from json
|
|
|
+ *
|
|
|
+ * @param {Array.<string>} collections MongoDB collection names
|
|
|
+ * @param {object} importSettingsMap key: collection name, value: ImportSettings instance
|
|
|
+ */
|
|
|
+ async import(collections, importSettingsMap) {
|
|
|
+ // init status object
|
|
|
+ this.currentProgressingStatus = new CollectionProgressingStatus(collections);
|
|
|
+
|
|
|
+ try {
|
|
|
+ const promises = collections.map((collectionName) => {
|
|
|
+ const importSettings = importSettingsMap[collectionName];
|
|
|
+ return this.importCollection(collectionName, importSettings);
|
|
|
+ });
|
|
|
+ await Promise.all(promises);
|
|
|
+ }
|
|
|
+ // catch ImportingCollectionError
|
|
|
+ catch (err) {
|
|
|
+ const { collectionProgress } = err;
|
|
|
+ logger.error(`failed to import to ${collectionProgress.collectionName}`, err);
|
|
|
+ this.emitProgressEvent(collectionProgress, { message: err.message });
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ this.currentProgressingStatus = null;
|
|
|
+ this.emitTerminateEvent();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* import a collection from json
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {object} Model instance of mongoose model
|
|
|
- * @param {string} jsonFile absolute path to the jsonFile being imported
|
|
|
- * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
|
+ * @param {string} collectionName MongoDB collection name
|
|
|
+ * @param {ImportSettings} importSettings
|
|
|
* @return {insertedIds: Array.<string>, failedIds: Array.<string>}
|
|
|
*/
|
|
|
- async import(Model, jsonFile, overwriteParams = {}) {
|
|
|
- // streamToPromise(jsonStream) throws an error, use new Promise instead
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- const collectionName = Model.collection.name;
|
|
|
+ async importCollection(collectionName, importSettings) {
|
|
|
+ // prepare functions invoked from custom streams
|
|
|
+ const convertDocuments = this.convertDocuments.bind(this);
|
|
|
+ const bulkOperate = this.bulkOperate.bind(this);
|
|
|
+ const execUnorderedBulkOpSafely = this.execUnorderedBulkOpSafely.bind(this);
|
|
|
+ const emitProgressEvent = this.emitProgressEvent.bind(this);
|
|
|
+
|
|
|
+ const { mode, jsonFileName, overwriteParams } = importSettings;
|
|
|
+ const Model = this.growiBridgeService.getModelFromCollectionName(collectionName);
|
|
|
+ const jsonFile = this.getFile(jsonFileName);
|
|
|
+ const collectionProgress = this.currentProgressingStatus.progressMap[collectionName];
|
|
|
+
|
|
|
+ try {
|
|
|
+ // validate options
|
|
|
+ this.validateImportSettings(collectionName, importSettings);
|
|
|
|
|
|
- let counter = 0;
|
|
|
- let insertedIds = [];
|
|
|
- let failedIds = [];
|
|
|
- let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
+ // flush
|
|
|
+ if (mode === 'flushAndInsert') {
|
|
|
+ await Model.remove({});
|
|
|
+ }
|
|
|
|
|
|
+ // stream 1
|
|
|
const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
- const jsonStream = readStream.pipe(JSONStream.parse('*'));
|
|
|
|
|
|
- jsonStream.on('data', async(document) => {
|
|
|
- // documents are not persisted until unorderedBulkOp.execute()
|
|
|
- unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
|
|
|
+ // stream 2
|
|
|
+ const jsonStream = JSONStream.parse('*');
|
|
|
+
|
|
|
+ // stream 3
|
|
|
+ const convertStream = new Transform({
|
|
|
+ objectMode: true,
|
|
|
+ transform(doc, encoding, callback) {
|
|
|
+ const converted = convertDocuments(collectionName, doc, overwriteParams);
|
|
|
+ this.push(converted);
|
|
|
+ callback();
|
|
|
+ },
|
|
|
+ });
|
|
|
|
|
|
- counter++;
|
|
|
+ // stream 4
|
|
|
+ const batchStream = createBatchStream(BULK_IMPORT_SIZE);
|
|
|
+
|
|
|
+ // stream 5
|
|
|
+ const writeStream = new Writable({
|
|
|
+ objectMode: true,
|
|
|
+ async write(batch, encoding, callback) {
|
|
|
+ const unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
+
|
|
|
+ // documents are not persisted until unorderedBulkOp.execute()
|
|
|
+ batch.forEach((document) => {
|
|
|
+ bulkOperate(unorderedBulkOp, collectionName, document, importSettings);
|
|
|
+ });
|
|
|
+
|
|
|
+ // exec
|
|
|
+ const { insertedCount, modifiedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errors.length}.`);
|
|
|
+
|
|
|
+ const increment = insertedCount + modifiedCount + errors.length;
|
|
|
+ collectionProgress.currentCount += increment;
|
|
|
+ collectionProgress.totalCount += increment;
|
|
|
+ collectionProgress.insertedCount += insertedCount;
|
|
|
+ collectionProgress.modifiedCount += modifiedCount;
|
|
|
+
|
|
|
+ emitProgressEvent(collectionProgress, errors);
|
|
|
+
|
|
|
+ callback();
|
|
|
+ },
|
|
|
+ final(callback) {
|
|
|
+ logger.info(`Importing ${collectionName} has terminated.`);
|
|
|
+ callback();
|
|
|
+ },
|
|
|
+ });
|
|
|
|
|
|
- if (counter % this.per === 0) {
|
|
|
- // puase jsonStream to prevent more items to be added to unorderedBulkOp
|
|
|
- jsonStream.pause();
|
|
|
+ readStream
|
|
|
+ .pipe(jsonStream)
|
|
|
+ .pipe(convertStream)
|
|
|
+ .pipe(batchStream)
|
|
|
+ .pipe(writeStream);
|
|
|
|
|
|
- const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
- failedIds = [...failedIds, ..._failedIds];
|
|
|
+ await streamToPromise(writeStream);
|
|
|
|
|
|
- // reset initializeUnorderedBulkOp
|
|
|
- unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
|
|
|
+ // clean up tmp directory
|
|
|
+ fs.unlinkSync(jsonFile);
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ throw new ImportingCollectionError(collectionProgress, err);
|
|
|
+ }
|
|
|
|
|
|
- // resume jsonStream
|
|
|
- jsonStream.resume();
|
|
|
- }
|
|
|
- });
|
|
|
+ }
|
|
|
|
|
|
- jsonStream.on('end', async(data) => {
|
|
|
- // insert the rest. avoid errors when unorderedBulkOp has no items
|
|
|
- if (unorderedBulkOp.s.currentBatch !== null) {
|
|
|
- const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
- failedIds = [...failedIds, ..._failedIds];
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @param {string} collectionName
|
|
|
+ * @param {ImportSettings} importSettings
|
|
|
+ */
|
|
|
+ validateImportSettings(collectionName, importSettings) {
|
|
|
+ const { mode } = importSettings;
|
|
|
+
|
|
|
+ switch (collectionName) {
|
|
|
+ case 'configs':
|
|
|
+ if (mode !== 'flushAndInsert') {
|
|
|
+ throw new Error(`The specified mode '${mode}' is not allowed when importing to 'configs' collection.`);
|
|
|
}
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- logger.info(`Done. Inserted ${insertedIds.length} ${collectionName}.`);
|
|
|
+ /**
|
|
|
+ * process bulk operation
|
|
|
+ * @param {object} bulk MongoDB Bulk instance
|
|
|
+ * @param {string} collectionName collection name
|
|
|
+ * @param {object} document
|
|
|
+ * @param {ImportSettings} importSettings
|
|
|
+ */
|
|
|
+ bulkOperate(bulk, collectionName, document, importSettings) {
|
|
|
+ // insert
|
|
|
+ if (importSettings.mode !== 'upsert') {
|
|
|
+ return bulk.insert(document);
|
|
|
+ }
|
|
|
|
|
|
- if (failedIds.length > 0) {
|
|
|
- logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
|
|
|
- }
|
|
|
+ // upsert
|
|
|
+ switch (collectionName) {
|
|
|
+ default:
|
|
|
+ return bulk.find({ _id: document._id }).upsert().replaceOne(document);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // clean up tmp directory
|
|
|
- fs.unlinkSync(jsonFile);
|
|
|
+ /**
|
|
|
+ * emit progress event
|
|
|
+ * @param {CollectionProgress} collectionProgress
|
|
|
+ * @param {Array.<object>|object} appendedErrors errors to be appended for this collection (array of error objects, or a single { message } object)
|
|
|
+ */
|
|
|
+ emitProgressEvent(collectionProgress, appendedErrors) {
|
|
|
+ const { collectionName } = collectionProgress;
|
|
|
|
|
|
- return resolve({
|
|
|
- insertedIds,
|
|
|
- failedIds,
|
|
|
- });
|
|
|
- });
|
|
|
- });
|
|
|
+ // send event (in progress in global)
|
|
|
+ this.adminEvent.emit('onProgressForImport', { collectionName, collectionProgress, appendedErrors });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * emit terminate event
|
|
|
+ */
|
|
|
+ emitTerminateEvent() {
|
|
|
+ this.adminEvent.emit('onTerminateForImport');
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -170,38 +351,31 @@ class ImportService {
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
* @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
|
|
|
- * @return {{nInserted: number, failed: Array.<string>}} number of docuemnts inserted and failed
|
|
|
+ * @return {object} e.g. { insertedCount: 10, errors: [...] }
|
|
|
*/
|
|
|
async execUnorderedBulkOpSafely(unorderedBulkOp) {
|
|
|
- // keep the number of documents inserted and failed for logger
|
|
|
- let insertedIds = [];
|
|
|
- let failedIds = [];
|
|
|
+ let errors = [];
|
|
|
+ let result = null;
|
|
|
|
|
|
- // try catch to skip errors
|
|
|
try {
|
|
|
const log = await unorderedBulkOp.execute();
|
|
|
- const _insertedIds = log.result.insertedIds.map(op => op._id);
|
|
|
- insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
+ result = log.result;
|
|
|
}
|
|
|
catch (err) {
|
|
|
- const collectionName = unorderedBulkOp.s.namespace;
|
|
|
-
|
|
|
- for (const error of err.result.result.writeErrors) {
|
|
|
- logger.error(`${collectionName}: ${error.errmsg}`);
|
|
|
- }
|
|
|
-
|
|
|
- const _failedIds = err.result.result.writeErrors.map(err => err.err.op._id);
|
|
|
- const _insertedIds = err.result.result.insertedIds.filter(op => !_failedIds.includes(op._id)).map(op => op._id);
|
|
|
-
|
|
|
- failedIds = [...failedIds, ..._failedIds];
|
|
|
- insertedIds = [...insertedIds, ..._insertedIds];
|
|
|
+ result = err.result;
|
|
|
+ errors = err.writeErrors.map((err) => {
|
|
|
+ const moreDetailErr = err.err;
|
|
|
+ return { _id: moreDetailErr.op._id, message: err.errmsg };
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${insertedIds.length}. Failed: ${failedIds.length}.`);
|
|
|
+ const insertedCount = result.nInserted + result.nUpserted;
|
|
|
+ const modifiedCount = result.nModified;
|
|
|
|
|
|
return {
|
|
|
- insertedIds,
|
|
|
- failedIds,
|
|
|
+ insertedCount,
|
|
|
+ modifiedCount,
|
|
|
+ errors,
|
|
|
};
|
|
|
}
|
|
|
|
|
|
@@ -209,13 +383,13 @@ class ImportService {
|
|
|
* execute unorderedBulkOp and ignore errors
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {object} Model instance of mongoose model
|
|
|
- * @param {object} _document document being imported
|
|
|
+ * @param {string} collectionName
|
|
|
+ * @param {object} document document being imported
|
|
|
* @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
|
* @return {object} document to be persisted
|
|
|
*/
|
|
|
- convertDocuments(Model, _document, overwriteParams) {
|
|
|
- const collectionName = Model.collection.name;
|
|
|
+ convertDocuments(collectionName, document, overwriteParams) {
|
|
|
+ const Model = this.growiBridgeService.getModelFromCollectionName(collectionName);
|
|
|
const schema = Model.schema.paths;
|
|
|
const convertMap = this.convertMap[collectionName];
|
|
|
|
|
|
@@ -223,31 +397,33 @@ class ImportService {
|
|
|
throw new Error(`attribute map is not defined for ${collectionName}`);
|
|
|
}
|
|
|
|
|
|
- const document = {};
|
|
|
+ const _document = {};
|
|
|
|
|
|
// assign value from documents being imported
|
|
|
- for (const entry of Object.entries(convertMap)) {
|
|
|
- const [key, value] = entry;
|
|
|
+ Object.entries(convertMap).forEach(([propertyName, convertedValue]) => {
|
|
|
+ const value = document[propertyName];
|
|
|
|
|
|
// distinguish between null and undefined
|
|
|
- if (_document[key] === undefined) {
|
|
|
- continue; // next entry
|
|
|
+ if (value === undefined) {
|
|
|
+ return; // next entry
|
|
|
}
|
|
|
|
|
|
- document[key] = (typeof value === 'function') ? value(_document[key], { _document, key, schema }) : value;
|
|
|
- }
|
|
|
+ const convertFunc = (typeof convertedValue === 'function') ? convertedValue : null;
|
|
|
+ _document[propertyName] = (convertFunc != null) ? convertFunc(value, { document, propertyName, schema }) : convertedValue;
|
|
|
+ });
|
|
|
|
|
|
// overwrite documents with custom values
|
|
|
- for (const entry of Object.entries(overwriteParams)) {
|
|
|
- const [key, value] = entry;
|
|
|
+ Object.entries(overwriteParams).forEach(([propertyName, overwriteValue]) => {
|
|
|
+ const value = document[propertyName];
|
|
|
|
|
|
// distinguish between null and undefined
|
|
|
- if (_document[key] !== undefined) {
|
|
|
- document[key] = (typeof value === 'function') ? value(_document[key], { _document, key, schema }) : value;
|
|
|
+ if (value !== undefined) {
|
|
|
+ const overwriteFunc = (typeof overwriteValue === 'function') ? overwriteValue : null;
|
|
|
+ _document[propertyName] = (overwriteFunc != null) ? overwriteFunc(value, { document: _document, propertyName, schema }) : overwriteValue;
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
|
|
|
- return document;
|
|
|
+ return _document;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -268,6 +444,15 @@ class ImportService {
|
|
|
// - import: throw err if there are pending migrations
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Delete all uploaded files
|
|
|
+ */
|
|
|
+ deleteAllZipFiles() {
|
|
|
+ fs.readdirSync(this.baseDir)
|
|
|
+ .filter(file => path.extname(file) === '.zip')
|
|
|
+ .forEach(file => fs.unlinkSync(path.join(this.baseDir, file)));
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
module.exports = ImportService;
|