Futa Arai 2 лет назад
Родитель
Сommit
627b526843

+ 55 - 36
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,5 +1,5 @@
 import type { Readable } from 'stream';
-import { Writable } from 'stream';
+import { Writable, pipeline } from 'stream';
 
 import { type IPage, isPopulated } from '@growi/core';
 import { normalizePath } from '@growi/core/dist/utils/path-utils';
@@ -15,8 +15,6 @@ import loggerFactory from '~/utils/logger';
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
-const streamToPromise = require('stream-to-promise');
-
 // Custom type for back pressure workaround
 interface ArchiverWithQueue extends Archiver {
   _queue?: QueueObject<any>;
@@ -31,6 +29,8 @@ class PageBulkExportService {
   // multipart upload part size
   partSize = 5 * 1024 * 1024; // 5MB
 
+  pageBatchSize = 100;
+
   constructor(crowi) {
     this.crowi = crowi;
   }
@@ -46,13 +46,21 @@ class PageBulkExportService {
     try {
       const multipartUploadWritable = await this.getMultipartUploadWritable(uploadKey);
 
-      zipArchiver.pipe(multipartUploadWritable);
-      pageReadableStream.pipe(pagesWritable);
-
-      await streamToPromise(multipartUploadWritable);
+      // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
+      // Hence, execution of two pipelines is required.
+      pipeline(pageReadableStream, pagesWritable, this.handleStreamError);
+      pipeline(zipArchiver, multipartUploadWritable, this.handleStreamError);
     }
     catch (err) {
       logger.error(err);
+      // TODO: notify failure to client: https://redmine.weseek.co.jp/issues/78037
+    }
+  }
+
+  handleStreamError(err: Error | null): void {
+    if (err != null) {
+      logger.error(err);
+      // TODO: notify failure to client: https://redmine.weseek.co.jp/issues/78037
     }
   }
 
@@ -70,7 +78,7 @@ class PageBulkExportService {
       .query
       .populate('revision')
       .lean()
-      .cursor({ batchSize: 100 });
+      .cursor({ batchSize: this.pageBatchSize });
   }
 
   /**
@@ -92,16 +100,15 @@ class PageBulkExportService {
             if (_queue == null) {
               throw Error('Cannot back pressure the export pipeline. Aborting the export.');
             }
-            if (_queue.length() > 100) {
+            if (_queue.length() > this.pageBatchSize) {
               await _queue.drain();
             }
           }
         }
         catch (err) {
-          logger.error(err);
-          throw Error('Failed to export page tree');
+          callback(err);
+          return;
         }
-
         callback();
       },
       final: (callback) => {
@@ -144,43 +151,55 @@ class PageBulkExportService {
 
     return new Writable({
       write: async(chunk: Buffer, encoding, callback) => {
-        let offset = 0;
-        while (offset < chunk.length) {
+        try {
+          let offset = 0;
+          while (offset < chunk.length) {
           // The data size to add to buffer.
           // - If the remaining chunk size is smaller than the remaining buffer size:
           //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
           // - If the remaining chunk size is larger than the remaining buffer size:
           //     - Fill the buffer, and upload => dataSize is the remaining buffer size
           //     - The remaining chunk after upload will be added to buffer in the next iteration
-          const dataSize = Math.min(this.partSize - filledPartSize, chunk.length - offset);
-          // Add chunk data to buffer
-          // buffer = Buffer.concat([buffer, chunk.slice(offset, offset + dataSize)]);
-          chunk.copy(part, filledPartSize, offset, offset + dataSize);
-          filledPartSize += dataSize;
-
-          // When buffer reaches partSize, upload
-          if (filledPartSize === this.partSize) {
-            // eslint-disable-next-line no-await-in-loop
-            await multipartUploader.uploadPart(part, partNumber);
-            // Reset buffer after upload
-            part = Buffer.alloc(this.partSize);
-            filledPartSize = 0;
-            partNumber += 1;
-          }
+            const dataSize = Math.min(this.partSize - filledPartSize, chunk.length - offset);
+            // Add chunk data to buffer
+            // buffer = Buffer.concat([buffer, chunk.slice(offset, offset + dataSize)]);
+            chunk.copy(part, filledPartSize, offset, offset + dataSize);
+            filledPartSize += dataSize;
+
+            // When buffer reaches partSize, upload
+            if (filledPartSize === this.partSize) {
+              // eslint-disable-next-line no-await-in-loop
+              await multipartUploader.uploadPart(part, partNumber);
+              // Reset buffer after upload
+              part = Buffer.alloc(this.partSize);
+              filledPartSize = 0;
+              partNumber += 1;
+            }
 
-          offset += dataSize;
+            offset += dataSize;
+          }
+        }
+        catch (err) {
+          callback(err);
+          return;
         }
 
         callback();
       },
       async final(callback) {
-        if (filledPartSize > 0) {
-          const finalPart = Buffer.alloc(filledPartSize);
-          part.copy(finalPart, 0, 0, filledPartSize);
-          await multipartUploader.uploadPart(finalPart, partNumber);
+        try {
+          if (filledPartSize > 0) {
+            const finalPart = Buffer.alloc(filledPartSize);
+            part.copy(finalPart, 0, 0, filledPartSize);
+            await multipartUploader.uploadPart(finalPart, partNumber);
+          }
+          await multipartUploader.completeUpload();
+          logger.info(`Multipart upload completed. Upload key: ${uploadKey}`);
+        }
+        catch (err) {
+          callback(err);
+          return;
         }
-        await multipartUploader.completeUpload();
-        logger.info(`Multipart upload completed. Upload key: ${uploadKey}`);
         callback();
       },
     });