Просмотр исходного кода

make resuming a suspended bulk export job possible

Futa Arai 1 год назад
Родитель
Commit
35cb9992a4

+ 6 - 4
apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts

@@ -11,7 +11,9 @@ export const PageBulkExportFormat = {
 export type PageBulkExportFormat = typeof PageBulkExportFormat[keyof typeof PageBulkExportFormat]
 
 export const PageBulkExportJobStatus = {
-  inProgress: 'inProgress',
+  initializing: 'initializing', // preparing for export
+  exporting: 'exporting', // exporting to fs
+  uploading: 'uploading', // uploading to cloud storage
   completed: 'completed',
   failed: 'failed',
 } as const;
@@ -21,10 +23,10 @@ export type PageBulkExportJobStatus = typeof PageBulkExportJobStatus[keyof typeo
 export interface IPageBulkExportJob {
   user: Ref<IUser>, // user that started export job
   page: Ref<IPage>, // the root page of page tree to export
-  lastUploadedPagePath: string, // the path of page that was uploaded last
-  uploadId: string | null, // upload ID of multipart upload of S3/GCS
+  lastExportedPagePath?: string, // the path of the page that was exported last
+  uploadId?: string, // upload ID of multipart upload of S3/GCS
   format: PageBulkExportFormat,
-  completedAt: Date | null, // the date at which job was completed
+  completedAt?: Date, // the date at which job was completed
   attachment?: Ref<IAttachment>,
   status: PageBulkExportJobStatus,
 }

+ 2 - 2
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts

@@ -12,13 +12,13 @@ export type PageBulkExportJobModel = Model<PageBulkExportJobDocument>
 const pageBulkExportJobSchema = new Schema<PageBulkExportJobDocument>({
   user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
   page: { type: Schema.Types.ObjectId, ref: 'Page', required: true },
-  lastUploadedPagePath: { type: String },
+  lastExportedPagePath: { type: String },
   uploadId: { type: String, unique: true, sparse: true },
   format: { type: String, enum: Object.values(PageBulkExportFormat), required: true },
   completedAt: { type: Date },
   attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },
   status: {
-    type: String, enum: Object.values(PageBulkExportJobStatus), required: true, default: PageBulkExportJobStatus.inProgress,
+    type: String, enum: Object.values(PageBulkExportJobStatus), required: true, default: PageBulkExportJobStatus.initializing,
   },
 }, { timestamps: true });
 

+ 119 - 91
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,7 +1,6 @@
 import fs from 'fs';
 import path from 'path';
-import type { Readable } from 'stream';
-import { Writable, pipeline } from 'stream';
+import { Writable } from 'stream';
 import { pipeline as pipelinePromise } from 'stream/promises';
 
 
@@ -30,6 +29,7 @@ import loggerFactory from '~/utils/logger';
 import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
 import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
 import PageBulkExportJob from '../models/page-bulk-export-job';
+import type { PageBulkExportPageSnapshotDocument } from '../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../models/page-bulk-export-page-snapshot';
 
 
@@ -65,142 +65,137 @@ class PageBulkExportService {
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
   tmpOutputRootDir = '/tmp';
 
+  pageModel: PageModel;
+
   constructor(crowi) {
     this.crowi = crowi;
     this.activityEvent = crowi.event('activity');
+    this.pageModel = mongoose.model<IPage, PageModel>('Page');
   }
 
   async createAndStartPageBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
-    const Page = mongoose.model<IPage, PageModel>('Page');
-    const basePage = await Page.findByPathAndViewer(basePagePath, currentUser, null, true);
+    const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
 
     if (basePage == null) {
       throw new Error('Base page not found or not accessible');
     }
 
     const format = PageBulkExportFormat.md;
-    const pageBulkExportJobProperties = {
-      user: currentUser, page: basePage, format, status: PageBulkExportJobStatus.inProgress,
-    };
-    const duplicatePageBulkExportJobInProgress: PageBulkExportJobDocument & HasObjectId | null = await PageBulkExportJob.findOne(pageBulkExportJobProperties);
+    const duplicatePageBulkExportJobInProgress: PageBulkExportJobDocument & HasObjectId | null = await PageBulkExportJob.findOne({
+      user: currentUser,
+      page: basePage,
+      format,
+      $or: [
+        { status: PageBulkExportJobStatus.initializing }, { status: PageBulkExportJobStatus.exporting }, { status: PageBulkExportJobStatus.uploading },
+      ],
+    });
     if (duplicatePageBulkExportJobInProgress != null) {
       throw new DuplicateBulkExportJobError();
     }
-    const pageBulkExportJob: PageBulkExportJobDocument & HasObjectId = await PageBulkExportJob.create(pageBulkExportJobProperties);
+    const pageBulkExportJob: PageBulkExportJobDocument & HasObjectId = await PageBulkExportJob.create({
+      user: currentUser, page: basePage, format, status: PageBulkExportJobStatus.initializing,
+    });
 
     await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
 
-    this.bulkExportWithBasePagePath(basePagePath, currentUser, activityParameters, pageBulkExportJob);
+    this.executePageBulkExportJob(currentUser, activityParameters, pageBulkExportJob);
   }
 
-  private async bulkExportWithBasePagePath(
-      basePagePath: string, currentUser, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
+  private async executePageBulkExportJob(
+      currentUser, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
   ): Promise<void> {
-    const timeStamp = (new Date()).getTime();
-    const exportName = `page-bulk-export-${timeStamp}`;
-
-    // export pages to fs temporarily
-    const tmpOutputDir = `${this.tmpOutputRootDir}/${exportName}`;
     try {
-      await this.exportPagesToFS(basePagePath, tmpOutputDir, currentUser);
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.initializing) {
+        await this.createPageSnapshots(currentUser, pageBulkExportJob);
+        pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
+        await pageBulkExportJob.save();
+      }
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.exporting) {
+        await this.exportPagesToFS(pageBulkExportJob);
+        pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
+        await pageBulkExportJob.save();
+      }
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) {
+        await this.compressAndUpload(currentUser, pageBulkExportJob);
+      }
     }
     catch (err) {
-      await this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir);
-      return;
-    }
-
-    const pageArchiver = this.setUpPageArchiver();
-    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
-
-    const originalName = `${exportName}.${this.compressExtension}`;
-    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
-    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
-
-    // init multipart upload
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
-    try {
-      await multipartUploader.initUpload();
-      pageBulkExportJob.uploadId = multipartUploader.uploadId;
-      await pageBulkExportJob.save;
-    }
-    catch (err) {
-      await this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir, multipartUploader);
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(false, activityParameters, pageBulkExportJob);
       return;
     }
 
