Просмотр исходного кода

add multipart uploader for gcs

Futa Arai 1 год назад
Родитель
Commit
452a038b81

+ 256 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-old.ts

@@ -0,0 +1,256 @@
+import type { Readable } from 'stream';
+import { Writable, pipeline } from 'stream';
+
+import type { HasObjectId } from '@growi/core';
+import { type IPage, isPopulated, SubscriptionStatusType } from '@growi/core';
+import { normalizePath } from '@growi/core/dist/utils/path-utils';
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
+import type { QueueObject } from 'async';
+import gc from 'expose-gc/function';
+import mongoose from 'mongoose';
+
+import type { SupportedActionType } from '~/interfaces/activity';
+import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
+import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
+import type { IAttachmentDocument } from '~/server/models';
+import { Attachment } from '~/server/models';
+import type { ActivityDocument } from '~/server/models/activity';
+import type { PageModel, PageDocument } from '~/server/models/page';
+import Subscription from '~/server/models/subscription';
+import type { IAwsMultipartUploader } from '~/server/service/file-uploader/aws/multipart-upload';
+import { preNotifyService } from '~/server/service/pre-notify';
+import { getBufferToFixedSizeTransform } from '~/server/util/stream';
+import loggerFactory from '~/utils/logger';
+
+import { PageBulkExportFormat } from '../../interfaces/page-bulk-export';
+import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
+import PageBulkExportJob from '../models/page-bulk-export-job';
+
const logger = loggerFactory('growi:services:PageBulkExportService');

// Custom type for back pressure workaround
// archiver's public typings do not expose its internal async queue, so widen the
// type to reach `_queue` (used in getPageWritable to wait for the archiver to drain)
interface ArchiverWithQueue extends Archiver {
  _queue?: QueueObject<any>;
}

// Request context forwarded into the activity record created by notifyExportResult
type ActivityParameters ={
  ip: string;
  endpoint: string;
}
+
/**
 * Exports all pages under a base path into a single zip archive and uploads the
 * archive to external storage via multipart upload. Page bodies are streamed
 * from a DB cursor into the archiver, and the archiver's output is streamed
 * into the uploader, so the whole export is never held in memory at once.
 */
class PageBulkExportService {

  crowi: any;

  activityEvent: any;

  // multipart upload part size
  maxPartSize = 5 * 1024 * 1024; // 5MB

  // cursor batch size for the page query; also reused as the archiver queue
  // threshold when applying back pressure (see getPageWritable)
  pageBatchSize = 100;

  constructor(crowi) {
    this.crowi = crowi;
    this.activityEvent = crowi.event('activity');
  }

  /**
   * Start a bulk export of the page at basePagePath and all of its descendants.
   *
   * Resolves as soon as the two export pipelines have been started; completion
   * or failure is reported asynchronously via notifyExportResult.
   *
   * @param basePagePath path of the root page of the export
   * @param currentUser user requesting the export (used for access control,
   *   attachment ownership and the result subscription)
   * @param activityParameters request context recorded on the resulting activity
   * @throws Error when the base page is not found or not accessible, or when the
   *   configured file upload service does not support multipart upload
   */
  async bulkExportWithBasePagePath(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
    const Page = mongoose.model<IPage, PageModel>('Page');
    const basePage = await Page.findByPathAndViewer(basePagePath, currentUser, null, true);

    if (basePage == null) {
      throw new Error('Base page not found or not accessible');
    }

    // the attachment document is persisted later (in getMultipartUploadWritable's
    // final()) once the real file size is known; 0 is a placeholder size
    const timeStamp = (new Date()).getTime();
    const originalName = `page-bulk-export-${timeStamp}.zip`;
    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, 'zip', 0, AttachmentType.PAGE_BULK_EXPORT);
    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;

