Просмотр исходного кода

abort multipart upload on export failure

Futa Arai 2 года назад
Родитель
Commit
d35d7748f1

+ 25 - 30
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -24,8 +24,6 @@ class PageBulkExportService {
 
   crowi: any;
 
-  zipArchiver: Archiver;
-
   // multipart upload part size
   partSize = 5 * 1024 * 1024; // 5MB
 
@@ -43,23 +41,32 @@ class PageBulkExportService {
     const zipArchiver = this.setUpZipArchiver();
     const pagesWritable = this.getPageWritable(zipArchiver);
 
+    // init multipart upload
+    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
     try {
-      const multipartUploadWritable = await this.getMultipartUploadWritable(uploadKey);
-
-      // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
-      // Hence, execution of two pipelines is required.
-      pipeline(pageReadableStream, pagesWritable, this.handleStreamError);
-      pipeline(zipArchiver, multipartUploadWritable, this.handleStreamError);
+      if (multipartUploader == null) {
+        throw Error('Multipart upload not available for configured file upload type');
+      }
+      await multipartUploader.initUpload();
     }
     catch (err) {
-      logger.error(err);
-      // TODO: notify failure to client: https://redmine.weseek.co.jp/issues/78037
+      await this.handleExportError(err, multipartUploader);
+      return;
     }
+    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader);
+
+    // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
+    // Hence, execution of two pipelines is required.
+    pipeline(pageReadableStream, pagesWritable, err => this.handleExportError(err, multipartUploader));
+    pipeline(zipArchiver, multipartUploadWritable, err => this.handleExportError(err, multipartUploader));
   }
 
-  handleStreamError(err: Error | null): void {
+  async handleExportError(err: Error | null, multipartUploader: IAwsMultipartUploader | undefined): Promise<void> {
     if (err != null) {
       logger.error(err);
+      if (multipartUploader != null) {
+        await multipartUploader.abortUpload();
+      }
       // TODO: notify failure to client: https://redmine.weseek.co.jp/issues/78037
     }
   }
@@ -128,38 +135,27 @@ class PageBulkExportService {
       if (err.code === 'ENOENT') logger.error(err);
       else throw err;
     });
-    // good practice to catch this error explicitly
-    zipArchiver.on('error', (err) => { throw err });
 
     return zipArchiver;
   }
 
-  private async getMultipartUploadWritable(uploadKey: string): Promise<Writable> {
-    const multipartUploader: IAwsMultipartUploader | undefined = this.crowi?.fileUploadService?.createMultipartUploader(uploadKey);
-
-    if (multipartUploader == null) {
-      throw Error('Multipart upload not available for configured file upload type');
-    }
-
+  private getMultipartUploadWritable(multipartUploader: IAwsMultipartUploader): Writable {
     let partNumber = 1;
     // Buffer to store stream data before upload. When the buffer is full, it will be uploaded as a part.
     let part = Buffer.alloc(this.partSize);
     let filledPartSize = 0;
 
-    await multipartUploader.initUpload();
-    logger.info(`Multipart upload initialized. Upload key: ${uploadKey}`);
-
     return new Writable({
       write: async(chunk: Buffer, encoding, callback) => {
         try {
           let offset = 0;
           while (offset < chunk.length) {
-          // The data size to add to buffer.
-          // - If the remaining chunk size is smaller than the remaining buffer size:
-          //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
-          // - If the remaining chunk size is larger than the remaining buffer size:
-          //     - Fill the buffer, and upload => dataSize is the remaining buffer size
-          //     - The remaining chunk after upload will be added to buffer in the next iteration
+            // The data size to add to buffer.
+            // - If the remaining chunk size is smaller than the remaining buffer size:
+            //     - Add all of the remaining chunk to buffer => dataSize is the remaining chunk size
+            // - If the remaining chunk size is larger than the remaining buffer size:
+            //     - Fill the buffer, and upload => dataSize is the remaining buffer size
+            //     - The remaining chunk after upload will be added to buffer in the next iteration
             const dataSize = Math.min(this.partSize - filledPartSize, chunk.length - offset);
             // Add chunk data to buffer
             // buffer = Buffer.concat([buffer, chunk.slice(offset, offset + dataSize)]);
@@ -194,7 +190,6 @@ class PageBulkExportService {
             await multipartUploader.uploadPart(finalPart, partNumber);
           }
           await multipartUploader.completeUpload();
-          logger.info(`Multipart upload completed. Upload key: ${uploadKey}`);
         }
         catch (err) {
           callback(err);

+ 25 - 3
apps/app/src/server/service/file-uploader/aws/multipart-upload.ts

@@ -1,5 +1,5 @@
 import {
-  CreateMultipartUploadCommand, UploadPartCommand, type S3Client, CompleteMultipartUploadCommand,
+  CreateMultipartUploadCommand, UploadPartCommand, type S3Client, CompleteMultipartUploadCommand, AbortMultipartUploadCommand,
 } from '@aws-sdk/client-s3';
 
 import loggerFactory from '~/utils/logger';
@@ -10,12 +10,14 @@ enum UploadStatus {
   BEFORE_INIT,
   IN_PROGRESS,
   COMPLETED,
+  ABORTED
 }
 
 export interface IAwsMultipartUploader {
   initUpload(): Promise<void>;
   uploadPart(body: Buffer, partNumber: number): Promise<void>;
   completeUpload(): Promise<void>;
+  abortUpload(): Promise<void>;
 }
 
 /**
@@ -53,6 +55,7 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
     }));
     this.uploadId = response.UploadId;
     this.currentStatus = UploadStatus.IN_PROGRESS;
+    logger.info(`Multipart upload initialized. Upload key: ${this.uploadKey}`);
   }
 
   async uploadPart(body: Buffer, partNumber: number): Promise<void> {
@@ -84,6 +87,19 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
       },
     }));
     this.currentStatus = UploadStatus.COMPLETED;
+    logger.info(`Multipart upload completed. Upload key: ${this.uploadKey}`);
+  }
+
+  async abortUpload(): Promise<void> {
+    this.validateUploadStatus(UploadStatus.IN_PROGRESS);
+
+    await this.s3Client.send(new AbortMultipartUploadCommand({
+      Bucket: this.bucket,
+      Key: this.uploadKey,
+      UploadId: this.uploadId,
+    }));
+    this.currentStatus = UploadStatus.ABORTED;
+    logger.info(`Multipart upload aborted. Upload key: ${this.uploadKey}`);
   }
 
   private validateUploadStatus(desiredStatus: UploadStatus): void {
@@ -95,11 +111,17 @@ export class AwsMultipartUploader implements IAwsMultipartUploader {
       errMsg = 'Multipart upload has already been completed';
     }
 
-    if (desiredStatus === UploadStatus.BEFORE_INIT) {
+    if (this.currentStatus === UploadStatus.ABORTED) {
+      errMsg = 'Multipart upload has been aborted';
+    }
+
+    // currentStatus is IN_PROGRESS or BEFORE_INIT
+
+    if (this.currentStatus === UploadStatus.IN_PROGRESS && desiredStatus === UploadStatus.BEFORE_INIT) {
       errMsg = 'Multipart upload has already been initiated';
     }
 
-    if (desiredStatus === UploadStatus.IN_PROGRESS) {
+    if (this.currentStatus === UploadStatus.BEFORE_INIT && desiredStatus === UploadStatus.IN_PROGRESS) {
       errMsg = 'Multipart upload not initiated';
     }