-    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment, activityParameters, tmpOutputDir);
-
-    pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable,
-      err => this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir, multipartUploader));
-    pageArchiver.directory(tmpOutputDir, false);
-    pageArchiver.finalize();
+    await this.notifyExportResultAndCleanUp(true, activityParameters, pageBulkExportJob);
   }
 
   /**
    * Handles export failure with the following:
    * - notify the user of the failure
+   * - delete page snapshots
    * - remove the temporal output directory
-   * - abort multipart upload
+   * - set status of pageBulkExportJob to 'failed'
    */
-  // TODO: update completedAt of pageBulkExportJob, or add a failed status flag to it (https://redmine.weseek.co.jp/issues/78040)
-  private async handleExportError(
-      err: Error | null,
+  private async notifyExportResultAndCleanUp(
+      succeeded: boolean,
       activityParameters: ActivityParameters,
       pageBulkExportJob: PageBulkExportJobDocument,
-      tmpOutputDir: string,
-      multipartUploader?: IMultipartUploader,
   ): Promise<void> {
-    if (err != null) {
-      logger.error(err);
-
-      const results = await Promise.allSettled([
-        this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED),
-        fs.promises.rm(tmpOutputDir, { recursive: true, force: true }),
-        multipartUploader?.abortUpload(),
-      ]);
-      results.forEach((result) => {
-        if (result.status === 'rejected') logger.error(result.reason);
-      });
-    }
+    const action = succeeded ? SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED : SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED;
+    pageBulkExportJob.status = succeeded ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
+    const results = await Promise.allSettled([
+      this.notifyExportResult(activityParameters, pageBulkExportJob, action),
+      PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
+      fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
+      pageBulkExportJob.save(),
+    ]);
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
   }
 
