Futa Arai 2 года назад
Родитель
Commit
5fae4ac823

+ 4 - 10
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -34,16 +34,10 @@ module.exports = (crowi: Crowi): Router => {
 
     const { path, format } = req.body;
 
-    try {
-      // temporal await, remove it after multi-part upload is implemented in https://redmine.weseek.co.jp/issues/78038
-      await pageBulkExportService?.bulkExportWithBasePagePath(path);
-
-      return res.apiv3({}, 204);
-    }
-    catch (err) {
-      logger.error(err);
-      return res.apiv3Err(new ErrorV3('Error occurred in exporting page tree'));
-    }
+    console.time('pageBulkExport');
+    pageBulkExportService?.bulkExportWithBasePagePath(path);
+    console.timeEnd('pageBulkExport');
+    return res.apiv3({}, 204);
   });
 
   return router;

+ 50 - 46
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,5 +1,3 @@
-import fs from 'fs';
-import path from 'path';
 import { Writable } from 'stream';
 
 import { type IPage, isPopulated } from '@growi/core';
@@ -21,6 +19,8 @@ class PageBulkExportService {
 
   crowi: any;
 
+  zipArchiver: Archiver;
+
   // multipart upload part size
   partSize = 5 * 1024 * 1024; // 5MB
 
@@ -28,10 +28,31 @@ class PageBulkExportService {
     this.crowi = crowi;
   }
 
+  async bulkExportWithBasePagePath(basePagePath: string): Promise<void> {
+    const timeStamp = (new Date()).getTime();
+    const uploadKey = `page-bulk-export-${timeStamp}.zip`;
+
+    const pageReadableStream = this.getPageReadableStream(basePagePath);
+    const zipArchiver = this.setUpZipArchiver();
+    const pagesWritable = this.getPageWritable(zipArchiver);
+
+    try {
+      const multipartUploadWritable = await this.getMultipartUploadWritable(uploadKey);
+
+      zipArchiver.pipe(multipartUploadWritable);
+      pageReadableStream.pipe(pagesWritable);
+
+      await streamToPromise(multipartUploadWritable);
+    }
+    catch (err) {
+      logger.error(err);
+    }
+  }
+
   /**
    * Get a ReadableStream of all the pages under the specified path, including the root page.
    */
-  getPageReadableStream(basePagePath: string) {
+  private getPageReadableStream(basePagePath: string) {
     const Page = mongoose.model<IPage, PageModel>('Page');
     const { PageQueryBuilder } = Page;
 
@@ -48,10 +69,10 @@ class PageBulkExportService {
   /**
    * Get a Writable that writes the page body to a zip file
    */
-  getPageWritable(archive: Archiver) {
+  private getPageWritable(zipArchiver: Archiver) {
     return new Writable({
       objectMode: true,
-      async write(page: PageDocument, encoding, callback) {
+      write: async(page: PageDocument, encoding, callback) => {
         try {
           const revision = page.revision;
 
@@ -59,7 +80,7 @@ class PageBulkExportService {
             const markdownBody = revision.body;
             // write to zip
             const pathNormalized = normalizePath(page.path);
-            archive.append(markdownBody, { name: `${pathNormalized}.md` });
+            zipArchiver.append(markdownBody, { name: `${pathNormalized}.md` });
           }
         }
         catch (err) {
@@ -69,16 +90,14 @@ class PageBulkExportService {
 
         callback();
       },
-      final(callback) {
-        archive.finalize();
+      final: (callback) => {
+        zipArchiver.finalize();
         callback();
       },
     });
   }
 
-  setUpZipArchiver(timeStamp: number): Archiver {
-    const zipFilePath = path.join(__dirname, `${timeStamp}.md.zip`);
-
+  private setUpZipArchiver(): Archiver {
     const archive = archiver('zip', {
       zlib: { level: 9 }, // maximum compression
     });
@@ -91,61 +110,46 @@ class PageBulkExportService {
     // good practice to catch this error explicitly
     archive.on('error', (err) => { throw err });
 
-    // pipe archive data to the file
-    const output = fs.createWriteStream(zipFilePath);
-    archive.pipe(output);
-
     return archive;
   }
 
-  async bulkExportWithBasePagePath(basePagePath: string): Promise<void> {
-    if (this.crowi?.fileUploadService?.createMultipartUploader == null) {
+  private async getMultipartUploadWritable(uploadKey: string): Promise<Writable> {
+    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
+
+    if (multipartUploader == null) {
       throw Error('Multipart upload not available for configured file upload type');
     }
-    const timeStamp = (new Date()).getTime();
-
-    const uploadKey = `page-bulk-export-${timeStamp}`;
-
-    // get pages with descendants as stream
-    const pageReadableStream = this.getPageReadableStream(basePagePath);
-
-    const archive = this.setUpZipArchiver(timeStamp);
-
-    const pagesWritable = this.getPageWritable(archive);
-
-    const multipartUploadWritable = await this.getMultipartUploadWritable(uploadKey, this.partSize);
-
-    archive.pipe(multipartUploadWritable);
-    pageReadableStream.pipe(pagesWritable);
-
-    await streamToPromise(archive);
-  }
-
-
-  async getMultipartUploadWritable(uploadKey: string, partSize: number) {
-    const multipartUploader: IAwsMultipartUploader = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
 
     let partNumber = 1;
+    // Buffer to store stream data before upload. When the buffer is full, it will be uploaded as a part.
     let buffer = Buffer.alloc(0);
 
     await multipartUploader.initUpload();
 
     return new Writable({
-      objectMode: true,
-      async write(chunk, encoding, callback) {
+      write: async(chunk: Buffer, encoding, callback) => {
         let offset = 0;
         while (offset < chunk.length) {
-          const chunkSize = Math.min(partSize - buffer.length, chunk.length - offset);
-          buffer = Buffer.concat([buffer, chunk.slice(offset, offset + chunkSize)]);
-          if (buffer.length === partSize) {
+          // The data size to add to buffer.
+          // - If the remaining chunk size is smaller than the remaining buffer size:
+          //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
+          // - If the remaining chunk size is larger than the remaining buffer size:
+          //     - Fill the buffer, and upload => dataSize is the remaining buffer size
+          //     - The remaining chunk after upload will be added to buffer in the next iteration
+          const dataSize = Math.min(this.partSize - buffer.length, chunk.length - offset);
+          // Add chunk data to buffer
+          buffer = Buffer.concat([buffer, chunk.slice(offset, offset + dataSize)]);
+
+          // When buffer reaches partSize, upload
+          if (buffer.length === this.partSize) {
             // eslint-disable-next-line no-await-in-loop
             await multipartUploader.uploadPart(buffer, partNumber);
-
+            // Reset buffer after upload
             buffer = Buffer.alloc(0);
             partNumber += 1;
           }
 
-          offset += chunkSize;
+          offset += dataSize;
         }
 
         callback();

+ 2 - 2
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -98,7 +98,7 @@ const getFilePathOnStorage = (attachment) => {
 };
 
 export interface IAwsFileUploader {
-  createMultipartUploader: (uploadKey: string) => Promise<AwsMultipartUploader>
+  createMultipartUploader: (uploadKey: string) => AwsMultipartUploader
 }
 
 // TODO: rewrite this module to be a type-safe implementation
@@ -225,7 +225,7 @@ class AwsFileUploader extends AbstractFileUploader implements IAwsFileUploader {
 
   }
 
-  async createMultipartUploader(uploadKey: string) {
+  createMultipartUploader(uploadKey: string) {
     const s3 = S3Factory();
     const awsConfig = getAwsConfig();
     return new AwsMultipartUploader(s3, awsConfig.bucket, uploadKey);