Просмотр исходного кода

Merge pull request #8963 from weseek/feat/78040-135788-resume-suspended-bulk-export-job

Feat/78040 135788 resume suspended bulk export job
Yuki Takei 1 год назад
Родитель
Commit
2591f224e0

+ 2 - 1
apps/app/public/static/locales/en_US/translation.json

@@ -640,7 +640,8 @@
     "markdown": "Markdown",
     "choose_export_format": "Select export format",
     "bulk_export_started": "Please wait a moment...",
-    "bulk_export_download_expired": "Download period has expired"
+    "bulk_export_download_expired": "Download period has expired",
+    "duplicate_bulk_export_job_error": "Export for the same page and its children is in progress"
   },
   "message": {
     "successfully_connected": "Successfully Connected!",

+ 2 - 1
apps/app/public/static/locales/fr_FR/translation.json

@@ -634,7 +634,8 @@
     "markdown": "Markdown",
     "choose_export_format": "Sélectionnez le format d'exportation",
     "bulk_export_started": "Patientez s'il-vous-plait...",
-    "bulk_export_download_expired": "La période de téléchargement a expiré"
+    "bulk_export_download_expired": "La période de téléchargement a expiré",
+    "duplicate_bulk_export_job_error": "L'export pour la même page et ses enfants est en cours"
   },
   "message": {
     "successfully_connected": "Connecté!",

+ 2 - 1
apps/app/public/static/locales/ja_JP/translation.json

@@ -673,7 +673,8 @@
     "markdown": "マークダウン",
     "choose_export_format": "エクスポート形式を選択してください",
     "bulk_export_started": "ただいま準備中です...",
-    "bulk_export_download_expired": "ダウンロード期限が切れました"
+    "bulk_export_download_expired": "ダウンロード期限が切れました",
+    "duplicate_bulk_export_job_error": "既に同じページとその配下のエクスポートが進行中です"
   },
   "message": {
     "successfully_connected": "接続に成功しました!",

+ 2 - 1
apps/app/public/static/locales/zh_CN/translation.json

@@ -643,7 +643,8 @@
     "markdown": "Markdown",
     "choose_export_format": "选择导出格式",
     "bulk_export_started": "目前我们正在准备...",
-    "bulk_export_download_expired": "下载期限已过"
+    "bulk_export_download_expired": "下载期限已过",
+    "duplicate_bulk_export_job_error": "正在导出同一页面及其子页面"
   },
   "message": {
     "successfully_connected": "连接成功!",

+ 4 - 2
apps/app/src/features/page-bulk-export/client/components/PageBulkExportSelectModal.tsx

@@ -18,7 +18,9 @@ const PageBulkExportSelectModal = (): JSX.Element => {
       toastSuccess(t('page_export.bulk_export_started'));
     }
     catch (e) {
-      toastError(t('page_export.failed_to_export'));
+      // TODO: Enable cancel and restart of export if duplicate export exists (https://redmine.weseek.co.jp/issues/150418)
+      const errorCode = e?.[0].code ?? 'page_export.failed_to_export';
+      toastError(t(errorCode));
     }
     close();
   };
