Просмотр исходного кода

define getBufferToFixedSizeTransform as util

Futa Arai 2 лет назад
Родитель
Сommit
0716823df3

+ 3 - 49
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,16 +1,16 @@
 import type { Readable } from 'stream';
-import { Transform, Writable, pipeline } from 'stream';
+import { Writable, pipeline } from 'stream';
 
 import { type IPage, isPopulated } from '@growi/core';
 import { normalizePath } from '@growi/core/dist/utils/path-utils';
 import type { Archiver } from 'archiver';
 import archiver from 'archiver';
 import type { QueueObject } from 'async';
-import gc from 'expose-gc/function';
 import mongoose from 'mongoose';
 
 import type { PageModel, PageDocument } from '~/server/models/page';
 import type { IAwsMultipartUploader } from '~/server/service/file-uploader/aws/multipart-upload';
+import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
 
 
@@ -41,7 +41,7 @@ class PageBulkExportService {
     const pagesReadable = this.getPageReadable(basePagePath);
     const zipArchiver = this.setUpZipArchiver();
     const pagesWritable = this.getPageWritable(zipArchiver);
-    const bufferToPartSizeTransform = this.getBufferToPartSizeTransform();
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.partSize);
 
     // init multipart upload
     // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
@@ -142,49 +142,6 @@ class PageBulkExportService {
     return zipArchiver;
   }
 
-  private getBufferToPartSizeTransform(): Transform {
-    let buffer = Buffer.alloc(this.partSize);
-    let filledBufferSize = 0;
-
-    const partSize = this.partSize;
-
-    return new Transform({
-      transform(chunk: Buffer, encoding, callback) {
-        let offset = 0;
-        while (offset < chunk.length) {
-          // The data size to add to buffer.
-          // - If the remaining chunk size is smaller than the remaining buffer size:
-          //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
-          // - If the remaining chunk size is larger than the remaining buffer size:
-          //     - Fill the buffer, and upload => dataSize is the remaining buffer size
-          //     - The remaining chunk after upload will be added to buffer in the next iteration
-          const dataSize = Math.min(partSize - filledBufferSize, chunk.length - offset);
-          // Add chunk data to buffer
-          chunk.copy(buffer, filledBufferSize, offset, offset + dataSize);
-          filledBufferSize += dataSize;
-
-          // When buffer reaches partSize, push to next stream
-          if (filledBufferSize === partSize) {
-            this.push(buffer);
-            // Reset buffer after push
-            buffer = Buffer.alloc(partSize);
-            filledBufferSize = 0;
-          }
-
-          offset += dataSize;
-        }
-        callback();
-      },
-      flush(callback) {
-        // push the final buffer
-        if (filledBufferSize > 0) {
-          this.push(buffer.slice(0, filledBufferSize));
-        }
-        callback();
-      },
-    });
-  }
-
   private getMultipartUploadWritable(multipartUploader: IAwsMultipartUploader): Writable {
     let partNumber = 1;
 
@@ -193,9 +150,6 @@ class PageBulkExportService {
         try {
           await multipartUploader.uploadPart(part, partNumber);
           partNumber += 1;
-          // First aid to prevent unexplained memory leaks
-          logger.info('global.gc() invoked.');
-          gc();
         }
         catch (err) {
           callback(err);

+ 0 - 2
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts

@@ -57,7 +57,6 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
     this.uploadId = response.UploadId;
     this.currentStatus = UploadStatus.IN_PROGRESS;
     logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
-    console.time('Multipart upload');
   }
 
   async uploadPart(body: Buffer, partNumber: number): Promise<void> {
@@ -90,7 +89,6 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
     }));
     this.currentStatus = UploadStatus.COMPLETED;
     logger.info(`Multipart upload completed. Upload key: ${this.uploadKey}`);
-    console.timeEnd('Multipart upload');
   }
 
   async abortUpload(): Promise<void> {

+ 43 - 0
apps/app/src/server/util/stream.ts

@@ -1,3 +1,5 @@
+import { Transform } from 'stream';
+
 export const convertStreamToBuffer = (stream: any): Promise<Buffer> => {
 
   return new Promise((resolve, reject) => {
@@ -12,3 +14,44 @@ export const convertStreamToBuffer = (stream: any): Promise<Buffer> => {
 
   });
 };
+
+export const getBufferToFixedSizeTransform = (size: number): Transform => {
+  let buffer = Buffer.alloc(size);
+  let filledBufferSize = 0;
+
+  return new Transform({
+    transform(chunk: Buffer, encoding, callback) {
+      let offset = 0;
+      while (offset < chunk.length) {
+        // The data size to add to buffer.
+        // - If the remaining chunk size is smaller than the remaining buffer size:
+        //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
+        // - If the remaining chunk size is larger than the remaining buffer size:
+        //     - Fill the buffer, and upload => dataSize is the remaining buffer size
+        //     - The remaining chunk after upload will be added to buffer in the next iteration
+        const dataSize = Math.min(size - filledBufferSize, chunk.length - offset);
+        // Add chunk data to buffer
+        chunk.copy(buffer, filledBufferSize, offset, offset + dataSize);
+        filledBufferSize += dataSize;
+
+        // When buffer reaches size, push to next stream
+        if (filledBufferSize === size) {
+          this.push(buffer);
+          // Reset buffer after push
+          buffer = Buffer.alloc(size);
+          filledBufferSize = 0;
+        }
+
+        offset += dataSize;
+      }
+      callback();
+    },
+    flush(callback) {
+      // push the final buffer
+      if (filledBufferSize > 0) {
+        this.push(buffer.slice(0, filledBufferSize));
+      }
+      callback();
+    },
+  });
+};