-  private async exportPagesToFS(basePagePath: string, outputDir: string, currentUser): Promise<void> {
-    const pagesReadable = await this.getPageReadable(basePagePath, currentUser, true);
-    const pagesWritable = this.getPageWritable(outputDir);
+  /**
+   * Export pages to the file system before compressing and uploading to the cloud storage.
+   * The export will resume from the last exported page if the process was interrupted.
+   */
+  private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const pageSnapshotsReadable = PageBulkExportPageSnapshot
+      .find({ pageBulkExportJob, path: { $gt: pageBulkExportJob.lastExportedPagePath } })
+      .populate('revision').sort({ path: 1 }).lean()
+      .cursor({ batchSize: this.pageBatchSize });
+    const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
-    return pipelinePromise(pagesReadable, pagesWritable);
+    return pipelinePromise(pageSnapshotsReadable, pagesWritable);
   }
 
   /**
-   * Get a Readable of all the pages under the specified path, including the root page.
+   * Create a snapshot for each page that is to be exported in the pageBulkExportJob
    */
-  private async getPageReadable(basePagePath: string, currentUser, populateRevision = false): Promise<Readable> {
-    const Page = mongoose.model<IPage, PageModel>('Page');
-    const { PageQueryBuilder } = Page;
+  private async createPageSnapshots(currentUser, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    // if the process of creating snapshots was interrupted, delete the existing snapshots and recreate them from the beginning
+    await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });
 
-    const builder = await new PageQueryBuilder(Page.find())
-      .addConditionToListWithDescendants(basePagePath)
-      .addViewerCondition(currentUser);
-
-    if (populateRevision) {
-      builder.query = builder.query.populate('revision');
+    const basePage = await this.pageModel.findById(pageBulkExportJob.page);
+    if (basePage == null) {
+      throw new Error('Base page not found');
     }
 
-    return builder
+    // create a Readable for pages to be exported
+    const { PageQueryBuilder } = this.pageModel;
+    const builder = await new PageQueryBuilder(this.pageModel.find())
+      .addConditionToListWithDescendants(basePage.path)
+      .addViewerCondition(currentUser);
+    const pagesReadable = builder
       .query
       .lean()
       .cursor({ batchSize: this.pageBatchSize });
-  }
 
