瀏覽代碼

Merge pull request #8842 from weseek/feat/135775-135786-gcs-multipart-upload-bulk-export

Feat/135775 135786 gcs multipart upload bulk export
Yuki Takei 1 年之前
父節點
當前提交
55a8e251ca

+ 9 - 12
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -18,7 +18,8 @@ import { Attachment } from '~/server/models';
 import type { ActivityDocument } from '~/server/models/activity';
 import type { PageModel, PageDocument } from '~/server/models/page';
 import Subscription from '~/server/models/subscription';
-import type { IAwsMultipartUploader } from '~/server/service/file-uploader/aws/multipart-upload';
+import type { FileUploader } from '~/server/service/file-uploader';
+import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
 import { preNotifyService } from '~/server/service/pre-notify';
 import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
@@ -45,8 +46,8 @@ class PageBulkExportService {
 
   activityEvent: any;
 
-  // multipart upload part size
-  partSize = 5 * 1024 * 1024; // 5MB
+  // multipart upload max part size
+  maxPartSize = 5 * 1024 * 1024; // 5MB
 
   pageBatchSize = 100;
 
@@ -71,15 +72,12 @@ class PageBulkExportService {
     const pagesReadable = this.getPageReadable(basePagePath);
     const zipArchiver = this.setUpZipArchiver();
     const pagesWritable = this.getPageWritable(zipArchiver);
-    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.partSize);
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
 
     // init multipart upload
-    // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
-    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
+    const fileUploadService: FileUploader = this.crowi.fileUploadService;
+    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
     let pageBulkExportJob: PageBulkExportJobDocument & HasObjectId;
-    if (multipartUploader == null) {
-      throw Error('Multipart upload not available for configured file upload type');
-    }
     try {
       await multipartUploader.initUpload();
       pageBulkExportJob = await PageBulkExportJob.create({
@@ -106,10 +104,9 @@ class PageBulkExportService {
   }
 
   private async handleExportErrorInStream(
-      err: Error | null, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, multipartUploader: IAwsMultipartUploader,
+      err: Error | null, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, multipartUploader: IMultipartUploader,
   ): Promise<void> {
     if (err != null) {
-      logger.error(err);
       await multipartUploader.abortUpload();
       await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED);
     }
@@ -184,7 +181,7 @@ class PageBulkExportService {
   }
 
   private getMultipartUploadWritable(
-      multipartUploader: IAwsMultipartUploader,
+      multipartUploader: IMultipartUploader,
       pageBulkExportJob: PageBulkExportJobDocument,
       attachment: IAttachmentDocument,
       activityParameters: ActivityParameters,

+ 4 - 8
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -24,7 +24,7 @@ import {
 } from '../file-uploader';
 import { ContentHeaders } from '../utils';
 
-import { AwsMultipartUploader } from './multipart-upload';
+import { AwsMultipartUploader } from './multipart-uploader';
 
 
 const logger = loggerFactory('growi:service:fileUploaderAws');
@@ -111,12 +111,8 @@ const getFilePathOnStorage = (attachment: IAttachmentDocument) => {
   return filePath;
 };
 
-export interface IAwsFileUploader {
-  createMultipartUploader: (uploadKey: string) => AwsMultipartUploader
-}
-
 // TODO: rewrite this module to be a type-safe implementation
-class AwsFileUploader extends AbstractFileUploader implements IAwsFileUploader {
+class AwsFileUploader extends AbstractFileUploader {
 
   /**
    * @inheritdoc
@@ -237,9 +233,9 @@ class AwsFileUploader extends AbstractFileUploader implements IAwsFileUploader {
 
   }
 
-  createMultipartUploader(uploadKey: string) {
+  override createMultipartUploader(uploadKey: string, maxPartSize: number) {
     const s3 = S3Factory();
-    return new AwsMultipartUploader(s3, getS3Bucket(), uploadKey);
+    return new AwsMultipartUploader(s3, getS3Bucket(), uploadKey, maxPartSize);
   }
 
 }

+ 21 - 60
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts → apps/app/src/server/service/file-uploader/aws/multipart-uploader.ts

@@ -5,6 +5,9 @@ import {
 
 import loggerFactory from '~/utils/logger';
 
+import { MultipartUploader, type IMultipartUploader } from '../multipart-uploader';
+
+
 const logger = loggerFactory('growi:services:fileUploaderAws:multipartUploader');
 
 enum UploadStatus {
@@ -14,15 +17,7 @@ enum UploadStatus {
   ABORTED
 }
 
-// Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
-export interface IAwsMultipartUploader {
-  initUpload(): Promise<void>;
-  uploadPart(body: Buffer, partNumber: number): Promise<void>;
-  completeUpload(): Promise<void>;
-  abortUpload(): Promise<void>;
-  uploadId: string | undefined;
-  getUploadedFileSize(): Promise<number>;
-}
+export type IAwsMultipartUploader = IMultipartUploader
 
 /**
  * Class for uploading files to S3 using multipart upload.
@@ -30,32 +25,22 @@ export interface IAwsMultipartUploader {
  * Each instance can only be used for one multipart upload, and cannot be reused once completed.
  * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
  */
-export class AwsMultipartUploader implements IAwsMultipartUploader {
+export class AwsMultipartUploader extends MultipartUploader implements IAwsMultipartUploader {
 
   private bucket: string | undefined;
 
-  private uploadKey: string;
-
-  private _uploadId: string | undefined;
-
   private s3Client: S3Client;
 
   private parts: { PartNumber: number; ETag: string | undefined; }[] = [];
 
-  private currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
-
-  private _uploadedFileSize: number | undefined;
+  constructor(s3Client: S3Client, bucket: string | undefined, uploadKey: string, maxPartSize: number) {
+    super(uploadKey, maxPartSize);
 
-  constructor(s3Client: S3Client, bucket: string | undefined, uploadKey: string) {
     this.s3Client = s3Client;
     this.bucket = bucket;
     this.uploadKey = uploadKey;
   }
 
-  get uploadId(): string | undefined {
-    return this._uploadId;
-  }
-
   async initUpload(): Promise<void> {
     this.validateUploadStatus(UploadStatus.BEFORE_INIT);
 
@@ -63,16 +48,20 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
       Bucket: this.bucket,
       Key: this.uploadKey,
     }));
+    if (response.UploadId == null) {
+      throw Error('UploadId is empty');
+    }
     this._uploadId = response.UploadId;
     this.currentStatus = UploadStatus.IN_PROGRESS;
     logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
   }
 
-  async uploadPart(body: Buffer, partNumber: number): Promise<void> {
+  async uploadPart(part: Buffer, partNumber: number): Promise<void> {
     this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+    this.validatePartSize(part.length);
 
     const uploadMetaData = await this.s3Client.send(new UploadPartCommand({
-      Body: body,
+      Body: part,
       Bucket: this.bucket,
       Key: this.uploadKey,
       PartNumber: partNumber,
@@ -83,6 +72,7 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
       PartNumber: partNumber,
       ETag: uploadMetaData.ETag,
     });
+    this._uploadedFileSize += part.length;
   }
 
   async completeUpload(): Promise<void> {
@@ -113,44 +103,15 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
   }
 
   async getUploadedFileSize(): Promise<number> {
-    if (this._uploadedFileSize != null) return this._uploadedFileSize;
-
-    this.validateUploadStatus(UploadStatus.COMPLETED);
-    const headData = await this.s3Client.send(new HeadObjectCommand({
-      Bucket: this.bucket,
-      Key: this.uploadKey,
-    }));
-    this._uploadedFileSize = headData.ContentLength;
-    return this._uploadedFileSize ?? 0;
-  }
-
-  private validateUploadStatus(desiredStatus: UploadStatus): void {
-    if (desiredStatus === this.currentStatus) return;
-
-    let errMsg: string | null = null;
-
     if (this.currentStatus === UploadStatus.COMPLETED) {
-      errMsg = 'Multipart upload has already been completed';
-    }
-
-    if (this.currentStatus === UploadStatus.ABORTED) {
-      errMsg = 'Multipart upload has been aborted';
-    }
-
-    // currentStatus is IN_PROGRESS or BEFORE_INIT
-
-    if (this.currentStatus === UploadStatus.IN_PROGRESS && desiredStatus === UploadStatus.BEFORE_INIT) {
-      errMsg = 'Multipart upload has already been initiated';
-    }
-
-    if (this.currentStatus === UploadStatus.BEFORE_INIT && desiredStatus === UploadStatus.IN_PROGRESS) {
-      errMsg = 'Multipart upload not initiated';
-    }
-
-    if (errMsg != null) {
-      logger.error(errMsg);
-      throw Error(errMsg);
+      const headData = await this.s3Client.send(new HeadObjectCommand({
+        Bucket: this.bucket,
+        Key: this.uploadKey,
+      }));
+      if (headData.ContentLength == null) throw Error('Could not fetch uploaded file size');
+      this._uploadedFileSize = headData.ContentLength;
     }
+    return this._uploadedFileSize;
   }
 
 }

+ 7 - 0
apps/app/src/server/service/file-uploader/file-uploader.ts

@@ -9,6 +9,8 @@ import loggerFactory from '~/utils/logger';
 
 import { configManager } from '../config-manager';
 
+import type { MultipartUploader } from './multipart-uploader';
+
 const logger = loggerFactory('growi:service:fileUploader');
 
 
@@ -39,6 +41,7 @@ export interface FileUploader {
   respond(res: Response, attachment: IAttachmentDocument, opts?: RespondOptions): void,
   findDeliveryFile(attachment: IAttachmentDocument): Promise<NodeJS.ReadableStream>,
   generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise<TemporaryUrl>,
+  createMultipartUploader: (uploadKey: string, maxPartSize: number) => MultipartUploader,
 }
 
 export abstract class AbstractFileUploader implements FileUploader {
@@ -151,6 +154,10 @@ export abstract class AbstractFileUploader implements FileUploader {
     return ResponseMode.RELAY;
   }
 
+  createMultipartUploader(uploadKey: string, maxPartSize: number): MultipartUploader {
+    throw new Error('Multipart upload not available for file upload type');
+  }
+
   /**
    * Respond to the HTTP request.
    */

+ 12 - 5
apps/app/src/server/service/file-uploader/gcs.ts → apps/app/src/server/service/file-uploader/gcs/index.ts

@@ -8,12 +8,14 @@ import {
 import type { IAttachmentDocument } from '~/server/models';
 import loggerFactory from '~/utils/logger';
 
-import { configManager } from '../config-manager';
-
+import { configManager } from '../../config-manager';
 import {
   AbstractFileUploader, type TemporaryUrl, type SaveFileParam,
-} from './file-uploader';
-import { ContentHeaders } from './utils';
+} from '../file-uploader';
+import { ContentHeaders } from '../utils';
+
+import type { IGcsMultipartUploader } from './multipart-uploader';
+import { GcsMultipartUploader } from './multipart-uploader';
 
 const logger = loggerFactory('growi:service:fileUploaderGcs');
 
@@ -62,7 +64,6 @@ async function isFileExists(file) {
   return res[0];
 }
 
-
 // TODO: rewrite this module to be a type-safe implementation
 class GcsFileUploader extends AbstractFileUploader {
 
@@ -170,6 +171,12 @@ class GcsFileUploader extends AbstractFileUploader {
 
   }
 
+  override createMultipartUploader(uploadKey: string, maxPartSize: number) {
+    const gcs = getGcsInstance();
+    const myBucket = gcs.bucket(getGcsBucket());
+    return new GcsMultipartUploader(myBucket, uploadKey, maxPartSize);
+  }
+
 }
 
 

+ 130 - 0
apps/app/src/server/service/file-uploader/gcs/multipart-uploader.ts

@@ -0,0 +1,130 @@
+import type { Bucket, File } from '@google-cloud/storage';
+// eslint-disable-next-line no-restricted-imports
+import axios from 'axios';
+
+import loggerFactory from '~/utils/logger';
+
+import { MultipartUploader, type IMultipartUploader } from '../multipart-uploader';
+
+const logger = loggerFactory('growi:services:fileUploaderGcs:multipartUploader');
+
+enum UploadStatus {
+  BEFORE_INIT,
+  IN_PROGRESS,
+  COMPLETED,
+  ABORTED
+}
+
+export type IGcsMultipartUploader = IMultipartUploader
+
+/**
+ * Class for uploading files to GCS using multipart upload.
+ * Create instance from GcsFileUploader class.
+ * Each instance can only be used for one multipart upload, and cannot be reused once completed.
+ * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
+ */
+export class GcsMultipartUploader extends MultipartUploader implements IGcsMultipartUploader {
+
+  private file: File;
+
+  // ref: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=en#chunked-upload
+  private readonly minPartSize = 256 * 1024; // 256KB
+
+  constructor(bucket: Bucket, uploadKey: string, maxPartSize: number) {
+    super(uploadKey, maxPartSize);
+
+    this.file = bucket.file(this.uploadKey);
+  }
+
+  async initUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.BEFORE_INIT);
+
+    const [uploadUrl] = await this.file.createResumableUpload();
+
+    this._uploadId = uploadUrl;
+    this.currentStatus = UploadStatus.IN_PROGRESS;
+    logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
+  }
+
+  async uploadPart(part: Buffer, partNumber: number): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+    this.validatePartSize(part.length);
+
+    // Upload the whole part in one request, or divide it in chunks and upload depending on the part size
+    if (part.length === this.maxPartSize) {
+      await this.uploadChunk(part);
+    }
+    else if (this.minPartSize < part.length && part.length < this.maxPartSize) {
+      const numOfMinPartSize = Math.floor(part.length / this.minPartSize);
+      const minPartSizeMultiplePartChunk = part.slice(0, numOfMinPartSize * this.minPartSize);
+      const lastPartChunk = part.slice(numOfMinPartSize * this.minPartSize);
+
+      await this.uploadChunk(minPartSizeMultiplePartChunk);
+      await this.uploadChunk(lastPartChunk, true);
+    }
+    else if (part.length < this.minPartSize) {
+      await this.uploadChunk(part, true);
+    }
+  }
+
+  async completeUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    // Send a request to complete the upload, in case the last uploadPart request did not request completion.
+    await axios.put(this.uploadId, {
+      headers: {
+        'Content-Range': `bytes */${this._uploadedFileSize}`,
+      },
+    });
+    this.currentStatus = UploadStatus.COMPLETED;
+    logger.info(`Multipart upload completed. Upload key: ${this.uploadKey}`);
+  }
+
+  async abortUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    try {
+      await axios.delete(this.uploadId);
+    }
+    catch (e) {
+      if (e.response?.status !== 499) {
+        throw e;
+      }
+    }
+    this.currentStatus = UploadStatus.ABORTED;
+    logger.info(`Multipart upload aborted. Upload key: ${this.uploadKey}`);
+  }
+
+  async getUploadedFileSize(): Promise<number> {
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      const [metadata] = await this.file.getMetadata();
+      this._uploadedFileSize = metadata.size;
+    }
+    return this._uploadedFileSize;
+  }
+
+  private uploadChunk = async(chunk, isLastUpload = false) => {
+    // If chunk size is larger than the minimal part size, it is required to be a multiple of the minimal part size
+    // ref: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=en#chunked-upload
+    if (chunk.length > this.minPartSize && chunk.length % this.minPartSize !== 0) throw Error(`chunk must be a multiple of ${this.minPartSize}`);
+
+    const range = isLastUpload
+      ? `bytes ${this._uploadedFileSize}-${this._uploadedFileSize + chunk.length - 1}/${this._uploadedFileSize + chunk.length}`
+      : `bytes ${this._uploadedFileSize}-${this._uploadedFileSize + chunk.length - 1}/*`;
+
+    try {
+      await axios.put(this.uploadId, chunk, {
+        headers: {
+          'Content-Range': `${range}`,
+        },
+      });
+    }
+    catch (e) {
+      if (e.response?.status !== 308) {
+        throw e;
+      }
+    }
+    this._uploadedFileSize += chunk.length;
+  };
+
+}

+ 86 - 0
apps/app/src/server/service/file-uploader/multipart-uploader.ts

@@ -0,0 +1,86 @@
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory('growi:services:fileUploader:multipartUploader');
+
+enum UploadStatus {
+  BEFORE_INIT,
+  IN_PROGRESS,
+  COMPLETED,
+  ABORTED
+}
+
+export interface IMultipartUploader {
+  initUpload(): Promise<void>;
+  uploadPart(body: Buffer, partNumber: number): Promise<void>;
+  completeUpload(): Promise<void>;
+  abortUpload(): Promise<void>;
+  uploadId: string;
+  getUploadedFileSize(): Promise<number>;
+}
+
+export abstract class MultipartUploader implements IMultipartUploader {
+
+  protected uploadKey: string;
+
+  protected _uploadId: string | undefined;
+
+  protected currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
+
+  protected _uploadedFileSize = 0;
+
+  protected readonly maxPartSize: number;
+
+  constructor(uploadKey: string, maxPartSize: number) {
+    this.maxPartSize = maxPartSize;
+    this.uploadKey = uploadKey;
+  }
+
+  get uploadId(): string {
+    if (this._uploadId == null) throw Error('UploadId is empty');
+    return this._uploadId;
+  }
+
+  abstract initUpload(): Promise<void>
+
+  abstract uploadPart(part: Buffer, partNumber: number): Promise<void>
+
+  abstract completeUpload(): Promise<void>
+
+  abstract abortUpload(): Promise<void>
+
+  abstract getUploadedFileSize(): Promise<number>
+
+  protected validatePartSize(partSize: number): void {
+    if (partSize > this.maxPartSize) throw Error(`partSize must be less than or equal to ${this.maxPartSize}`);
+  }
+
+  protected validateUploadStatus(desiredStatus: UploadStatus): void {
+    if (desiredStatus === this.currentStatus) return;
+
+    let errMsg: string | null = null;
+
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      errMsg = 'Multipart upload has already been completed';
+    }
+
+    if (this.currentStatus === UploadStatus.ABORTED) {
+      errMsg = 'Multipart upload has been aborted';
+    }
+
+    // currentStatus is IN_PROGRESS or BEFORE_INIT
+
+    if (this.currentStatus === UploadStatus.IN_PROGRESS && desiredStatus === UploadStatus.BEFORE_INIT) {
+      errMsg = 'Multipart upload has already been initiated';
+    }
+
+    if (this.currentStatus === UploadStatus.BEFORE_INIT && desiredStatus === UploadStatus.IN_PROGRESS) {
+      errMsg = 'Multipart upload not initiated';
+    }
+
+    if (errMsg != null) {
+      logger.error(errMsg);
+      throw Error(errMsg);
+    }
+  }
+
+}