    const pagesReadable = this.getPageReadable(basePagePath);
    const zipArchiver = this.setUpZipArchiver();
    const pagesWritable = this.getPageWritable(zipArchiver);
    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);

    // init multipart upload
    // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
    let pageBulkExportJob: PageBulkExportJobDocument & HasObjectId;
    if (multipartUploader == null) {
      throw Error('Multipart upload not available for configured file upload type');
    }
    try {
      await multipartUploader.initUpload();
      // record the export job together with the uploadId of the initialized upload
      pageBulkExportJob = await PageBulkExportJob.create({
        user: currentUser,
        page: basePage,
        uploadId: multipartUploader.uploadId,
        format: PageBulkExportFormat.markdown,
      });
      // subscribe the requesting user so they are notified of the export result
      await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
    }
    catch (err) {
      // initialization failed: abort the storage-side upload before rethrowing
      logger.error(err);
      await multipartUploader.abortUpload();
      throw err;
    }

    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment, activityParameters);

    // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
    // Hence, execution of two pipelines is required.
    // NOTE(review): handleExportErrorInStream is async but pipeline() ignores the
    // returned promise; a rejection inside the handler (e.g. abortUpload failing)
    // would surface as an unhandled rejection — confirm this is acceptable
    pipeline(pagesReadable, pagesWritable, err => this.handleExportErrorInStream(err, activityParameters, pageBulkExportJob, multipartUploader));
    pipeline(zipArchiver, bufferToPartSizeTransform, multipartUploadWritable,
      err => this.handleExportErrorInStream(err, activityParameters, pageBulkExportJob, multipartUploader));
  }

  /**
   * Completion callback for the export pipelines: on error, abort the multipart
   * upload and notify the user that the export failed. No-op when err is null
   * (i.e. the pipeline finished normally).
   */
  private async handleExportErrorInStream(
      err: Error | null, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, multipartUploader: IAwsMultipartUploader,
  ): Promise<void> {
    if (err != null) {
      logger.error(err);
      await multipartUploader.abortUpload();
      await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED);
    }
  }

  /**
   * Get a Readable of all the pages under the specified path, including the root page.
   */
  private getPageReadable(basePagePath: string): Readable {
    const Page = mongoose.model<IPage, PageModel>('Page');
    const { PageQueryBuilder } = Page;

    const builder = new PageQueryBuilder(Page.find())
      .addConditionToListWithDescendants(basePagePath);

    // lean() + cursor keeps memory bounded to roughly pageBatchSize documents
    return builder
      .query
      .populate('revision')
      .lean()
      .cursor({ batchSize: this.pageBatchSize });
  }

  /**
   * Get a Writable that writes the page body to a zip file
   *
   * Each page's populated revision body is appended to the archive as
   * '<normalized page path>.md'; pages without a populated revision are skipped.
   */
  private getPageWritable(zipArchiver: Archiver): Writable {
    return new Writable({
      objectMode: true,
      write: async(page: PageDocument, encoding, callback) => {
        try {
          const revision = page.revision;

          if (revision != null && isPopulated(revision)) {
            const markdownBody = revision.body;
            const pathNormalized = normalizePath(page.path);
            // Since archiver does not provide a proper way to back pressure at the moment, use the _queue property as a workaround
            // ref: https://github.com/archiverjs/node-archiver/issues/611
            const { _queue } = zipArchiver.append(markdownBody, { name: `${pathNormalized}.md` }) as ArchiverWithQueue;
            if (_queue == null) {
              throw Error('Cannot back pressure the export pipeline. Aborting the export.');
            }
            if (_queue.length() > this.pageBatchSize) {
              // wait until the archiver catches up before accepting the next page
              await _queue.drain();
            }
          }
        }
        catch (err) {
          callback(err);
          return;
        }
        callback();
      },
      final: (callback) => {
        // NOTE(review): finalize() returns a promise that is neither awaited nor
        // error-handled here; a rejection would go unhandled — confirm intended
        zipArchiver.finalize();
        callback();
      },
    });
  }

  /**
   * Create the zip archiver whose output is piped into the multipart uploader.
   */
  private setUpZipArchiver(): Archiver {
    const zipArchiver = archiver('zip', {
      zlib: { level: 9 }, // maximum compression
    });

    // good practice to catch warnings (ie stat failures and other non-blocking errors)
    zipArchiver.on('warning', (err) => {
      if (err.code === 'ENOENT') logger.error(err);
      else throw err;
    });

    return zipArchiver;
  }

  /**
   * Get a Writable that uploads each received buffer as one part of the
   * multipart upload. On final(), the upload is completed, the attachment and
   * job documents are persisted, and the user is notified of the result.
   * Errors in either phase are propagated to the pipeline via callback(err).
   */
  private getMultipartUploadWritable(
      multipartUploader: IAwsMultipartUploader,
      pageBulkExportJob: PageBulkExportJobDocument,
      attachment: IAttachmentDocument,
      activityParameters: ActivityParameters,
  ): Writable {
    // multipart upload part numbers start at 1
    let partNumber = 1;

    return new Writable({
      write: async(part: Buffer, encoding, callback) => {
        try {
          await multipartUploader.uploadPart(part, partNumber);
          partNumber += 1;
          // First aid to prevent unexplained memory leaks
          logger.info('global.gc() invoked.');
          gc();
        }
        catch (err) {
          callback(err);
          return;
        }
        callback();
      },
      final: async(callback) => {
        try {
          await multipartUploader.completeUpload();

          // record the real archive size now that the upload is complete
          const fileSize = await multipartUploader.getUploadedFileSize();
          attachment.fileSize = fileSize;
          await attachment.save();

          pageBulkExportJob.completedAt = new Date();
          pageBulkExportJob.attachment = attachment._id;
          await pageBulkExportJob.save();

          await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED);
        }
        catch (err) {
          callback(err);
          return;
        }
        callback();
      },
    });
  }

  /**
   * Record an activity for the export result and emit it so the notification
   * system informs the job's user.
   */
  private async notifyExportResult(
      activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType,
  ) {
    const activity = await this.crowi.activityService.createActivity({
      ...activityParameters,
      action,
      targetModel: SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB,
      target: pageBulkExportJob,
      user: pageBulkExportJob.user,
      snapshot: {
        username: isPopulated(pageBulkExportJob.user) ? pageBulkExportJob.user.username : '',
      },
    });
    // include the acting user themselves among the notification targets
    const getAdditionalTargetUsers = (activity: ActivityDocument) => [activity.user];
    const preNotify = preNotifyService.generatePreNotify(activity, getAdditionalTargetUsers);
    this.activityEvent.emit('updated', activity, pageBulkExportJob, preNotify);
  }

}
+
// eslint-disable-next-line import/no-mutable-exports
export let pageBulkExportService: PageBulkExportService | undefined; // singleton instance
// Initialize the module-level singleton; pageBulkExportService remains
// undefined until this has been called (presumably at application startup)
export default function instanciate(crowi): void {
  pageBulkExportService = new PageBulkExportService(crowi);
}