@@ -38,7 +40,7 @@ const PageBulkExportSelectModal = (): JSX.Element => {
               </small>
             </div>
             <div className="d-flex justify-content-center mt-2">
-              <button className="btn btn-primary" type="button" onClick={() => startBulkExport(PageBulkExportFormat.markdown)}>
+              <button className="btn btn-primary" type="button" onClick={() => startBulkExport(PageBulkExportFormat.md)}>
                 {t('page_export.markdown')}
               </button>
               <button className="btn btn-primary ms-2" type="button" onClick={() => startBulkExport(PageBulkExportFormat.pdf)}>PDF</button>

+ 17 - 5
apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts

@@ -4,26 +4,38 @@ import type {
 } from '@growi/core';
 
 export const PageBulkExportFormat = {
-  markdown: 'markdown',
+  md: 'md',
   pdf: 'pdf',
 } as const;
 
 export type PageBulkExportFormat = typeof PageBulkExportFormat[keyof typeof PageBulkExportFormat]
 
+export const PageBulkExportJobStatus = {
+  initializing: 'initializing', // preparing for export
+  exporting: 'exporting', // exporting to fs
+  uploading: 'uploading', // uploading to cloud storage
+  completed: 'completed',
+  failed: 'failed',
+} as const;
+
+export type PageBulkExportJobStatus = typeof PageBulkExportJobStatus[keyof typeof PageBulkExportJobStatus]
+
 export interface IPageBulkExportJob {
   user: Ref<IUser>, // user that started export job
   page: Ref<IPage>, // the root page of page tree to export
-  lastUploadedPagePath: string, // the path of page that was uploaded last
-  uploadId: string | null, // upload ID of multipart upload of S3/GCS
+  lastExportedPagePath?: string, // the path of page that was exported to the fs last
+  uploadId?: string, // upload ID of multipart upload of S3/GCS
+  uploadKey?: string, // upload key of multipart upload of S3/GCS
   format: PageBulkExportFormat,
-  completedAt: Date | null, // the date at which job was completed
+  completedAt?: Date, // the date at which job was completed
   attachment?: Ref<IAttachment>,
+  status: PageBulkExportJobStatus,
 }
 
 export interface IPageBulkExportJobHasId extends IPageBulkExportJob, HasObjectId {}
 
 // snapshot of page info to upload
-export interface IPageBulkExportPageInfo {
+export interface IPageBulkExportPageSnapshot {
   pageBulkExportJob: Ref<IPageBulkExportJob>,
  path: string, // page path when export was started
  revision: Ref<IRevision>, // page revision when export was started

+ 6 - 2
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts

@@ -3,7 +3,7 @@ import { type Document, type Model, Schema } from 'mongoose';
 import { getOrCreateModel } from '~/server/util/mongoose-utils';
 
 import type { IPageBulkExportJob } from '../../interfaces/page-bulk-export';
-import { PageBulkExportFormat } from '../../interfaces/page-bulk-export';
+import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
 
 export interface PageBulkExportJobDocument extends IPageBulkExportJob, Document {}
 
@@ -12,11 +12,15 @@ export type PageBulkExportJobModel = Model<PageBulkExportJobDocument>
 const pageBulkExportJobSchema = new Schema<PageBulkExportJobDocument>({
   user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
   page: { type: Schema.Types.ObjectId, ref: 'Page', required: true },
-  lastUploadedPagePath: { type: String },
+  lastExportedPagePath: { type: String },
   uploadId: { type: String, unique: true, sparse: true },
+  uploadKey: { type: String, unique: true, sparse: true },
   format: { type: String, enum: Object.values(PageBulkExportFormat), required: true },
   completedAt: { type: Date },
   attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },
+  status: {
+    type: String, enum: Object.values(PageBulkExportJobStatus), required: true, default: PageBulkExportJobStatus.initializing,
+  },
 }, { timestamps: true });
 
 export default getOrCreateModel<PageBulkExportJobDocument, PageBulkExportJobModel>('PageBulkExportJob', pageBulkExportJobSchema);

+ 0 - 17
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-page-info.ts

@@ -1,17 +0,0 @@
-import { type Document, type Model, Schema } from 'mongoose';
-
-import { getOrCreateModel } from '~/server/util/mongoose-utils';
-
-import { IPageBulkExportPageInfo } from '../../interfaces/page-bulk-export';
-
-export interface PageBulkExportPageInfoDocument extends IPageBulkExportPageInfo, Document {}
-
-export type PageBulkExportPageInfoModel = Model<PageBulkExportPageInfoDocument>
-
-const pageBulkExportPageInfoSchema = new Schema<PageBulkExportPageInfoDocument>({
-  pageBulkExportJob: { type: Schema.Types.ObjectId, ref: 'PageBulkExportJob', required: true },
-  path: { type: String, required: true },
-  revision: { type: Schema.Types.ObjectId, ref: 'Revision', required: true },
-}, { timestamps: true });
-
-export default getOrCreateModel<PageBulkExportPageInfoDocument, PageBulkExportPageInfoModel>('PageBulkExportPageInfo', pageBulkExportPageInfoSchema);

+ 19 - 0
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-page-snapshot.ts

@@ -0,0 +1,19 @@
+import { type Document, type Model, Schema } from 'mongoose';
+
+import { getOrCreateModel } from '~/server/util/mongoose-utils';
+
+import type { IPageBulkExportPageSnapshot } from '../../interfaces/page-bulk-export';
+
+export interface PageBulkExportPageSnapshotDocument extends IPageBulkExportPageSnapshot, Document {}
+
+export type PageBulkExportPageSnapshotModel = Model<PageBulkExportPageSnapshotDocument>
+
+const pageBulkExportPageInfoSchema = new Schema<PageBulkExportPageSnapshotDocument>({
+  pageBulkExportJob: { type: Schema.Types.ObjectId, ref: 'PageBulkExportJob', required: true },
+  path: { type: String, required: true },
+  revision: { type: Schema.Types.ObjectId, ref: 'Revision', required: true },
+}, { timestamps: true });
+
+export default getOrCreateModel<PageBulkExportPageSnapshotDocument, PageBulkExportPageSnapshotModel>(
+  'PageBulkExportPageSnapshot', pageBulkExportPageInfoSchema,
+);

+ 6 - 3
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -7,7 +7,7 @@ import type Crowi from '~/server/crowi';
 import type { ApiV3Response } from '~/server/routes/apiv3/interfaces/apiv3-response';
 import loggerFactory from '~/utils/logger';
 
-import { pageBulkExportService } from '../../service/page-bulk-export';
+import { DuplicateBulkExportJobError, pageBulkExportService } from '../../service/page-bulk-export';
 
 const logger = loggerFactory('growi:routes:apiv3:page-bulk-export');
 
@@ -40,12 +40,15 @@ module.exports = (crowi: Crowi): Router => {
     };
 
     try {
-      await pageBulkExportService?.createAndStartPageBulkExportJob(path, req.user, activityParameters);
+      await pageBulkExportService?.createAndExecuteBulkExportJob(path, req.user, activityParameters);
       return res.apiv3({}, 204);
     }
     catch (err) {
       logger.error(err);
-      return res.apiv3Err(new ErrorV3('Failed to start bulk export'));
+      if (err instanceof DuplicateBulkExportJobError) {
+        return res.apiv3Err(new ErrorV3('Duplicate bulk export job is in progress', 'page_export.duplicate_bulk_export_job_error'), 409);
+      }
+      return res.apiv3Err(new ErrorV3('Failed to start bulk export', 'page_export.failed_to_export'));
     }
   });
 

