Просмотр исходного кода

page bulk export with multipart upload

Futa Arai 2 года назад
Родитель
Commit
c641698c02

+ 88 - 25
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -4,10 +4,12 @@ import { Writable } from 'stream';
 
 import { type IPage, isPopulated } from '@growi/core';
 import { normalizePath } from '@growi/core/dist/utils/path-utils';
-import archiver, { Archiver } from 'archiver';
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
 import mongoose from 'mongoose';
 
-import { PageModel, PageDocument } from '~/server/models/page';
+import type { PageModel, PageDocument } from '~/server/models/page';
+import type { IAwsMultipartUploader } from '~/server/service/file-uploader/aws/multipart-upload';
 import loggerFactory from '~/utils/logger';
 
 
@@ -19,10 +21,16 @@ class PageBulkExportService {
 
   crowi: any;
 
+  // multipart upload part size
+  partSize = 5 * 1024 * 1024; // 5MB
+
   constructor(crowi) {
     this.crowi = crowi;
   }
 
+  /**
+   * Get a ReadableStream of all the pages under the specified path, including the root page.
+   */
   getPageReadableStream(basePagePath: string) {
     const Page = mongoose.model<IPage, PageModel>('Page');
     const { PageQueryBuilder } = Page;
@@ -37,8 +45,38 @@ class PageBulkExportService {
       .cursor({ batchSize: 100 }); // convert to stream
   }
 
-  setUpZipArchiver(): Archiver {
-    const timeStamp = (new Date()).getTime();
+  /**
+   * Get a Writable that writes the page body to a zip file
+   */
+  getPageWritable(archive: Archiver) {
+    return new Writable({
+      objectMode: true,
+      async write(page: PageDocument, encoding, callback) {
+        try {
+          const revision = page.revision;
+
+          if (revision != null && isPopulated(revision)) {
+            const markdownBody = revision.body;
+            // write to zip
+            const pathNormalized = normalizePath(page.path);
+            archive.append(markdownBody, { name: `${pathNormalized}.md` });
+          }
+        }
+        catch (err) {
+          logger.error(err);
+          throw Error('Failed to export page tree');
+        }
+
+        callback();
+      },
+      final(callback) {
+        archive.finalize();
+        callback();
+      },
+    });
+  }
+
+  setUpZipArchiver(timeStamp: number): Archiver {
     const zipFilePath = path.join(__dirname, `${timeStamp}.md.zip`);
 
     const archive = archiver('zip', {
@@ -61,40 +99,65 @@ class PageBulkExportService {
   }
 
   async bulkExportWithBasePagePath(basePagePath: string): Promise<void> {
+    if (this.crowi?.fileUploadService?.createMultipartUploader == null) {
+      throw Error('Multipart upload not available for configured file upload type');
+    }
+    const timeStamp = (new Date()).getTime();
+
+    const uploadKey = `page-bulk-export-${timeStamp}`;
+
     // get pages with descendants as stream
     const pageReadableStream = this.getPageReadableStream(basePagePath);
 
-    const archive = this.setUpZipArchiver();
+    const archive = this.setUpZipArchiver(timeStamp);
 
-    const pagesWritable = new Writable({
-      objectMode: true,
-      async write(page: PageDocument, encoding, callback) {
-        try {
-          const revision = page.revision;
+    const pagesWritable = this.getPageWritable(archive);
 
-          if (revision != null && isPopulated(revision)) {
-            const markdownBody = revision.body;
-            // write to zip
-            const pathNormalized = normalizePath(page.path);
-            archive.append(markdownBody, { name: `${pathNormalized}.md` });
+    const multipartUploadWritable = await this.getMultipartUploadWritable(uploadKey, this.partSize);
+
+    archive.pipe(multipartUploadWritable);
+    pageReadableStream.pipe(pagesWritable);
+
+    await streamToPromise(archive);
+  }
+
+
+  async getMultipartUploadWritable(uploadKey: string, partSize: number) {
+    const multipartUploader: IAwsMultipartUploader = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
+
+    let partNumber = 1;
+    let buffer = Buffer.alloc(0);
+
+    await multipartUploader.initUpload();
+
+    return new Writable({
+      objectMode: true,
+      async write(chunk, encoding, callback) {
+        let offset = 0;
+        while (offset < chunk.length) {
+          const chunkSize = Math.min(partSize - buffer.length, chunk.length - offset);
+          buffer = Buffer.concat([buffer, chunk.slice(offset, offset + chunkSize)]);
+          if (buffer.length === partSize) {
+            // eslint-disable-next-line no-await-in-loop
+            await multipartUploader.uploadPart(buffer, partNumber);
+
+            buffer = Buffer.alloc(0);
+            partNumber += 1;
           }
-        }
-        catch (err) {
-          logger.error(err);
-          throw Error('Failed to export page tree');
+
+          offset += chunkSize;
         }
 
         callback();
       },
-      final(callback) {
-        archive.finalize();
+      async final(callback) {
+        if (buffer.length > 0) {
+          await multipartUploader.uploadPart(buffer, partNumber);
+        }
+        await multipartUploader.completeUpload();
         callback();
       },
     });
-
-    pageReadableStream.pipe(pagesWritable);
-
-    await streamToPromise(archive);
   }
 
 }

+ 20 - 5
apps/app/src/server/service/file-uploader/aws.ts → apps/app/src/server/service/file-uploader/aws/index.ts

@@ -1,3 +1,5 @@
+import { Writable } from 'stream';
+
 import {
   S3Client,
   HeadObjectCommand,
@@ -8,6 +10,9 @@ import {
   ListObjectsCommand,
   type GetObjectCommandInput,
   ObjectCannedACL,
+  CreateMultipartUploadCommand,
+  UploadPartCommand,
+  CompleteMultipartUploadCommand,
 } from '@aws-sdk/client-s3';
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import urljoin from 'url-join';
@@ -16,12 +21,13 @@ import { ResponseMode, type RespondOptions } from '~/server/interfaces/attachmen
 import type { IAttachmentDocument } from '~/server/models';
 import loggerFactory from '~/utils/logger';
 
-import { configManager } from '../config-manager';
-
+import { configManager } from '../../config-manager';
 import {
   AbstractFileUploader, type TemporaryUrl, type SaveFileParam,
-} from './file-uploader';
-import { ContentHeaders } from './utils';
+} from '../file-uploader';
+import { ContentHeaders } from '../utils';
+
+import { AwsMultipartUploader } from './multipart-upload';
 
 
 const logger = loggerFactory('growi:service:fileUploaderAws');
@@ -91,9 +97,12 @@ const getFilePathOnStorage = (attachment) => {
   return filePath;
 };
 
+export interface IAwsFileUploader {
+  createMultipartUploader: (uploadKey: string) => Promise<AwsMultipartUploader>
+}
 
 // TODO: rewrite this module to be a type-safe implementation
-class AwsFileUploader extends AbstractFileUploader {
+class AwsFileUploader extends AbstractFileUploader implements IAwsFileUploader {
 
   /**
    * @inheritdoc
@@ -216,6 +225,12 @@ class AwsFileUploader extends AbstractFileUploader {
 
   }
 
+  async createMultipartUploader(uploadKey: string) {
+    const s3 = S3Factory();
+    const awsConfig = getAwsConfig();
+    return new AwsMultipartUploader(s3, awsConfig.bucket, uploadKey);
+  }
+
 }
 
 module.exports = (crowi) => {

+ 112 - 0
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts

@@ -0,0 +1,112 @@
+import {
+  CreateMultipartUploadCommand, UploadPartCommand, type S3Client, CompleteMultipartUploadCommand,
+} from '@aws-sdk/client-s3';
+
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory('growi:services:fileUploaderAws:multipartUploader');
+
+enum UploadStatus {
+  BEFORE_INIT,
+  IN_PROGRESS,
+  COMPLETED,
+}
+
+export interface IAwsMultipartUploader {
+  initUpload(): Promise<void>;
+  uploadPart(body: Buffer, partNumber: number): Promise<void>;
+  completeUpload(): Promise<void>;
+}
+
+/**
+ * Class for uploading files to S3 using multipart upload.
+ * Create instance from AwsFileUploader class.
+ * Each instance can only be used for one multipart upload, and cannot be reused once completed.
+ * TODO: Enable creation of uploader for interrupted uploads: https://redmine.weseek.co.jp/issues/78040
+ */
+export class AwsMultipartUploader implements IAwsMultipartUploader {
+
+  private bucket: string;
+
+  private uploadKey: string;
+
+  private uploadId: string | undefined;
+
+  private s3Client: S3Client;
+
+  private parts: { PartNumber: number; ETag: string | undefined; }[] = [];
+
+  private currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
+
+  constructor(s3Client: S3Client, bucket: string, uploadKey: string) {
+    this.s3Client = s3Client;
+    this.bucket = bucket;
+    this.uploadKey = uploadKey;
+  }
+
+  async initUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.BEFORE_INIT);
+
+    const response = await this.s3Client.send(new CreateMultipartUploadCommand({
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+    }));
+    this.uploadId = response.UploadId;
+    this.currentStatus = UploadStatus.IN_PROGRESS;
+  }
+
+  async uploadPart(body: Buffer, partNumber: number): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    const uploadMetaData = await this.s3Client.send(new UploadPartCommand({
+      Body: body,
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+      PartNumber: partNumber,
+      UploadId: this.uploadId,
+    }));
+
+    this.parts.push({
+      PartNumber: partNumber,
+      ETag: uploadMetaData.ETag,
+    });
+  }
+
+  async completeUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    await this.s3Client.send(new CompleteMultipartUploadCommand({
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+      UploadId: this.uploadId,
+      MultipartUpload: {
+        Parts: this.parts,
+      },
+    }));
+    this.currentStatus = UploadStatus.COMPLETED;
+  }
+
+  private validateUploadStatus(desiredStatus: UploadStatus): void {
+    if (desiredStatus === this.currentStatus) return;
+
+    let errMsg: string | null = null;
+
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      errMsg = 'Multipart upload has already been completed';
+    }
+
+    if (desiredStatus === UploadStatus.BEFORE_INIT) {
+      errMsg = 'Multipart upload has already been initiated';
+    }
+
+    if (desiredStatus === UploadStatus.IN_PROGRESS) {
+      errMsg = 'Multipart upload not initiated';
+    }
+
+    if (errMsg != null) {
+      logger.error(errMsg);
+      throw Error(errMsg);
+    }
+  }
+
+}