+ 8 - 9
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -18,7 +18,7 @@ import { Attachment } from '~/server/models';
 import type { ActivityDocument } from '~/server/models/activity';
 import type { ActivityDocument } from '~/server/models/activity';
 import type { PageModel, PageDocument } from '~/server/models/page';
 import type { PageModel, PageDocument } from '~/server/models/page';
 import Subscription from '~/server/models/subscription';
 import Subscription from '~/server/models/subscription';
-import type { IAwsMultipartUploader } from '~/server/service/file-uploader/aws/multipart-upload';
+import type { IGcsMultipartUploader } from '~/server/service/file-uploader/gcs/multipart-upload';
 import { preNotifyService } from '~/server/service/pre-notify';
 import { preNotifyService } from '~/server/service/pre-notify';
 import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
@@ -45,8 +45,8 @@ class PageBulkExportService {
 
 
   activityEvent: any;
   activityEvent: any;
 
 
-  // multipart upload part size
-  partSize = 5 * 1024 * 1024; // 5MB
+  // multipart upload max part size
+  maxPartSize = 5 * 1024 * 1024; // 5MB
 
 
   pageBatchSize = 100;
   pageBatchSize = 100;
 
 
@@ -71,11 +71,11 @@ class PageBulkExportService {
     const pagesReadable = this.getPageReadable(basePagePath);
     const pagesReadable = this.getPageReadable(basePagePath);
     const zipArchiver = this.setUpZipArchiver();
     const zipArchiver = this.setUpZipArchiver();
     const pagesWritable = this.getPageWritable(zipArchiver);
     const pagesWritable = this.getPageWritable(zipArchiver);
-    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.partSize);
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
 
 
     // init multipart upload
     // init multipart upload
     // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
     // TODO: Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
-    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
+    const multipartUploader: IGcsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey, this.maxPartSize);
     let pageBulkExportJob: PageBulkExportJobDocument & HasObjectId;
     let pageBulkExportJob: PageBulkExportJobDocument & HasObjectId;
     if (multipartUploader == null) {
     if (multipartUploader == null) {
       throw Error('Multipart upload not available for configured file upload type');
       throw Error('Multipart upload not available for configured file upload type');
@@ -106,10 +106,9 @@ class PageBulkExportService {
   }
   }
 
 
   private async handleExportErrorInStream(
   private async handleExportErrorInStream(
-      err: Error | null, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, multipartUploader: IAwsMultipartUploader,
+      err: Error | null, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, multipartUploader: IGcsMultipartUploader,
   ): Promise<void> {
   ): Promise<void> {
     if (err != null) {
     if (err != null) {
-      logger.error(err);
       await multipartUploader.abortUpload();
       await multipartUploader.abortUpload();
       await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED);
       await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED);
     }
     }
@@ -184,7 +183,7 @@ class PageBulkExportService {
   }
   }
 
 
   private getMultipartUploadWritable(
   private getMultipartUploadWritable(
-      multipartUploader: IAwsMultipartUploader,
+      multipartUploader: IGcsMultipartUploader,
       pageBulkExportJob: PageBulkExportJobDocument,
       pageBulkExportJob: PageBulkExportJobDocument,
       attachment: IAttachmentDocument,
       attachment: IAttachmentDocument,
       activityParameters: ActivityParameters,
       activityParameters: ActivityParameters,
@@ -194,7 +193,7 @@ class PageBulkExportService {
     return new Writable({
     return new Writable({
       write: async(part: Buffer, encoding, callback) => {
       write: async(part: Buffer, encoding, callback) => {
         try {
         try {
-          await multipartUploader.uploadPart(part, partNumber);
+          await multipartUploader.uploadPart(part, partNumber, this.maxPartSize);
           partNumber += 1;
           partNumber += 1;
           // First aid to prevent unexplained memory leaks
           // First aid to prevent unexplained memory leaks
           logger.info('global.gc() invoked.');
           logger.info('global.gc() invoked.');

+ 2 - 1
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -24,6 +24,7 @@ import {
 } from '../file-uploader';
 } from '../file-uploader';
 import { ContentHeaders } from '../utils';
 import { ContentHeaders } from '../utils';
 
 
+import type { IAwsMultipartUploader } from './multipart-upload';
 import { AwsMultipartUploader } from './multipart-upload';
 import { AwsMultipartUploader } from './multipart-upload';
 
 
 
 
@@ -88,7 +89,7 @@ const getFilePathOnStorage = (attachment: IAttachmentDocument) => {
 };
 };
 
 
 export interface IAwsFileUploader {
 export interface IAwsFileUploader {
-  createMultipartUploader: (uploadKey: string) => AwsMultipartUploader
+  createMultipartUploader: (uploadKey: string) => IAwsMultipartUploader
 }
 }
 
 
 // TODO: rewrite this module to be a type-safe implementation
 // TODO: rewrite this module to be a type-safe implementation

+ 20 - 15
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts

@@ -17,10 +17,10 @@ enum UploadStatus {
 // Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
 // Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
 export interface IAwsMultipartUploader {
 export interface IAwsMultipartUploader {
   initUpload(): Promise<void>;
   initUpload(): Promise<void>;
-  uploadPart(body: Buffer, partNumber: number): Promise<void>;
+  uploadPart(part: Buffer, partNumber: number): Promise<void>;
   completeUpload(): Promise<void>;
   completeUpload(): Promise<void>;
   abortUpload(): Promise<void>;
   abortUpload(): Promise<void>;
-  uploadId: string | undefined;
+  uploadId: string;
   getUploadedFileSize(): Promise<number>;
   getUploadedFileSize(): Promise<number>;
 }
 }
 
 
@@ -44,7 +44,7 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
 
 
   private currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
   private currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
 
 
-  private _uploadedFileSize: number | undefined;
+  private _uploadedFileSize = 0;
 
 
   constructor(s3Client: S3Client, bucket: string | undefined, uploadKey: string) {
   constructor(s3Client: S3Client, bucket: string | undefined, uploadKey: string) {
     this.s3Client = s3Client;
     this.s3Client = s3Client;
@@ -52,7 +52,8 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
     this.uploadKey = uploadKey;
     this.uploadKey = uploadKey;
   }
   }
 
 
-  get uploadId(): string | undefined {
+  get uploadId(): string {
+    if (this._uploadId == null) throw Error('UploadId is empty');
     return this._uploadId;
     return this._uploadId;
   }
   }
 
 
@@ -63,16 +64,19 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
       Bucket: this.bucket,
       Bucket: this.bucket,
       Key: this.uploadKey,
       Key: this.uploadKey,
     }));
     }));
+    if (response.UploadId == null) {
+      throw Error('UploadId is empty');
+    }
     this._uploadId = response.UploadId;
     this._uploadId = response.UploadId;
     this.currentStatus = UploadStatus.IN_PROGRESS;
     this.currentStatus = UploadStatus.IN_PROGRESS;
     logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
     logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
   }
   }
 
 
-  async uploadPart(body: Buffer, partNumber: number): Promise<void> {
+  async uploadPart(part: Buffer, partNumber: number): Promise<void> {
     this.validateUploadStatus(UploadStatus.IN_PROGRESS);
     this.validateUploadStatus(UploadStatus.IN_PROGRESS);
 
 
     const uploadMetaData = await this.s3Client.send(new UploadPartCommand({
     const uploadMetaData = await this.s3Client.send(new UploadPartCommand({
-      Body: body,
+      Body: part,
       Bucket: this.bucket,
       Bucket: this.bucket,
       Key: this.uploadKey,
       Key: this.uploadKey,
       PartNumber: partNumber,
       PartNumber: partNumber,
@@ -83,6 +87,7 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
       PartNumber: partNumber,
       PartNumber: partNumber,
       ETag: uploadMetaData.ETag,
       ETag: uploadMetaData.ETag,
     });
     });
+    this._uploadedFileSize += part.length;
   }
   }
 
 
   async completeUpload(): Promise<void> {
   async completeUpload(): Promise<void> {
@@ -113,15 +118,15 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
   }
   }
 
 
   async getUploadedFileSize(): Promise<number> {
   async getUploadedFileSize(): Promise<number> {
-    if (this._uploadedFileSize != null) return this._uploadedFileSize;
-
-    this.validateUploadStatus(UploadStatus.COMPLETED);
-    const headData = await this.s3Client.send(new HeadObjectCommand({
-      Bucket: this.bucket,
-      Key: this.uploadKey,
-    }));
-    this._uploadedFileSize = headData.ContentLength;
-    return this._uploadedFileSize ?? 0;
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      const headData = await this.s3Client.send(new HeadObjectCommand({
+        Bucket: this.bucket,
+        Key: this.uploadKey,
+      }));
+      if (headData.ContentLength == null) throw Error('Could not fetch uploaded file size');
+      this._uploadedFileSize = headData.ContentLength;
+    }
+    return this._uploadedFileSize;
   }
   }
 
 
   private validateUploadStatus(desiredStatus: UploadStatus): void {
   private validateUploadStatus(desiredStatus: UploadStatus): void {

+ 15 - 4
apps/app/src/server/service/file-uploader/gcs.ts → apps/app/src/server/service/file-uploader/gcs/index.ts

@@ -8,12 +8,14 @@ import {
 import type { IAttachmentDocument } from '~/server/models';
 import type { IAttachmentDocument } from '~/server/models';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
 
 
-import { configManager } from '../config-manager';
-
+import { configManager } from '../../config-manager';
 import {
 import {
   AbstractFileUploader, type TemporaryUrl, type SaveFileParam,
   AbstractFileUploader, type TemporaryUrl, type SaveFileParam,
-} from './file-uploader';
-import { ContentHeaders } from './utils';
+} from '../file-uploader';
+import { ContentHeaders } from '../utils';
+
+import type { IGcsMultipartUploader } from './multipart-upload';
+import { GcsMultipartUploader } from './multipart-upload';
 
 
 const logger = loggerFactory('growi:service:fileUploaderGcs');
 const logger = loggerFactory('growi:service:fileUploaderGcs');
 
 
@@ -62,6 +64,9 @@ async function isFileExists(file) {
   return res[0];
   return res[0];
 }
 }
 
 
+export interface IGcsFileUploader {
+  createMultipartUploader: (uploadKey: string) => IGcsMultipartUploader
+}
 
 
 // TODO: rewrite this module to be a type-safe implementation
 // TODO: rewrite this module to be a type-safe implementation
 class GcsFileUploader extends AbstractFileUploader {
 class GcsFileUploader extends AbstractFileUploader {
@@ -170,6 +175,12 @@ class GcsFileUploader extends AbstractFileUploader {
 
 
   }
   }
 
 
+  createMultipartUploader(uploadKey: string, maxPartSize: number) {
+    const gcs = getGcsInstance();
+    const myBucket = gcs.bucket(getGcsBucket());
+    return new GcsMultipartUploader(myBucket, uploadKey, maxPartSize);
+  }
+
 }
 }
 
 
 
 

+ 188 - 0
apps/app/src/server/service/file-uploader/gcs/multipart-upload.ts

@@ -0,0 +1,188 @@
+import type { Bucket, File } from '@google-cloud/storage';
+
+import loggerFactory from '~/utils/logger';
+
+import axios from 'src/utils/axios';
+
const logger = loggerFactory('growi:services:fileUploaderGcs:multipartUploader');

// Lifecycle states of one multipart upload; legal transitions are enforced by
// GcsMultipartUploader.validateUploadStatus
enum UploadStatus {
  BEFORE_INIT,
  IN_PROGRESS,
  COMPLETED,
  ABORTED
}

// Create abstract interface IMultipartUploader in https://redmine.weseek.co.jp/issues/135775
/**
 * Contract for a GCS multipart uploader: init once, upload parts sequentially,
 * then complete (or abort). uploadId identifies the upload session.
 */
export interface IGcsMultipartUploader {
  initUpload(): Promise<void>;
  uploadPart(body: Buffer, partNumber: number, maxPartSize?: number): Promise<void>;
  completeUpload(): Promise<void>;
  abortUpload(): Promise<void>;
  uploadId: string;
  getUploadedFileSize(): Promise<number>;
}
+
+/**
+ * Class for uploading files to GCS using multipart upload.
+ * Create instance from GcsFileUploader class.
+ * Each instance can only be used for one multipart upload, and cannot be reused once completed.
+ * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
+ */
+export class GcsMultipartUploader implements IGcsMultipartUploader {
+
+  private uploadKey: string;
+
+  private _uploadId: string | undefined; // URL of GCS resumable upload
+
+  private file: File;
+
+  private currentStatus: UploadStatus = UploadStatus.BEFORE_INIT;
+
+  private _uploadedFileSize = 0;
+
+  // ref: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=en#chunked-upload
+  private readonly minPartSize = 256 * 1024; // 256KB
+
+  private readonly maxPartSize: number;
+
+  constructor(bucket: Bucket, uploadKey: string, maxPartSize: number) {
+    this.validateUploadPartSize(maxPartSize);
+    this.maxPartSize = maxPartSize;
+    this.uploadKey = uploadKey;
+    this.file = bucket.file(this.uploadKey);
+  }
+
+  get uploadId(): string {
+    if (this._uploadId == null) throw Error('UploadId is empty');
+    return this._uploadId;
+  }
+
+  async initUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.BEFORE_INIT);
+
+    const [uploadUrl] = await this.file.createResumableUpload();
+
+    this._uploadId = uploadUrl;
+    this.currentStatus = UploadStatus.IN_PROGRESS;
+    logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
+  }
+
+  async uploadPart(part: Buffer, partNumber: number): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+    this.validatePartSize(part.length);
+
+    if (part.length === this.maxPartSize) {
+      await this.uploadChunk(part);
+    }
+    else if (this.minPartSize < part.length && part.length < this.maxPartSize) {
+      const numOfMinPartSize = Math.floor(part.length / this.minPartSize);
+      const minPartSizeMultiplePartChunk = part.slice(0, numOfMinPartSize * this.minPartSize);
+      const lastPartChunk = part.slice(numOfMinPartSize * this.minPartSize);
+
+      await this.uploadChunk(minPartSizeMultiplePartChunk);
+      await this.uploadChunk(lastPartChunk, true);
+    }
+    else if (part.length < this.minPartSize) {
+      await this.uploadChunk(part, true);
+    }
+  }
+
+  async completeUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    // Send a request to complete the upload, in case the last uploadPart request did not request completion.
+    await axios.put(this.uploadId, {
+      headers: {
+        'Content-Range': `bytes */${this._uploadedFileSize}`,
+      },
+    });
+    this.currentStatus = UploadStatus.COMPLETED;
+    logger.info(`Multipart upload completed. Upload key: ${this.uploadKey}`);
+  }
+
+  async abortUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    try {
+      await axios.delete(this.uploadId);
+    }
+    catch (e) {
+      if (e.response?.status !== 499) {
+        throw e;
+      }
+    }
+    this.currentStatus = UploadStatus.ABORTED;
+    logger.info(`Multipart upload aborted. Upload key: ${this.uploadKey}`);
+  }
+
+  async getUploadedFileSize(): Promise<number> {
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      const [metadata] = await this.file.getMetadata();
+      this._uploadedFileSize = metadata.size;
+    }
+    return this._uploadedFileSize;
+  }
+
+  private uploadChunk = async(_part, isLastUpload = false) => {
+    this.validateUploadPartSize(_part.length);
+
+    const range = isLastUpload
+      ? `bytes ${this._uploadedFileSize}-${this._uploadedFileSize + _part.length - 1}/${this._uploadedFileSize + _part.length}`
+      : `bytes ${this._uploadedFileSize}-${this._uploadedFileSize + _part.length - 1}/*`;
+
+    try {
+      await axios.put(this.uploadId, _part, {
+        headers: {
+          'Content-Range': `${range}`,
+        },
+      });
+    }
+    catch (e) {
+      if (e.response?.status !== 308) {
+        throw e;
+      }
+    }
+    this._uploadedFileSize += _part.length;
+  };
+
+  // If part size is larger than the minimal part size, it is required to be a multiple of the minimal part size
+  // ref: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=en#chunked-upload
+  private validateUploadPartSize(uploadPartSize: number) {
+    if (uploadPartSize > this.minPartSize && uploadPartSize % this.minPartSize !== 0) throw Error(`uploadPartSize must be a multiple of ${this.minPartSize}`);
+  }
+
+  private validatePartSize(partSize) {
+    if (partSize > this.maxPartSize) throw Error(`partSize must be less than or equal to ${this.maxPartSize}`);
+  }
+
+  private validateUploadStatus(desiredStatus: UploadStatus): void {
+    if (desiredStatus === this.currentStatus) return;
+
+    let errMsg: string | null = null;
+
+    if (this.currentStatus === UploadStatus.COMPLETED) {
+      errMsg = 'Multipart upload has already been completed';
+    }
+
+    if (this.currentStatus === UploadStatus.ABORTED) {
+      errMsg = 'Multipart upload has been aborted';
+    }
+
+    // currentStatus is IN_PROGRESS or BEFORE_INIT
+
+    if (this.currentStatus === UploadStatus.IN_PROGRESS && desiredStatus === UploadStatus.BEFORE_INIT) {
+      errMsg = 'Multipart upload has already been initiated';
+    }
+
+    if (this.currentStatus === UploadStatus.BEFORE_INIT && desiredStatus === UploadStatus.IN_PROGRESS) {
+      errMsg = 'Multipart upload not initiated';
+    }
+
+    if (errMsg != null) {
+      logger.error(errMsg);
+      throw Error(errMsg);
+    }
+  }
+
+}