-  private async createPageSnapshots(basePagePath: string, currentUser, pageBulkExportJob: PageBulkExportJobDocument) {
-    const pagesReadable = await this.getPageReadable(basePagePath, currentUser);
-    const pageSnapshotwritable = new Writable({
+    // create a Writable that creates a snapshot for each page
+    const pageSnapshotsWritable = new Writable({
       objectMode: true,
       write: async(page: PageDocument, encoding, callback) => {
         try {
           await PageBulkExportPageSnapshot.create({
-            pageBulkExportJob: pageBulkExportJob._id,
+            pageBulkExportJob,
             path: page.path,
             revision: page.revision,
           });
@@ -212,16 +207,18 @@ class PageBulkExportService {
         callback();
       },
     });
-    await pipelinePromise(pagesReadable, pageSnapshotwritable);
+
+    await pipelinePromise(pagesReadable, pageSnapshotsWritable);
   }
 
   /**
    * Get a Writable that writes the page body temporarily to fs
    */
-  private getPageWritable(outputDir: string): Writable {
+  private getPageWritable(pageBulkExportJob: PageBulkExportJobDocument): Writable {
+    const outputDir = this.getTmpOutputDir(pageBulkExportJob);
     return new Writable({
       objectMode: true,
-      write: async(page: PageDocument, encoding, callback) => {
+      write: async(page: PageBulkExportPageSnapshotDocument, encoding, callback) => {
         try {
           const revision = page.revision;
 
@@ -233,6 +230,8 @@ class PageBulkExportService {
 
             await fs.promises.mkdir(fileOutputParentPath, { recursive: true });
             await fs.promises.writeFile(fileOutputPath, markdownBody);
+            pageBulkExportJob.lastExportedPagePath = page.path;
+            await pageBulkExportJob.save();
           }
         }
         catch (err) {
@@ -262,8 +261,6 @@ class PageBulkExportService {
       multipartUploader: IMultipartUploader,
       pageBulkExportJob: PageBulkExportJobDocument,
       attachment: IAttachmentDocument,
-      activityParameters: ActivityParameters,
-      tmpOutputDir: string,
   ): Writable {
     let partNumber = 1;
 
@@ -277,6 +274,7 @@ class PageBulkExportService {
           gc();
         }
         catch (err) {
+          await multipartUploader.abortUpload();
           callback(err);
           return;
         }
@@ -293,9 +291,6 @@ class PageBulkExportService {
           pageBulkExportJob.completedAt = new Date();
           pageBulkExportJob.attachment = attachment._id;
           await pageBulkExportJob.save();
-
-          await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED);
-          await fs.promises.rm(tmpOutputDir, { recursive: true, force: true });
         }
         catch (err) {
           callback(err);
@@ -306,6 +301,39 @@ class PageBulkExportService {
     });
   }
 
+  private async compressAndUpload(currentUser, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const pageArchiver = this.setUpPageArchiver();
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
+
+    const originalName = `${pageBulkExportJob._id}.${this.compressExtension}`;
+    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
+    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
+
+    const fileUploadService: FileUploader = this.crowi.fileUploadService;
+    // if the process of uploading was interrupted, abort the previous multipart upload and start over
+    if (pageBulkExportJob.uploadId != null) {
+      await fileUploadService.abortExistingMultipartUpload(uploadKey, pageBulkExportJob.uploadId);
+    }
+
+    // init multipart upload
+    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
+    await multipartUploader.initUpload();
+    pageBulkExportJob.uploadId = multipartUploader.uploadId;
+    await pageBulkExportJob.save();
+
+    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment);
+
+    const compressAndUploadPromise = pipelinePromise(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable);
+    pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
+    pageArchiver.finalize();
+
+    await compressAndUploadPromise;
+  }
+
+  private getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string {
+    return `${this.tmpOutputRootDir}/page-bulk-export/${pageBulkExportJob._id}`;
+  }
+
   private async notifyExportResult(
       activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType,
   ) {

+ 9 - 0
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -8,6 +8,7 @@ import {
   DeleteObjectCommand,
   ListObjectsCommand,
   ObjectCannedACL,
+  AbortMultipartUploadCommand,
 } from '@aws-sdk/client-s3';
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import urljoin from 'url-join';
@@ -236,6 +237,14 @@ class AwsFileUploader extends AbstractFileUploader {
     return new AwsMultipartUploader(s3, getS3Bucket(), uploadKey, maxPartSize);
   }
 
+  override async abortExistingMultipartUpload(uploadKey: string, uploadId: string) {
+    await S3Factory().send(new AbortMultipartUploadCommand({
+      Bucket: getS3Bucket(),
+      Key: uploadKey,
+      UploadId: uploadId,
+    }));
+  }
+
 }
 
 module.exports = (crowi) => {

+ 0 - 6
apps/app/src/server/service/file-uploader/aws/multipart-uploader.ts

@@ -12,12 +12,6 @@ const logger = loggerFactory('growi:services:fileUploaderAws:multipartUploader')
 
 export type IAwsMultipartUploader = IMultipartUploader
 
-/**
- * Class for uploading files to S3 using multipart upload.
- * Create instance from AwsFileUploader class.
- * Each instance can only be used for one multipart upload, and cannot be reused once completed.
- * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
- */
 export class AwsMultipartUploader extends MultipartUploader implements IAwsMultipartUploader {
 
   private bucket: string | undefined;

+ 11 - 0
apps/app/src/server/service/file-uploader/file-uploader.ts

@@ -42,6 +42,7 @@ export interface FileUploader {
   findDeliveryFile(attachment: IAttachmentDocument): Promise<NodeJS.ReadableStream>,
   generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise<TemporaryUrl>,
   createMultipartUploader: (uploadKey: string, maxPartSize: number) => MultipartUploader,
+  abortExistingMultipartUpload: (uploadKey: string, uploadId: string) => Promise<void>
 }
 
 export abstract class AbstractFileUploader implements FileUploader {
@@ -154,10 +155,20 @@ export abstract class AbstractFileUploader implements FileUploader {
     return ResponseMode.RELAY;
   }
 
+  /**
+   * Create a multipart uploader for cloud storage
+   */
   createMultipartUploader(uploadKey: string, maxPartSize: number): MultipartUploader {
     throw new Error('Multipart upload not available for file upload type');
   }
 
+  /**
+   * Abort an existing multipart upload without creating a MultipartUploader instance
+   */
+  abortExistingMultipartUpload(uploadKey: string, uploadId: string): Promise<void> {
+    throw new Error('Multipart upload not available for file upload type');
+  }
+
   /**
    * Respond to the HTTP request.
    */

+ 12 - 1
apps/app/src/server/service/file-uploader/gcs/index.ts

@@ -1,4 +1,5 @@
 import { Storage } from '@google-cloud/storage';
+import axios from 'axios';
 import urljoin from 'url-join';
 
 import type Crowi from '~/server/crowi';
@@ -14,7 +15,6 @@ import {
 } from '../file-uploader';
 import { ContentHeaders } from '../utils';
 
-import type { IGcsMultipartUploader } from './multipart-uploader';
 import { GcsMultipartUploader } from './multipart-uploader';
 
 const logger = loggerFactory('growi:service:fileUploaderGcs');
@@ -177,6 +177,17 @@ class GcsFileUploader extends AbstractFileUploader {
     return new GcsMultipartUploader(myBucket, uploadKey, maxPartSize);
   }
 
+  override async abortExistingMultipartUpload(uploadKey: string, uploadId: string) {
+    try {
+      await axios.delete(uploadId);
+    }
+    catch (e) {
+      if (e.response?.status !== 499) {
+        throw e;
+      }
+    }
+  }
+
 }
 
 

+ 0 - 6
apps/app/src/server/service/file-uploader/gcs/multipart-uploader.ts

@@ -12,12 +12,6 @@ const logger = loggerFactory('growi:services:fileUploaderGcs:multipartUploader')
 
 export type IGcsMultipartUploader = IMultipartUploader
 
-/**
- * Class for uploading files to GCS using multipart upload.
- * Create instance from GcsFileUploader class.
- * Each instance can only be used for one multipart upload, and cannot be reused once completed.
- * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
- */
 export class GcsMultipartUploader extends MultipartUploader implements IGcsMultipartUploader {
 
   private file: File;

+ 4 - 0
apps/app/src/server/service/file-uploader/multipart-uploader.ts

@@ -18,6 +18,10 @@ export interface IMultipartUploader {
   getUploadedFileSize(): Promise<number>;
 }
 
+/**
+ * Abstract class for uploading files to cloud storage using multipart upload.
+ * Each instance is equivalent to a single multipart upload, and cannot be reused once completed.
+ */
 export abstract class MultipartUploader implements IMultipartUploader {
 
   protected uploadKey: string;