Просмотр исходного кода

Merge pull request #8520 from weseek/feat/78038-135783-multipart-upload-page-export

feat: Multipart upload page export
Yuki Takei 2 года назад
Родитель
Commit
d782727688

+ 2 - 1
apps/app/package.json

@@ -83,6 +83,7 @@
     "@keycloak/keycloak-admin-client": "^18.0.0",
     "@slack/web-api": "^6.2.4",
     "@slack/webhook": "^6.0.0",
+    "@types/async": "^3.2.24",
     "@types/jest": "^29.5.2",
     "@types/ldapjs": "^2.2.5",
     "JSONStream": "^1.3.5",
@@ -229,9 +230,9 @@
     "@next/bundle-analyzer": "^14.1.3",
     "@swc-node/jest": "^1.6.2",
     "@swc/jest": "^0.2.24",
-    "@types/archiver": "^6.0.2",
     "@testing-library/react": "^14.1.2",
     "@testing-library/user-event": "^14.5.2",
+    "@types/archiver": "^6.0.2",
     "@types/express": "^4.17.11",
     "@types/jest": "^29.5.2",
     "@types/react-scroll": "^1.8.4",

+ 2 - 10
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -34,16 +34,8 @@ module.exports = (crowi: Crowi): Router => {
 
     const { path, format } = req.body;
 
-    try {
-      // temporal await, remove it after multi-part upload is implemented in https://redmine.weseek.co.jp/issues/78038
-      await pageBulkExportService?.bulkExportWithBasePagePath(path);
-
-      return res.apiv3({}, 204);
-    }
-    catch (err) {
-      logger.error(err);
-      return res.apiv3Err(new ErrorV3('Error occurred in exporting page tree'));
-    }
+    pageBulkExportService?.bulkExportWithBasePagePath(path);
+    return res.apiv3({}, 204);
   });
 
   return router;

+ 123 - 46
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,100 +1,177 @@
-import fs from 'fs';
-import path from 'path';
-import { Writable } from 'stream';
+import type { Readable } from 'stream';
+import { Writable, pipeline } from 'stream';
 
 import { type IPage, isPopulated } from '@growi/core';
 import { normalizePath } from '@growi/core/dist/utils/path-utils';
-import archiver, { Archiver } from 'archiver';
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
+import type { QueueObject } from 'async';
+import gc from 'expose-gc/function';
 import mongoose from 'mongoose';
 
-import { PageModel, PageDocument } from '~/server/models/page';
+import type { PageModel, PageDocument } from '~/server/models/page';
+import type { IAwsMultipartUploader } from '~/server/service/file-uploader/aws/multipart-upload';
+import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
-const streamToPromise = require('stream-to-promise');
+// Custom type for back pressure workaround
+interface ArchiverWithQueue extends Archiver {
+  _queue?: QueueObject<any>;
+}
 
 class PageBulkExportService {
 
   crowi: any;
 
+  // multipart upload part size
+  partSize = 5 * 1024 * 1024; // 5MB
+
+  pageBatchSize = 100;
+
   constructor(crowi) {
     this.crowi = crowi;
   }
 
-  getPageReadableStream(basePagePath: string) {
+  async bulkExportWithBasePagePath(basePagePath: string): Promise<void> {
+    const timeStamp = (new Date()).getTime();
+    const uploadKey = `page-bulk-export-${timeStamp}.zip`;
+
+    const pagesReadable = this.getPageReadable(basePagePath);
+    const zipArchiver = this.setUpZipArchiver();
+    const pagesWritable = this.getPageWritable(zipArchiver);
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.partSize);
+
+    // init multipart upload
+    // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
+    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
+    try {
+      if (multipartUploader == null) {
+        throw Error('Multipart upload not available for configured file upload type');
+      }
+      await multipartUploader.initUpload();
+    }
+    catch (err) {
+      await this.handleExportError(err, multipartUploader);
+      return;
+    }
+    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader);
+
+    // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
+    // Hence, execution of two pipelines is required.
+    pipeline(pagesReadable, pagesWritable, err => this.handleExportError(err, multipartUploader));
+    pipeline(zipArchiver, bufferToPartSizeTransform, multipartUploadWritable, err => this.handleExportError(err, multipartUploader));
+  }
+
+  async handleExportError(err: Error | null, multipartUploader: IAwsMultipartUploader | undefined): Promise<void> {
+    if (err != null) {
+      logger.error(err);
+      if (multipartUploader != null) {
+        await multipartUploader.abortUpload();
+      }
+      // TODO: notify failure to client: https://redmine.weseek.co.jp/issues/78037
+    }
+  }
+
+  /**
+   * Get a Readable of all the pages under the specified path, including the root page.
+   */
+  private getPageReadable(basePagePath: string): Readable {
     const Page = mongoose.model<IPage, PageModel>('Page');
     const { PageQueryBuilder } = Page;
 
     const builder = new PageQueryBuilder(Page.find())
-      .addConditionToListOnlyDescendants(basePagePath);
+      .addConditionToListWithDescendants(basePagePath);
 
     return builder
       .query
       .populate('revision')
       .lean()
-      .cursor({ batchSize: 100 }); // convert to stream
+      .cursor({ batchSize: this.pageBatchSize });
   }
 
