Parcourir la source

create Transform to convert batch to partSize

Futa Arai il y a 2 ans
Parent
commit
439dde02ec

+ 54 - 37
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,11 +1,12 @@
 import type { Readable } from 'stream';
-import { Writable, pipeline } from 'stream';
+import { Transform, Writable, pipeline } from 'stream';
 
 import { type IPage, isPopulated } from '@growi/core';
 import { normalizePath } from '@growi/core/dist/utils/path-utils';
 import type { Archiver } from 'archiver';
 import archiver from 'archiver';
 import type { QueueObject } from 'async';
+import gc from 'expose-gc/function';
 import mongoose from 'mongoose';
 
 import type { PageModel, PageDocument } from '~/server/models/page';
@@ -40,8 +41,10 @@ class PageBulkExportService {
     const pagesReadable = this.getPageReadable(basePagePath);
     const zipArchiver = this.setUpZipArchiver();
     const pagesWritable = this.getPageWritable(zipArchiver);
+    const bufferToPartSizeTransform = this.getBufferToPartSizeTransform();
 
     // init multipart upload
+    // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
     const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
     try {
       if (multipartUploader == null) {
@@ -58,7 +61,7 @@ class PageBulkExportService {
     // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
     // Hence, execution of two pipelines is required.
     pipeline(pagesReadable, pagesWritable, err => this.handleExportError(err, multipartUploader));
-    pipeline(zipArchiver, multipartUploadWritable, err => this.handleExportError(err, multipartUploader));
+    pipeline(zipArchiver, bufferToPartSizeTransform, multipartUploadWritable, err => this.handleExportError(err, multipartUploader));
   }
 
   async handleExportError(err: Error | null, multipartUploader: IAwsMultipartUploader | undefined): Promise<void> {
@@ -139,55 +142,69 @@ class PageBulkExportService {
     return zipArchiver;
   }
 
-  private getMultipartUploadWritable(multipartUploader: IAwsMultipartUploader): Writable {
-    let partNumber = 1;
-    // Buffer to store stream data before upload. When the buffer is full, it will be uploaded as a part.
+  private getBufferToPartSizeTransform(): Transform {
     let buffer = Buffer.alloc(this.partSize);
     let filledBufferSize = 0;
 
+    const partSize = this.partSize;
+
+    return new Transform({
+      transform(chunk: Buffer, encoding, callback) {
+        let offset = 0;
+        while (offset < chunk.length) {
+          // The data size to add to buffer.
+          // - If the remaining chunk size is smaller than the remaining buffer size:
+          //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
+          // - If the remaining chunk size is larger than the remaining buffer size:
+          //     - Fill the buffer, and upload => dataSize is the remaining buffer size
+          //     - The remaining chunk after upload will be added to buffer in the next iteration
+          const dataSize = Math.min(partSize - filledBufferSize, chunk.length - offset);
+          // Add chunk data to buffer
+          chunk.copy(buffer, filledBufferSize, offset, offset + dataSize);
+          filledBufferSize += dataSize;
+
+          // When buffer reaches partSize, push to next stream
+          if (filledBufferSize === partSize) {
+            this.push(buffer);
+            // Reset buffer after push
+            buffer = Buffer.alloc(partSize);
+            filledBufferSize = 0;
+          }
+
+          offset += dataSize;
+        }
+        callback();
+      },
+      flush(callback) {
+        // push the final buffer
+        if (filledBufferSize > 0) {
+          this.push(buffer.slice(0, filledBufferSize));
+        }
+        callback();
+      },
+    });
+  }
+
+  private getMultipartUploadWritable(multipartUploader: IAwsMultipartUploader): Writable {
+    let partNumber = 1;
+
     return new Writable({
-      write: async(chunk: Buffer, encoding, callback) => {
+      write: async(part: Buffer, encoding, callback) => {
         try {
-          let offset = 0;
-          while (offset < chunk.length) {
-            // The data size to add to buffer.
-            // - If the remaining chunk size is smaller than the remaining buffer size:
-            //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
-            // - If the remaining chunk size is larger than the remaining buffer size:
-            //     - Fill the buffer, and upload => dataSize is the remaining buffer size
-            //     - The remaining chunk after upload will be added to buffer in the next iteration
-            const dataSize = Math.min(this.partSize - filledBufferSize, chunk.length - offset);
-            // Add chunk data to buffer
-            chunk.copy(buffer, filledBufferSize, offset, offset + dataSize);
-            filledBufferSize += dataSize;
-
-            // When buffer reaches partSize, upload
-            if (filledBufferSize === this.partSize) {
-              // eslint-disable-next-line no-await-in-loop
-              await multipartUploader.uploadPart(buffer, partNumber);
-              // Reset buffer after upload
-              buffer = Buffer.alloc(this.partSize);
-              filledBufferSize = 0;
-              partNumber += 1;
-            }
-
-            offset += dataSize;
-          }
+          await multipartUploader.uploadPart(part, partNumber);
+          partNumber += 1;
+          // First aid to prevent unexplained memory leaks
+          logger.info('global.gc() invoked.');
+          gc();
         }
         catch (err) {
           callback(err);
           return;
         }
-
         callback();
       },
       async final(callback) {
         try {
-          if (filledBufferSize > 0) {
-            const finalPart = Buffer.alloc(filledBufferSize);
-            buffer.copy(finalPart, 0, 0, filledBufferSize);
-            await multipartUploader.uploadPart(finalPart, partNumber);
-          }
           await multipartUploader.completeUpload();
         }
         catch (err) {

+ 3 - 0
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts

@@ -13,6 +13,7 @@ enum UploadStatus {
   ABORTED
 }
 
+// Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
 export interface IAwsMultipartUploader {
   initUpload(): Promise<void>;
   uploadPart(body: Buffer, partNumber: number): Promise<void>;
@@ -56,6 +57,7 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
     this.uploadId = response.UploadId;
     this.currentStatus = UploadStatus.IN_PROGRESS;
     logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
+    console.time('Multipart upload');
   }
 
   async uploadPart(body: Buffer, partNumber: number): Promise<void> {
@@ -88,6 +90,7 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
     }));
     this.currentStatus = UploadStatus.COMPLETED;
     logger.info(`Multipart upload completed. Upload key: ${this.uploadKey}`);
+    console.timeEnd('Multipart upload');
   }
 
   async abortUpload(): Promise<void> {