+ 181 - 95
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,16 +1,16 @@
 import fs from 'fs';
 import path from 'path';
-import type { Readable } from 'stream';
-import { Writable, pipeline } from 'stream';
+import { Writable } from 'stream';
 import { pipeline as pipelinePromise } from 'stream/promises';
 
-
-import type { HasObjectId } from '@growi/core';
-import { type IPage, isPopulated, SubscriptionStatusType } from '@growi/core';
+import {
+  getIdForRef, type IPage, isPopulated, SubscriptionStatusType,
+} from '@growi/core';
 import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
 import type { Archiver } from 'archiver';
 import archiver from 'archiver';
 import gc from 'expose-gc/function';
+import type { HydratedDocument } from 'mongoose';
 import mongoose from 'mongoose';
 
 import type { SupportedActionType } from '~/interfaces/activity';
@@ -27,18 +27,28 @@ import { preNotifyService } from '~/server/service/pre-notify';
 import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
 
-import { PageBulkExportFormat } from '../../interfaces/page-bulk-export';
+import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
 import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
 import PageBulkExportJob from '../models/page-bulk-export-job';
+import type { PageBulkExportPageSnapshotDocument } from '../models/page-bulk-export-page-snapshot';
+import PageBulkExportPageSnapshot from '../models/page-bulk-export-page-snapshot';
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
 type ActivityParameters ={
-  ip: string;
+  ip?: string;
   endpoint: string;
 }
 