-  setUpZipArchiver(): Archiver {
-    const timeStamp = (new Date()).getTime();
-    const zipFilePath = path.join(__dirname, `${timeStamp}.md.zip`);
+  /**
+   * Get a Writable that writes the page body to a zip file
+   */
+  private getPageWritable(zipArchiver: Archiver): Writable {
+    return new Writable({
+      objectMode: true,
+      write: async(page: PageDocument, encoding, callback) => {
+        try {
+          const revision = page.revision;
+
+          if (revision != null && isPopulated(revision)) {
+            const markdownBody = revision.body;
+            const pathNormalized = normalizePath(page.path);
+            // Since archiver does not provide a proper way to back pressure at the moment, use the _queue property as a workaround
+            // ref: https://github.com/archiverjs/node-archiver/issues/611
+            const { _queue } = zipArchiver.append(markdownBody, { name: `${pathNormalized}.md` }) as ArchiverWithQueue;
+            if (_queue == null) {
+              throw Error('Cannot back pressure the export pipeline. Aborting the export.');
+            }
+            if (_queue.length() > this.pageBatchSize) {
+              await _queue.drain();
+            }
+          }
+        }
+        catch (err) {
+          callback(err);
+          return;
+        }
+        callback();
+      },
+      final: (callback) => {
+        zipArchiver.finalize();
+        callback();
+      },
+    });
+  }
 
-    const archive = archiver('zip', {
+  private setUpZipArchiver(): Archiver {
+    const zipArchiver = archiver('zip', {
       zlib: { level: 9 }, // maximum compression
     });
 
     // good practice to catch warnings (ie stat failures and other non-blocking errors)
-    archive.on('warning', (err) => {
+    zipArchiver.on('warning', (err) => {
       if (err.code === 'ENOENT') logger.error(err);
       else throw err;
     });
-    // good practice to catch this error explicitly
-    archive.on('error', (err) => { throw err });
-
-    // pipe archive data to the file
-    const output = fs.createWriteStream(zipFilePath);
-    archive.pipe(output);
 
-    return archive;
+    return zipArchiver;
   }
 
-  async bulkExportWithBasePagePath(basePagePath: string): Promise<void> {
-    // get pages with descendants as stream
-    const pageReadableStream = this.getPageReadableStream(basePagePath);
-
-    const archive = this.setUpZipArchiver();
+  private getMultipartUploadWritable(multipartUploader: IAwsMultipartUploader): Writable {
+    let partNumber = 1;
 
-    const pagesWritable = new Writable({
-      objectMode: true,
-      async write(page: PageDocument, encoding, callback) {
+    return new Writable({
+      write: async(part: Buffer, encoding, callback) => {
         try {
-          const revision = page.revision;
-
-          if (revision != null && isPopulated(revision)) {
-            const markdownBody = revision.body;
-            // write to zip
-            const pathNormalized = normalizePath(page.path);
-            archive.append(markdownBody, { name: `${pathNormalized}.md` });
-          }
+          await multipartUploader.uploadPart(part, partNumber);
+          partNumber += 1;
+          // First aid to prevent unexplained memory leaks
+          logger.info('global.gc() invoked.');
+          gc();
         }
         catch (err) {
-          logger.error(err);
-          throw Error('Failed to export page tree');
+          callback(err);
+          return;
         }
-
         callback();
       },
-      final(callback) {
-        archive.finalize();
+      async final(callback) {
+        try {
+          await multipartUploader.completeUpload();
+        }
+        catch (err) {
+          callback(err);
+          return;
+        }
         callback();
       },
     });
-
-    pageReadableStream.pipe(pagesWritable);
-
-    await streamToPromise(archive);
   }
 
 }

+ 20 - 5
apps/app/src/server/service/file-uploader/aws.ts → apps/app/src/server/service/file-uploader/aws/index.ts

@@ -1,3 +1,5 @@
+import { Writable } from 'stream';
+
 import {
   S3Client,
   HeadObjectCommand,
@@ -8,6 +10,9 @@ import {
   ListObjectsCommand,
   type GetObjectCommandInput,
   ObjectCannedACL,
+  CreateMultipartUploadCommand,
+  UploadPartCommand,
+  CompleteMultipartUploadCommand,
 } from '@aws-sdk/client-s3';
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import urljoin from 'url-join';
@@ -16,12 +21,13 @@ import { ResponseMode, type RespondOptions } from '~/server/interfaces/attachmen
 import type { IAttachmentDocument } from '~/server/models';
 import loggerFactory from '~/utils/logger';
 
-import { configManager } from '../config-manager';
-
+import { configManager } from '../../config-manager';
 import {
   AbstractFileUploader, type TemporaryUrl, type SaveFileParam,
-} from './file-uploader';
-import { ContentHeaders } from './utils';
+} from '../file-uploader';
+import { ContentHeaders } from '../utils';
+
+import { AwsMultipartUploader } from './multipart-upload';
 
 
 const logger = loggerFactory('growi:service:fileUploaderAws');
@@ -91,9 +97,12 @@ const getFilePathOnStorage = (attachment) => {
   return filePath;
 };
 
+export interface IAwsFileUploader {
+  createMultipartUploader: (uploadKey: string) => AwsMultipartUploader
+}
 
 // TODO: rewrite this module to be a type-safe implementation
-class AwsFileUploader extends AbstractFileUploader {
+class AwsFileUploader extends AbstractFileUploader implements IAwsFileUploader {
 
   /**
    * @inheritdoc
@@ -216,6 +225,12 @@ class AwsFileUploader extends AbstractFileUploader {
 
   }
 
+  createMultipartUploader(uploadKey: string) {
+    const s3 = S3Factory();
+    const awsConfig = getAwsConfig();
+    return new AwsMultipartUploader(s3, awsConfig.bucket, uploadKey);
+  }
+
 }
 
 module.exports = (crowi) => {

+ 135 - 0
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts

@@ -0,0 +1,135 @@
+import {
+  CreateMultipartUploadCommand, UploadPartCommand, type S3Client, CompleteMultipartUploadCommand, AbortMultipartUploadCommand,
+} from '@aws-sdk/client-s3';
+
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory('growi:services:fileUploaderAws:multipartUploader');
+
+enum UploadStatus {
+  BEFORE_INIT,
+  IN_PROGRESS,
+  COMPLETED,
+  ABORTED
+}
+
+// TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
+export interface IAwsMultipartUploader {
+  initUpload(): Promise<void>;
+  uploadPart(body: Buffer, partNumber: number): Promise<void>;
+  completeUpload(): Promise<void>;
+  abortUpload(): Promise<void>;
+}
+
+/**
+ * Class for uploading files to S3 using multipart upload.
+ * Create instance from AwsFileUploader class.
+ * Each instance can only be used for one multipart upload, and cannot be reused once completed.
+ * TODO: Enable creation of uploader for interrupted uploads: https://redmine.weseek.co.jp/issues/78040
+ */
+export class AwsMultipartUploader implements IAwsMultipartUploader {
+
+  private bucket: string;
+
+  private uploadKey: string;
+
+  private uploadId: string | undefined;
+
+  private s3Client: S3Client;
+
+  private parts: { PartNumber: number; ETag: string | undefined; }[] = [];
+
+  private currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
+
+  constructor(s3Client: S3Client, bucket: string, uploadKey: string) {
+    this.s3Client = s3Client;
+    this.bucket = bucket;
+    this.uploadKey = uploadKey;
+  }
+
+  async initUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.BEFORE_INIT);
+
+    const response = await this.s3Client.send(new CreateMultipartUploadCommand({
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+    }));
+    this.uploadId = response.UploadId;
+    this.currentStatus = UploadStatus.IN_PROGRESS;
+    logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
+  }
+
+  async uploadPart(body: Buffer, partNumber: number): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    const uploadMetaData = await this.s3Client.send(new UploadPartCommand({
+      Body: body,
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+      PartNumber: partNumber,
+      UploadId: this.uploadId,
+    }));
+
+    this.parts.push({
+      PartNumber: partNumber,
+      ETag: uploadMetaData.ETag,
+    });
+  }
+
+  async completeUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    await this.s3Client.send(new CompleteMultipartUploadCommand({
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+      UploadId: this.uploadId,
+      MultipartUpload: {
+        Parts: this.parts,
+      },
+    }));
+    this.currentStatus = UploadStatus.COMPLETED;
+    logger.info(`Multipart upload completed. Upload key: ${this.uploadKey}`);
+  }
+
+  async abortUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    await this.s3Client.send(new AbortMultipartUploadCommand({
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+      UploadId: this.uploadId,
+    }));
+    this.currentStatus = UploadStatus.ABORTED;
+    logger.info(`Multipart upload aborted. Upload key: ${this.uploadKey}`);
+  }
+
+  private validateUploadStatus(desiredStatus: UploadStatus): void {
+    if (desiredStatus === this.currentStatus) return;
+
+    let errMsg: string | null = null;
+
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      errMsg = 'Multipart upload has already been completed';
+    }
+
+    if (this.currentStatus === UploadStatus.ABORTED) {
+      errMsg = 'Multipart upload has been aborted';
+    }
+
+    // currentStatus is IN_PROGRESS or BEFORE_INIT
+
+    if (this.currentStatus === UploadStatus.IN_PROGRESS && desiredStatus === UploadStatus.BEFORE_INIT) {
+      errMsg = 'Multipart upload has already been initiated';
+    }
+
+    if (this.currentStatus === UploadStatus.BEFORE_INIT && desiredStatus === UploadStatus.IN_PROGRESS) {
+      errMsg = 'Multipart upload not initiated';
+    }
+
+    if (errMsg != null) {
+      logger.error(errMsg);
+      throw Error(errMsg);
+    }
+  }
+
+}

+ 43 - 0
apps/app/src/server/util/stream.ts

@@ -1,3 +1,5 @@
+import { Transform } from 'stream';
+
 export const convertStreamToBuffer = (stream: any): Promise<Buffer> => {
 
   return new Promise((resolve, reject) => {
@@ -12,3 +14,44 @@ export const convertStreamToBuffer = (stream: any): Promise<Buffer> => {
 
   });
 };
+
+export const getBufferToFixedSizeTransform = (size: number): Transform => {
+  let buffer = Buffer.alloc(size);
+  let filledBufferSize = 0;
+
+  return new Transform({
+    transform(chunk: Buffer, encoding, callback) {
+      let offset = 0;
+      while (offset < chunk.length) {
+        // The data size to add to buffer.
+        // - If the remaining chunk size is smaller than the remaining buffer size:
+        //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
+        // - If the remaining chunk size is larger than the remaining buffer size:
+        //     - Fill the buffer, and upload => dataSize is the remaining buffer size
+        //     - The remaining chunk after upload will be added to buffer in the next iteration
+        const dataSize = Math.min(size - filledBufferSize, chunk.length - offset);
+        // Add chunk data to buffer
+        chunk.copy(buffer, filledBufferSize, offset, offset + dataSize);
+        filledBufferSize += dataSize;
+
+        // When buffer reaches size, push to next stream
+        if (filledBufferSize === size) {
+          this.push(buffer);
+          // Reset buffer after push
+          buffer = Buffer.alloc(size);
+          filledBufferSize = 0;
+        }
+
+        offset += dataSize;
+      }
+      callback();
+    },
+    flush(callback) {
+      // push the final buffer
+      if (filledBufferSize > 0) {
+        this.push(buffer.slice(0, filledBufferSize));
+      }
+      callback();
+    },
+  });
+};

+ 5 - 0
yarn.lock

@@ -3817,6 +3817,11 @@
   resolved "https://registry.yarnpkg.com/@types/aria-query/-/aria-query-5.0.4.tgz#1a31c3d378850d2778dabb6374d036dcba4ba708"
   integrity sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw==
 
+"@types/async@^3.2.24":
+  version "3.2.24"
+  resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.24.tgz#3a96351047575bbcf2340541b2d955a35339608f"
+  integrity sha512-8iHVLHsCCOBKjCF2KwFe0p9Z3rfM9mL+sSP8btyR5vTjJRAqpBYD28/ZLgXPf0pjG1VxOvtCV/BgXkQbpSe8Hw==
+
 "@types/babel__core@^7.1.14", "@types/babel__core@^7.20.5":
   version "7.20.5"
   resolved "https://registry.yarnpkg.com/@types/babel__core/-/babel__core-7.20.5.tgz#3df15f27ba85319caa07ba08d0721889bb39c017"