فهرست منبع

fix promise and keep insertedIds and failedIds

mizozobu 6 سال پیش
والد
کامیت
704ef42e12
2 فایل تغییر یافته به همراه 82 افزوده شده و 51 حذف شده
  1. 25 3
      src/server/routes/apiv3/import.js
  2. 57 48
      src/server/service/import.js

+ 25 - 3
src/server/routes/apiv3/import.js

@@ -115,7 +115,7 @@ module.exports = (crowi) => {
       // validate with meta.json
       importService.validate(meta);
 
-      await Promise.all(filteredFileStats.map(async({ fileName, collectionName, size }) => {
+      const results = await Promise.all(filteredFileStats.map(async({ fileName, collectionName, size }) => {
         const Model = growiBridgeService.getModelFromCollectionName(collectionName);
         const jsonFile = importService.getFile(fileName);
 
@@ -125,11 +125,33 @@ module.exports = (crowi) => {
           overwriteParams = await overwriteParamsFn(Model, schema[collectionName], req);
         }
 
-        await importService.import(Model, jsonFile, overwriteParams);
+        const { insertedIds, failedIds } = await importService.import(Model, jsonFile, overwriteParams);
+
+        return {
+          collectionName,
+          insertedIds,
+          failedIds,
+        };
       }));
 
+      // convert to
+      // {
+      //   [collectionName1]: {
+      //     insertedIds: [...],
+      //     failedIds: [...],
+      //   },
+      //   [collectionName2]: {
+      //     insertedIds: [...],
+      //     failedIds: [...],
+      //   },
+      // }
+      const result = {};
+      for (const { collectionName, insertedIds, failedIds } of results) {
+        result[collectionName] = { insertedIds, failedIds };
+      }
+
       // TODO: use res.apiv3
-      return res.send({ ok: true });
+      return res.send({ ok: true, result });
     }
     catch (err) {
       // TODO: use ApiV3Error

+ 57 - 48
src/server/service/import.js

@@ -67,61 +67,65 @@ class ImportService {
    * @param {object} Model instance of mongoose model
    * @param {string} jsonFile absolute path to the jsonFile being imported
    * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
+   * @return {Promise<{insertedIds: Array<string>, failedIds: Array<string>}>} ids of inserted and failed documents
    */
   async import(Model, jsonFile, overwriteParams = {}) {
-    const { collectionName } = Model.collection;
+    return new Promise((resolve, reject) => {
+      const { collectionName } = Model.collection;
 
-    let counter = 0;
-    let nInsertedTotal = 0;
+      let counter = 0;
+      let insertedIds = [];
+      let failedIds = [];
+      let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
 
-    let failedIds = [];
-    let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
+      const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
+      const jsonStream = readStream.pipe(JSONStream.parse('*'));
 
-    const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
-    const jsonStream = readStream.pipe(JSONStream.parse('*'));
+      jsonStream.on('data', async(document) => {
+        // documents are not persisted until unorderedBulkOp.execute()
+        unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
 
-    jsonStream.on('data', async(document) => {
-      // documents are not persisted until unorderedBulkOp.execute()
-      unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
+        counter++;
 
-      counter++;
+        if (counter % this.per === 0) {
+          // pause jsonStream to prevent more items from being added to unorderedBulkOp
+          jsonStream.pause();
 
-      if (counter % this.per === 0) {
-        // puase jsonStream to prevent more items to be added to unorderedBulkOp
-        jsonStream.pause();
+          const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
+          insertedIds = [...insertedIds, ..._insertedIds];
+          failedIds = [...failedIds, ..._failedIds];
 
-        const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
-        nInsertedTotal += nInserted;
-        failedIds = [...failedIds, ...failed];
+          // reset initializeUnorderedBulkOp
+          unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
 
-        // reset initializeUnorderedBulkOp
-        unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
+          // resume jsonStream
+          jsonStream.resume();
+        }
+      });
 
-        // resume jsonStream
-        jsonStream.resume();
-      }
-    });
+      jsonStream.on('end', async(data) => {
+        // insert the rest. avoid errors when unorderedBulkOp has no items
+        if (unorderedBulkOp.s.currentBatch !== null) {
+          const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
+          insertedIds = [...insertedIds, ..._insertedIds];
+          failedIds = [...failedIds, ..._failedIds];
+        }
 
-    jsonStream.on('end', async(data) => {
-      // insert the rest. avoid errors when unorderedBulkOp has no items
-      if (unorderedBulkOp.s.currentBatch !== null) {
-        const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
-        nInsertedTotal += nInserted;
-        failedIds = [...failedIds, ...failed];
-      }
+        logger.info(`Done. Inserted ${insertedIds.length} ${collectionName}.`);
 
-      logger.info(`Done. Inserted ${nInsertedTotal} ${collectionName}.`);
+        if (failedIds.length > 0) {
+          logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
+        }
 
-      if (failedIds.length > 0) {
-        logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
-      }
-    });
+        // clean up tmp directory
+        fs.unlinkSync(jsonFile);
 
-    // streamToPromise(jsonStream) throws error, so await readStream instead
-    await streamToPromise(readStream);
-
-    // clean up tmp directory
-    fs.unlinkSync(jsonFile);
+        return resolve({
+          insertedIds,
+          failedIds,
+        });
+      });
+    });
   }
 
   /**
@@ -165,29 +169,34 @@ class ImportService {
    */
   async execUnorderedBulkOpSafely(unorderedBulkOp) {
     // keep the number of documents inserted and failed for logger
-    let nInserted = 0;
-    const failed = [];
+    let insertedIds = [];
+    let failedIds = [];
 
     // try catch to skip errors
     try {
       const log = await unorderedBulkOp.execute();
-      nInserted = log.result.nInserted;
+      const _insertedIds = log.result.insertedIds.map(op => op._id);
+      insertedIds = [...insertedIds, ..._insertedIds];
     }
     catch (err) {
       const collectionName = unorderedBulkOp.s.namespace;
+
       for (const error of err.result.result.writeErrors) {
         logger.error(`${collectionName}: ${error.errmsg}`);
-        failed.push(error.err.op._id);
       }
 
-      nInserted = err.result.result.nInserted;
+      const _failedIds = err.result.result.writeErrors.map(err => err.err.op._id);
+      const _insertedIds = err.result.result.insertedIds.filter(op => !_failedIds.includes(op._id)).map(op => op._id);
+
+      failedIds = [...failedIds, ..._failedIds];
+      insertedIds = [...insertedIds, ..._insertedIds];
     }
 
-    logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${nInserted}. Failed: ${failed.length}.`);
+    logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${insertedIds.length}. Failed: ${failedIds.length}.`);
 
     return {
-      nInserted,
-      failed,
+      insertedIds,
+      failedIds,
     };
   }