+export class DuplicateBulkExportJobError extends Error {
+
+  constructor() {
+    super('Duplicate bulk export job is in progress');
+  }
+
+}
+
 class PageBulkExportService {
 
   crowi: any;
@@ -54,147 +64,187 @@ class PageBulkExportService {
 
   // temporal path of local fs to output page files before upload
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
-  tmpOutputRootDir = '/tmp';
+  tmpOutputRootDir = '/tmp/page-bulk-export';
+
+  pageModel: PageModel;
 
   constructor(crowi) {
     this.crowi = crowi;
     this.activityEvent = crowi.event('activity');
+    this.pageModel = mongoose.model<IPage, PageModel>('Page');
   }
 
-  async createAndStartPageBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
-    const Page = mongoose.model<IPage, PageModel>('Page');
-    const basePage = await Page.findByPathAndViewer(basePagePath, currentUser, null, true);
+  /**
+   * Create a new page bulk export job and execute it
+   */
+  async createAndExecuteBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
+    const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
 
     if (basePage == null) {
       throw new Error('Base page not found or not accessible');
     }
 
-    const pageBulkExportJob: PageBulkExportJobDocument & HasObjectId = await PageBulkExportJob.create({
+    const format = PageBulkExportFormat.md;
+    const duplicatePageBulkExportJobInProgress: HydratedDocument<PageBulkExportJobDocument> | null = await PageBulkExportJob.findOne({
       user: currentUser,
       page: basePage,
-      format: PageBulkExportFormat.markdown,
+      format,
+      $or: [
+        { status: PageBulkExportJobStatus.initializing }, { status: PageBulkExportJobStatus.exporting }, { status: PageBulkExportJobStatus.uploading },
+      ],
+    });
+    if (duplicatePageBulkExportJobInProgress != null) {
+      throw new DuplicateBulkExportJobError();
+    }
+    const pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument> = await PageBulkExportJob.create({
+      user: currentUser, page: basePage, format, status: PageBulkExportJobStatus.initializing,
     });
 
     await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
 
-    this.bulkExportWithBasePagePath(basePagePath, currentUser, activityParameters, pageBulkExportJob);
+    this.executePageBulkExportJob(pageBulkExportJob, activityParameters);
   }
 
-  async bulkExportWithBasePagePath(
-      basePagePath: string, currentUser, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
-  ): Promise<void> {
-    const timeStamp = (new Date()).getTime();
-    const exportName = `page-bulk-export-${timeStamp}`;
-
-    // export pages to fs temporarily
-    const tmpOutputDir = `${this.tmpOutputRootDir}/${exportName}`;
-    try {
-      await this.exportPagesToFS(basePagePath, tmpOutputDir, currentUser);
-    }
-    catch (err) {
-      await this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir);
-      return;
-    }
-
-    const pageArchiver = this.setUpPageArchiver();
-    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
-
-    const originalName = `${exportName}.${this.compressExtension}`;
-    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
-    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
-
-    // init multipart upload
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
+  /**
+   * Execute a page bulk export job. This method can also resume a previously interrupted job.
+   */
+  async executePageBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): Promise<void> {
     try {
-      await multipartUploader.initUpload();
-      pageBulkExportJob.uploadId = multipartUploader.uploadId;
-      await pageBulkExportJob.save;
+      const User = this.crowi.model('User');
+      const user = await User.findById(getIdForRef(pageBulkExportJob.user));
+
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.initializing) {
+        await this.createPageSnapshots(user, pageBulkExportJob);
+        pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
+        await pageBulkExportJob.save();
+      }
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.exporting) {
+        await this.exportPagesToFS(pageBulkExportJob);
+        pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
+        await pageBulkExportJob.save();
+      }
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) {
+        await this.compressAndUpload(user, pageBulkExportJob);
+      }
     }
     catch (err) {
-      await this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir, multipartUploader);
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(false, pageBulkExportJob, activityParameters);
       return;
     }
 
