Просмотр исходного кода

GW-439: refactor stream and emit event

Yuki Takei 6 лет назад
Родитель
Commit
824960e7ce
2 измененных файлов с 83 добавлено и 45 удалено
  1. 48 45
      src/server/service/import.js
  2. 35 0
      src/server/util/batch-stream.js

+ 48 - 45
src/server/service/import.js

@@ -1,11 +1,18 @@
 const logger = require('@alias/logger')('growi:services:ImportService'); // eslint-disable-line no-unused-vars
 const fs = require('fs');
 const path = require('path');
+
+const { Writable, Transform } = require('stream');
 const JSONStream = require('JSONStream');
 const streamToPromise = require('stream-to-promise');
 const unzipper = require('unzipper');
+
 const { ObjectId } = require('mongoose').Types;
 
+const { createBatchStream } = require('../util/batch-stream');
+
+const BULK_IMPORT_SIZE = 100;
+
 class ImportService {
 
   constructor(crowi) {
@@ -13,7 +20,6 @@ class ImportService {
     this.growiBridgeService = crowi.growiBridgeService;
     this.getFile = this.growiBridgeService.getFile.bind(this);
     this.baseDir = path.join(crowi.tmpDir, 'imports');
-    this.per = 100;
     this.keepOriginal = this.keepOriginal.bind(this);
 
     // { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
@@ -104,63 +110,60 @@ class ImportService {
    * @return {insertedIds: Array.<string>, failedIds: Array.<string>}
    */
  async import(Model, jsonFile, overwriteParams = {}) {
-    // streamToPromise(jsonStream) throws an error, use new Promise instead
-    return new Promise((resolve, reject) => {
-      const collectionName = Model.collection.name;
+    // prepare functions invoked from custom streams
+    const convertDocuments = this.convertDocuments.bind(this);
+    const execUnorderedBulkOpSafely = this.execUnorderedBulkOpSafely.bind(this);
 
-      let counter = 0;
-      let insertedIds = [];
-      let failedIds = [];
-      let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
+    const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
 
-      const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
-      const jsonStream = readStream.pipe(JSONStream.parse('*'));
+    const jsonStream = JSONStream.parse('*');
 
-      jsonStream.on('data', async(document) => {
-        // documents are not persisted until unorderedBulkOp.execute()
-        unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
+    // object-mode Transform: applies overwriteParams to each parsed document
+    const convertStream = new Transform({
+      objectMode: true,
+      transform(doc, encoding, callback) {
+        const converted = convertDocuments(Model, doc, overwriteParams);
+        this.push(converted);
+        callback();
+      },
+    });
 
-        counter++;
+    // groups converted documents into arrays of up to BULK_IMPORT_SIZE
+    const batchStream = createBatchStream(BULK_IMPORT_SIZE);
 
-        if (counter % this.per === 0) {
-          // puase jsonStream to prevent more items to be added to unorderedBulkOp
-          jsonStream.pause();
+    // flushes each batch to MongoDB via an unordered bulk op
+    const writeStream = new Writable({
+      objectMode: true,
+      async write(batch, encoding, callback) {
+        const unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
 
-          const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
-          insertedIds = [...insertedIds, ..._insertedIds];
-          failedIds = [...failedIds, ..._failedIds];
+        // documents are not persisted until unorderedBulkOp.execute()
+        batch.forEach((document) => {
+          unorderedBulkOp.insert(document);
+        });
 
-          // reset initializeUnorderedBulkOp
-          unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
+        // exec
+        // NOTE(review): if execUnorderedBulkOpSafely rejects, this async write()
+        // never invokes callback(err) — wrap in try/catch and call callback(err)
+        // so the failure surfaces through the stream instead of an unhandled rejection
+        const { insertedIds, failedIds } = await execUnorderedBulkOpSafely(unorderedBulkOp);
 
-          // resume jsonStream
-          jsonStream.resume();
-        }
-      });
+        // TODO: emit event
 
-      jsonStream.on('end', async(data) => {
-        // insert the rest. avoid errors when unorderedBulkOp has no items
-        if (unorderedBulkOp.s.currentBatch !== null) {
-          const { insertedIds: _insertedIds, failedIds: _failedIds } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
-          insertedIds = [...insertedIds, ..._insertedIds];
-          failedIds = [...failedIds, ..._failedIds];
-        }
+        callback();
+      },
+      final(callback) {
+        // TODO: logger.info
+        // TODO: emit event
 
-        logger.info(`Done. Inserted ${insertedIds.length} ${collectionName}.`);
+        callback();
+      },
+    });
 
-        if (failedIds.length > 0) {
-          logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
-        }
+    // NOTE(review): .pipe() does not forward upstream errors downstream — a
+    // failure in readStream/jsonStream/convertStream will never reject
+    // streamToPromise(writeStream); consider stream.pipeline for error propagation
+    readStream
+      .pipe(jsonStream)
+      .pipe(convertStream)
+      .pipe(batchStream)
+      .pipe(writeStream);
 
-        // clean up tmp directory
-        fs.unlinkSync(jsonFile);
+    // resolves when writeStream finishes (all batches flushed)
+    await streamToPromise(writeStream);
 
-        return resolve({
-          insertedIds,
-          failedIds,
-        });
-      });
-    });
+    // clean up tmp directory
+    // NOTE(review): the previous implementation resolved { insertedIds, failedIds };
+    // this version returns undefined — confirm no caller still consumes that result
+    fs.unlinkSync(jsonFile);
  }
 
   /**

+ 35 - 0
src/server/util/batch-stream.js

@@ -0,0 +1,35 @@
+const { Transform } = require('stream');
+
+/**
+ * Create an object-mode Transform stream that groups incoming objects
+ * into arrays ("batches") of at most `batchSize` elements.
+ *
+ * Downstream consumers receive Array chunks; a trailing partial batch
+ * is flushed when the upstream ends (see `final`).
+ *
+ * @param {number} batchSize maximum number of objects per emitted batch
+ * @returns {Transform} object-mode transform emitting Array batches
+ */
+function createBatchStream(batchSize) {
+  // buffer shared by transform()/final() via closure; holds the current batch
+  let batchBuffer = [];
+
+  return new Transform({
+    // object mode
+    objectMode: true,
+
+    transform(doc, encoding, callback) {
+      batchBuffer.push(doc);
+
+      if (batchBuffer.length >= batchSize) {
+        this.push(batchBuffer);
+
+        // reset buffer
+        batchBuffer = [];
+      }
+
+      callback();
+    },
+
+    final(callback) {
+      // flush the remaining partial batch, if any
+      if (batchBuffer.length > 0) {
+        this.push(batchBuffer);
+      }
+      callback();
+    },
+
+  });
+}
+
+module.exports = {
+  createBatchStream,
+};