-    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment, activityParameters, tmpOutputDir);
-
-    pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable,
-      err => this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir, multipartUploader));
-    pageArchiver.directory(tmpOutputDir, false);
-    pageArchiver.finalize();
+    await this.notifyExportResultAndCleanUp(true, pageBulkExportJob, activityParameters);
   }
 
   /**
-   * Handles export failure with the following:
-   * - notify the user of the failure
+   * Do the following in parallel:
+   * - notify user of the export result
+   * - update pageBulkExportJob status
+   * - delete page snapshots
    * - remove the temporal output directory
-   * - abort multipart upload
    */
-  // TODO: update completedAt of pageBulkExportJob, or add a failed status flag to it (https://redmine.weseek.co.jp/issues/78040)
-  private async handleExportError(
-      err: Error | null,
-      activityParameters: ActivityParameters,
+  private async notifyExportResultAndCleanUp(
+      succeeded: boolean,
       pageBulkExportJob: PageBulkExportJobDocument,
-      tmpOutputDir: string,
-      multipartUploader?: IMultipartUploader,
+      activityParameters?: ActivityParameters,
   ): Promise<void> {
-    if (err != null) {
-      logger.error(err);
+    const action = succeeded ? SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED : SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED;
+    pageBulkExportJob.status = succeeded ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
+    const results = await Promise.allSettled([
+      this.notifyExportResult(pageBulkExportJob, action, activityParameters),
+      PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
+      fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
+      pageBulkExportJob.save(),
+    ]);
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+  }
+
+  /**
+   * Create a snapshot for each page that is to be exported in the pageBulkExportJob
+   */
+  private async createPageSnapshots(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    // if the process of creating snapshots was interrupted, delete the snapshots and create from the start
+    await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });
 
-      const results = await Promise.allSettled([
-        this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED),
-        fs.promises.rm(tmpOutputDir, { recursive: true, force: true }),
-        multipartUploader?.abortUpload(),
-      ]);
-      results.forEach((result) => {
-        if (result.status === 'rejected') logger.error(result.reason);
-      });
+    const basePage = await this.pageModel.findById(getIdForRef(pageBulkExportJob.page));
+    if (basePage == null) {
+      throw new Error('Base page not found');
     }
-  }
 
-  private async exportPagesToFS(basePagePath: string, outputDir: string, currentUser): Promise<void> {
-    const pagesReadable = await this.getPageReadable(basePagePath, currentUser);
-    const pagesWritable = this.getPageWritable(outputDir);
+    // create a Readable for pages to be exported
+    const { PageQueryBuilder } = this.pageModel;
+    const builder = await new PageQueryBuilder(this.pageModel.find())
+      .addConditionToListWithDescendants(basePage.path)
+      .addViewerCondition(user);
+    const pagesReadable = builder
+      .query
+      .lean()
+      .cursor({ batchSize: this.pageBatchSize });
 
-    return pipelinePromise(pagesReadable, pagesWritable);
+    // create a Writable that creates a snapshot for each page
+    const pageSnapshotsWritable = new Writable({
+      objectMode: true,
+      write: async(page: PageDocument, encoding, callback) => {
+        try {
+          await PageBulkExportPageSnapshot.create({
+            pageBulkExportJob,
+            path: page.path,
+            revision: page.revision,
+          });
+        }
+        catch (err) {
+          callback(err);
+          return;
+        }
+        callback();
+      },
+    });
+
+    await pipelinePromise(pagesReadable, pageSnapshotsWritable);
   }
 
   /**
-   * Get a Readable of all the pages under the specified path, including the root page.
+   * Export pages to the file system before compressing and uploading to the cloud storage.
+   * The export will resume from the last exported page if the process was interrupted.
    */
-  private async getPageReadable(basePagePath: string, currentUser): Promise<Readable> {
-    const Page = mongoose.model<IPage, PageModel>('Page');
-    const { PageQueryBuilder } = Page;
+  private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const findQuery = pageBulkExportJob.lastExportedPagePath != null ? {
+      pageBulkExportJob,
+      path: { $gt: pageBulkExportJob.lastExportedPagePath },
+    } : { pageBulkExportJob };
+    const pageSnapshotsReadable = PageBulkExportPageSnapshot
+      .find(findQuery)
+      .populate('revision').sort({ path: 1 }).lean()
+      .cursor({ batchSize: this.pageBatchSize });
 
-    const builder = await new PageQueryBuilder(Page.find())
-      .addConditionToListWithDescendants(basePagePath)
-      .addViewerCondition(currentUser);
+    const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
-    return builder
-      .query
-      .populate('revision')
-      .lean()
-      .cursor({ batchSize: this.pageBatchSize });
+    return pipelinePromise(pageSnapshotsReadable, pagesWritable);
   }
 
   /**
    * Get a Writable that writes the page body temporarily to fs
    */
-  private getPageWritable(outputDir: string): Writable {
+  private getPageWritable(pageBulkExportJob: PageBulkExportJobDocument): Writable {
+    const outputDir = this.getTmpOutputDir(pageBulkExportJob);
     return new Writable({
       objectMode: true,
-      write: async(page: PageDocument, encoding, callback) => {
+      write: async(page: PageBulkExportPageSnapshotDocument, encoding, callback) => {
         try {
           const revision = page.revision;
 
           if (revision != null && isPopulated(revision)) {
             const markdownBody = revision.body;
-            const pathNormalized = `${normalizePath(page.path)}.md`;
+            const pathNormalized = `${normalizePath(page.path)}.${PageBulkExportFormat.md}`;
             const fileOutputPath = path.join(outputDir, pathNormalized);
             const fileOutputParentPath = getParentPath(fileOutputPath);
 
             await fs.promises.mkdir(fileOutputParentPath, { recursive: true });
             await fs.promises.writeFile(fileOutputPath, markdownBody);
+            pageBulkExportJob.lastExportedPagePath = page.path;
+            await pageBulkExportJob.save();
           }
         }
         catch (err) {
@@ -206,6 +256,39 @@ class PageBulkExportService {
     });
   }
 
+  /**
+   * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
+   */
+  private async compressAndUpload(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const pageArchiver = this.setUpPageArchiver();
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
+
+    const originalName = `${pageBulkExportJob._id}.${this.compressExtension}`;
+    const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
+    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
+
+    const fileUploadService: FileUploader = this.crowi.fileUploadService;
+    // if the process of uploading was interrupted, abort the previous upload and start over from the beginning
+    if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
+      await fileUploadService.abortExistingMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId);
+    }
+
+    // init multipart upload
+    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
+    await multipartUploader.initUpload();
+    pageBulkExportJob.uploadKey = uploadKey;
+    pageBulkExportJob.uploadId = multipartUploader.uploadId;
+    await pageBulkExportJob.save();
+
+    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment);
+
+    const compressAndUploadPromise = pipelinePromise(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable);
+    pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
+    pageArchiver.finalize();
+
+    await compressAndUploadPromise;
+  }
+
   private setUpPageArchiver(): Archiver {
     const pageArchiver = archiver('tar', {
       gzip: true,
@@ -224,8 +307,6 @@ class PageBulkExportService {
       multipartUploader: IMultipartUploader,
       pageBulkExportJob: PageBulkExportJobDocument,
       attachment: IAttachmentDocument,
-      activityParameters: ActivityParameters,
-      tmpOutputDir: string,
   ): Writable {
     let partNumber = 1;
 
@@ -239,6 +320,7 @@ class PageBulkExportService {
           gc();
         }
         catch (err) {
+          await multipartUploader.abortUpload();
           callback(err);
           return;
         }
@@ -255,9 +337,6 @@ class PageBulkExportService {
           pageBulkExportJob.completedAt = new Date();
           pageBulkExportJob.attachment = attachment._id;
           await pageBulkExportJob.save();
-
-          await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED);
-          await fs.promises.rm(tmpOutputDir, { recursive: true, force: true });
         }
         catch (err) {
           callback(err);
@@ -268,8 +347,15 @@ class PageBulkExportService {
     });
   }
 
+  /**
+   * Get the output directory on the fs to temporarily store page files before compressing and uploading
+   */
+  private getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string {
+    return `${this.tmpOutputRootDir}/${pageBulkExportJob._id}`;
+  }
+
   private async notifyExportResult(
-      activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType,
+      pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType, activityParameters?: ActivityParameters,
   ) {
     const activity = await this.crowi.activityService.createActivity({
       ...activityParameters,

+ 15 - 1
apps/app/src/server/crowi/index.js

@@ -12,7 +12,9 @@ import pkg from '^/package.json';
 
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
-import instanciatePageBulkExportService from '~/features/page-bulk-export/server/service/page-bulk-export';
+import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
+import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import QuestionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import loggerFactory from '~/utils/logger';
@@ -177,6 +179,8 @@ Crowi.prototype.init = async function() {
   ]);
 
   await normalizeData();
+
+  this.resumeIncompletePageBulkExportJobs();
 };
 
 /**
@@ -789,4 +793,14 @@ Crowi.prototype.setupExternalUserGroupSyncService = function() {
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
 };
 
+// TODO: Limit the number of jobs to execute in parallel (https://redmine.weseek.co.jp/issues/143599)
+Crowi.prototype.resumeIncompletePageBulkExportJobs = async function() {
+  const jobs = await PageBulkExportJob.find({
+    $or: [
+      { status: PageBulkExportJobStatus.initializing }, { status: PageBulkExportJobStatus.exporting }, { status: PageBulkExportJobStatus.uploading },
+    ],
+  });
+  Promise.all(jobs.map(job => pageBulkExportService.executePageBulkExportJob(job)));
+};
+
 export default Crowi;

+ 9 - 0
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -10,6 +10,7 @@ import {
   DeleteObjectCommand,
   ListObjectsCommand,
   ObjectCannedACL,
+  AbortMultipartUploadCommand,
 } from '@aws-sdk/client-s3';
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import urljoin from 'url-join';
@@ -264,6 +265,14 @@ class AwsFileUploader extends AbstractFileUploader {
     return new AwsMultipartUploader(s3, getS3Bucket(), uploadKey, maxPartSize);
   }
 
+  override async abortExistingMultipartUpload(uploadKey: string, uploadId: string) {
+    await S3Factory().send(new AbortMultipartUploadCommand({
+      Bucket: getS3Bucket(),
+      Key: uploadKey,
+      UploadId: uploadId,
+    }));
+  }
+
 }
 
 module.exports = (crowi) => {

+ 0 - 6
apps/app/src/server/service/file-uploader/aws/multipart-uploader.ts

@@ -12,12 +12,6 @@ const logger = loggerFactory('growi:services:fileUploaderAws:multipartUploader')
 
 export type IAwsMultipartUploader = IMultipartUploader
 
-/**
- * Class for uploading files to S3 using multipart upload.
- * Create instance from AwsFileUploader class.
- * Each instance can only be used for one multipart upload, and cannot be reused once completed.
- * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
- */
 export class AwsMultipartUploader extends MultipartUploader implements IAwsMultipartUploader {
 
   private bucket: string | undefined;

+ 12 - 1
apps/app/src/server/service/file-uploader/file-uploader.ts

@@ -44,6 +44,7 @@ export interface FileUploader {
   findDeliveryFile(attachment: IAttachmentDocument): Promise<NodeJS.ReadableStream>,
   generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise<TemporaryUrl>,
   createMultipartUploader: (uploadKey: string, maxPartSize: number) => MultipartUploader,
+  abortExistingMultipartUpload: (uploadKey: string, uploadId: string) => Promise<void>
 }
 
 export abstract class AbstractFileUploader implements FileUploader {
@@ -156,11 +157,21 @@ export abstract class AbstractFileUploader implements FileUploader {
     return ResponseMode.RELAY;
   }
 
+  /**
+   * Create a multipart uploader for cloud storage
+   */
   createMultipartUploader(uploadKey: string, maxPartSize: number): MultipartUploader {
     throw new Error('Multipart upload not available for file upload type');
   }
 
- abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>;
+  abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>;
+
+  /**
+   * Abort an existing multipart upload without creating a MultipartUploader instance
+   */
+  abortExistingMultipartUpload(uploadKey: string, uploadId: string): Promise<void> {
+    throw new Error('Multipart upload not available for file upload type');
+  }
 
   /**
    * Respond to the HTTP request.

+ 12 - 1
apps/app/src/server/service/file-uploader/gcs/index.ts

@@ -1,6 +1,7 @@
 import type { ReadStream } from 'fs';
 
 import { Storage } from '@google-cloud/storage';
+import axios from 'axios';
 import urljoin from 'url-join';
 
 import type Crowi from '~/server/crowi';
@@ -16,7 +17,6 @@ import {
 } from '../file-uploader';
 import { ContentHeaders } from '../utils';
 
-import type { IGcsMultipartUploader } from './multipart-uploader';
 import { GcsMultipartUploader } from './multipart-uploader';
 
 const logger = loggerFactory('growi:service:fileUploaderGcs');
@@ -201,6 +201,17 @@ class GcsFileUploader extends AbstractFileUploader {
     return new GcsMultipartUploader(myBucket, uploadKey, maxPartSize);
   }
 
+  override async abortExistingMultipartUpload(uploadKey: string, uploadId: string) {
+    try {
+      await axios.delete(uploadId);
+    }
+    catch (e) {
+      if (e.response?.status !== 499) {
+        throw e;
+      }
+    }
+  }
+
 }
 
 

+ 4 - 7
apps/app/src/server/service/file-uploader/gcs/multipart-uploader.ts

@@ -1,21 +1,17 @@
 import type { Bucket, File } from '@google-cloud/storage';
 // eslint-disable-next-line no-restricted-imports
 import axios from 'axios';
+import urljoin from 'url-join';
 
 import loggerFactory from '~/utils/logger';
 
+import { configManager } from '../../config-manager';
 import { MultipartUploader, UploadStatus, type IMultipartUploader } from '../multipart-uploader';
 
 const logger = loggerFactory('growi:services:fileUploaderGcs:multipartUploader');
 
 export type IGcsMultipartUploader = IMultipartUploader
 
-/**
- * Class for uploading files to GCS using multipart upload.
- * Create instance from GcsFileUploader class.
- * Each instance can only be used for one multipart upload, and cannot be reused once completed.
- * TODO: Enable creation of uploader of inturrupted uploads: https://redmine.weseek.co.jp/issues/78040
- */
 export class GcsMultipartUploader extends MultipartUploader implements IGcsMultipartUploader {
 
   private file: File;
@@ -26,7 +22,8 @@ export class GcsMultipartUploader extends MultipartUploader implements IGcsMulti
   constructor(bucket: Bucket, uploadKey: string, maxPartSize: number) {
     super(uploadKey, maxPartSize);
 
-    this.file = bucket.file(this.uploadKey);
+    const namespace = configManager.getConfig('crowi', 'gcs:uploadNamespace');
+    this.file = bucket.file(urljoin(namespace || '', uploadKey));
   }
 
   async initUpload(): Promise<void> {

+ 4 - 0
apps/app/src/server/service/file-uploader/multipart-uploader.ts

@@ -18,6 +18,10 @@ export interface IMultipartUploader {
   getUploadedFileSize(): Promise<number>;
 }
 
+/**
+ * Abstract class for uploading files to cloud storage using multipart upload.
+ * Each instance is equivalent to a single multipart upload, and cannot be reused once completed.
+ */
 export abstract class MultipartUploader implements IMultipartUploader {
 
   protected uploadKey: string;