Просмотр исходного кода

Merge branch 'imprv/158220-158223-add-page-bulk-export-job-cron' into imprv/158220-158276-clean-up-page-bulk-export-service

Futa Arai 1 год назад
Родитель
Commit
af73ee28fa

+ 2 - 0
apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts

@@ -36,7 +36,9 @@ export interface IPageBulkExportJob {
   completedAt?: Date, // the date at which job was completed
   attachment?: Ref<IAttachment>,
   status: PageBulkExportJobStatus,
+  statusOnPreviousCronExec?: PageBulkExportJobStatus, // status on previous cron execution
   revisionListHash?: string, // Hash created from the list of revision IDs. Used to detect existing duplicate uploads.
+  restartFlag: boolean, // flag to restart the job
   createdAt?: Date,
   updatedAt?: Date
 }

+ 4 - 0
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts

@@ -21,6 +21,10 @@ const pageBulkExportJobSchema = new Schema<PageBulkExportJobDocument>({
   status: {
     type: String, enum: Object.values(PageBulkExportJobStatus), required: true, default: PageBulkExportJobStatus.initializing,
   },
+  statusOnPreviousCronExec: {
+    type: String, enum: Object.values(PageBulkExportJobStatus),
+  },
+  restartFlag: { type: Boolean, required: true, default: false },
   revisionListHash: { type: String },
 }, { timestamps: true });
 

+ 11 - 11
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.integ.ts → apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.integ.ts

@@ -5,7 +5,7 @@ import { configManager } from '~/server/service/config-manager';
 import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
 import PageBulkExportJob from '../models/page-bulk-export-job';
 
-import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from './page-bulk-export-job-cron';
+import instanciatePageBulkExportJobCleanUpCronService, { pageBulkExportJobCleanUpCronService } from './page-bulk-export-job-clean-up-cron';
 
 // TODO: use actual user model after ~/server/models/user.js becomes importable in vitest
 // ref: https://github.com/vitest-dev/vitest/issues/846
@@ -18,25 +18,25 @@ const userSchema = new mongoose.Schema({
 });
 const User = mongoose.model('User', userSchema);
 
-vi.mock('./page-bulk-export', () => {
+vi.mock('./page-bulk-export-job-cron', () => {
   return {
-    pageBulkExportService: {
+    pageBulkExportJobCronService: {
       cleanUpExportJobResources: vi.fn(() => Promise.resolve()),
     },
   };
 });
 
-describe('PageBulkExportJobCronService', () => {
+describe('PageBulkExportJobCleanUpCronService', () => {
   const crowi = { event: () => {} };
   let user;
 
   beforeAll(async() => {
     user = await User.create({
-      name: 'Example for PageBulkExportJobCronService Test',
-      username: 'page bulk export job cron test user',
-      email: 'bulkExportCronTestUser@example.com',
+      name: 'Example for PageBulkExportJobCleanUpCronService Test',
+      username: 'page bulk export job cleanup cron test user',
+      email: 'bulkExportCleanUpCronTestUser@example.com',
     });
-    instanciatePageBulkExportJobCronService(crowi);
+    instanciatePageBulkExportJobCleanUpCronService(crowi);
   });
 
   beforeEach(async() => {
@@ -87,7 +87,7 @@ describe('PageBulkExportJobCronService', () => {
       expect(await PageBulkExportJob.find()).toHaveLength(4);
 
       // act
-      await pageBulkExportJobCronService?.deleteExpiredExportJobs();
+      await pageBulkExportJobCleanUpCronService?.deleteExpiredExportJobs();
       const jobs = await PageBulkExportJob.find();
 
       // assert
@@ -135,7 +135,7 @@ describe('PageBulkExportJobCronService', () => {
       expect(await PageBulkExportJob.find()).toHaveLength(4);
 
       // act
-      await pageBulkExportJobCronService?.deleteDownloadExpiredExportJobs();
+      await pageBulkExportJobCleanUpCronService?.deleteDownloadExpiredExportJobs();
       const jobs = await PageBulkExportJob.find();
 
       // assert
@@ -167,7 +167,7 @@ describe('PageBulkExportJobCronService', () => {
       expect(await PageBulkExportJob.find()).toHaveLength(3);
 
       // act
-      await pageBulkExportJobCronService?.deleteFailedExportJobs();
+      await pageBulkExportJobCleanUpCronService?.deleteFailedExportJobs();
       const jobs = await PageBulkExportJob.find();
 
       // assert

+ 16 - 16
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.ts → apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts

@@ -8,14 +8,14 @@ import { PageBulkExportEnabledFileUploadTypes, PageBulkExportJobInProgressStatus
 import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
 import PageBulkExportJob from '../models/page-bulk-export-job';
 
-import { pageBulkExportService } from './page-bulk-export';
+import { pageBulkExportJobCronService } from './page-bulk-export-job-cron';
 
-const logger = loggerFactory('growi:service:cron');
+const logger = loggerFactory('growi:service:page-bulk-export-job-clean-up-cron');
 
 /**
  * Manages cronjob which deletes unnecessary bulk export jobs
  */
-class PageBulkExportJobCronService extends CronService {
+class PageBulkExportJobCleanUpCronService extends CronService {
 
   crowi: any;
 
@@ -25,7 +25,7 @@ class PageBulkExportJobCronService extends CronService {
   }
 
   override getCronSchedule(): string {
-    return configManager.getConfig('crowi', 'app:pageBulkExportJobCronSchedule');
+    return configManager.getConfig('crowi', 'app:pageBulkExportJobCleanUpCronSchedule');
   }
 
   override async executeJob(): Promise<void> {
@@ -47,8 +47,8 @@ class PageBulkExportJobCronService extends CronService {
       createdAt: { $lt: new Date(Date.now() - exportJobExpirationSeconds * 1000) },
     });
 
-    if (pageBulkExportService != null) {
-      await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, pageBulkExportService.cleanUpExportJobResources.bind(pageBulkExportService));
+    if (pageBulkExportJobCronService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, pageBulkExportJobCronService.cleanUpExportJobResources.bind(pageBulkExportJobCronService));
     }
   }
 
@@ -63,8 +63,8 @@ class PageBulkExportJobCronService extends CronService {
       completedAt: { $lt: thresholdDate },
     });
 
-    const cleanup = async(job: PageBulkExportJobDocument) => {
-      await pageBulkExportService?.cleanUpExportJobResources(job);
+    const cleanUp = async(job: PageBulkExportJobDocument) => {
+      await pageBulkExportJobCronService?.cleanUpExportJobResources(job);
 
       const hasSameAttachmentAndDownloadNotExpired = await PageBulkExportJob.findOne({
         attachment: job.attachment,
@@ -77,7 +77,7 @@ class PageBulkExportJobCronService extends CronService {
       }
     };
 
-    await this.cleanUpAndDeleteBulkExportJobs(downloadExpiredExportJobs, cleanup);
+    await this.cleanUpAndDeleteBulkExportJobs(downloadExpiredExportJobs, cleanUp);
   }
 
   /**
@@ -86,22 +86,22 @@ class PageBulkExportJobCronService extends CronService {
   async deleteFailedExportJobs() {
     const failedExportJobs = await PageBulkExportJob.find({ status: PageBulkExportJobStatus.failed });
 
-    if (pageBulkExportService != null) {
-      await this.cleanUpAndDeleteBulkExportJobs(failedExportJobs, pageBulkExportService.cleanUpExportJobResources.bind(pageBulkExportService));
+    if (pageBulkExportJobCronService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(failedExportJobs, pageBulkExportJobCronService.cleanUpExportJobResources.bind(pageBulkExportJobCronService));
     }
   }
 
   async cleanUpAndDeleteBulkExportJobs(
       pageBulkExportJobs: HydratedDocument<PageBulkExportJobDocument>[],
-      cleanup: (job: PageBulkExportJobDocument) => Promise<void>,
+      cleanUp: (job: PageBulkExportJobDocument) => Promise<void>,
   ): Promise<void> {
-    const results = await Promise.allSettled(pageBulkExportJobs.map(job => cleanup(job)));
+    const results = await Promise.allSettled(pageBulkExportJobs.map(job => cleanUp(job)));
     results.forEach((result) => {
       if (result.status === 'rejected') logger.error(result.reason);
     });
 
     // Only batch delete jobs which have been successfully cleaned up
-    // Cleanup failed jobs will be retried in the next cron execution
+    // Clean up failed jobs will be retried in the next cron execution
     const cleanedUpJobs = pageBulkExportJobs.filter((_, index) => results[index].status === 'fulfilled');
     if (cleanedUpJobs.length > 0) {
       const cleanedUpJobIds = cleanedUpJobs.map(job => job._id);
@@ -112,7 +112,7 @@ class PageBulkExportJobCronService extends CronService {
 }
 
 // eslint-disable-next-line import/no-mutable-exports
-export let pageBulkExportJobCronService: PageBulkExportJobCronService | undefined; // singleton instance
+export let pageBulkExportJobCleanUpCronService: PageBulkExportJobCleanUpCronService | undefined; // singleton instance
 export default function instanciate(crowi): void {
-  pageBulkExportJobCronService = new PageBulkExportJobCronService(crowi);
+  pageBulkExportJobCleanUpCronService = new PageBulkExportJobCleanUpCronService(crowi);
 }

+ 15 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts

@@ -0,0 +1,15 @@
+export class BulkExportJobExpiredError extends Error {
+
+  constructor() {
+    super('Bulk export job has expired');
+  }
+
+}
+
+export class BulkExportJobRestartedError extends Error {
+
+  constructor() {
+    super('Bulk export job has restarted');
+  }
+
+}

+ 251 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -0,0 +1,251 @@
+import fs from 'fs';
+import type { Readable } from 'stream';
+
+import type { IPage, IUser } from '@growi/core';
+import { isPopulated, getIdForRef } from '@growi/core';
+import mongoose from 'mongoose';
+
+
+import type { SupportedActionType } from '~/interfaces/activity';
+import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
+import type { ActivityDocument } from '~/server/models/activity';
+import type { PageModel } from '~/server/models/page';
+import { configManager } from '~/server/service/config-manager';
+import CronService from '~/server/service/cron';
+import type { FileUploader } from '~/server/service/file-uploader';
+import { preNotifyService } from '~/server/service/pre-notify';
+import loggerFactory from '~/utils/logger';
+
+import { PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../../interfaces/page-bulk-export';
+import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
+import PageBulkExportJob from '../../models/page-bulk-export-job';
+import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
+
+
+import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
+import { compressAndUploadAsync } from './steps/compress-and-upload-async';
+import { createPageSnapshotsAsync } from './steps/create-page-snapshots-async';
+import { exportPagesToFsAsync } from './steps/export-pages-to-fs-async';
+
+
+const logger = loggerFactory('growi:service:page-bulk-export-job-cron');
+
+export interface IPageBulkExportJobCronService {
+  crowi: any;
+  pageModel: PageModel;
+  pageBatchSize: number;
+  maxPartSize: number;
+  compressExtension: string;
+  setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
+  handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void;
+  notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise<void>;
+  getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
+}
+
+class PageBulkExportJobCronService extends CronService implements IPageBulkExportJobCronService {
+
+  crowi: any;
+
+  activityEvent: any;
+
+  // multipart upload max part size
+  maxPartSize = 5 * 1024 * 1024; // 5MB
+
+  pageBatchSize = 100;
+
+  compressExtension = 'tar.gz';
+
+  // temporary path on the local fs to output page files before upload
+  // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
+  tmpOutputRootDir = '/tmp/page-bulk-export';
+
+  pageModel: PageModel;
+
+  userModel: mongoose.Model<IUser>;
+
+  private streamInExecutionMemo: {
+    [key: string]: Readable;
+  } = {};
+
+  private parallelExecLimit: number;
+
+  constructor(crowi) {
+    super();
+    this.crowi = crowi;
+    this.activityEvent = crowi.event('activity');
+    this.pageModel = mongoose.model<IPage, PageModel>('Page');
+    this.userModel = mongoose.model<IUser>('User');
+    this.parallelExecLimit = configManager.getConfig('crowi', 'app:pageBulkExportParallelExecLimit');
+  }
+
+  override getCronSchedule(): string {
+    return configManager.getConfig('crowi', 'app:pageBulkExportJobCronSchedule');
+  }
+
+  override async executeJob(): Promise<void> {
+    const pageBulkExportJobsInProgress = await PageBulkExportJob.find({
+      $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
+    }).sort({ createdAt: 1 }).limit(this.parallelExecLimit);
+
+    pageBulkExportJobsInProgress.forEach((pageBulkExportJob) => {
+      this.proceedBulkExportJob(pageBulkExportJob);
+    });
+  }
+
+  /**
+   * Get the output directory on the fs to temporarily store page files before compressing and uploading
+   */
+  getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string {
+    return `${this.tmpOutputRootDir}/${pageBulkExportJob._id}`;
+  }
+
+  /**
+   * Get the stream in execution of a job.
+   * A getter method that includes "undefined" in the return type
+   */
+  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
+    return this.streamInExecutionMemo[jobId.toString()];
+  }
+
+  setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
+    this.streamInExecutionMemo[jobId.toString()] = stream;
+  }
+
+  removeStreamInExecution(jobId: ObjectIdLike) {
+    delete this.streamInExecutionMemo[jobId.toString()];
+  }
+
+  async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
+    if (pageBulkExportJob.restartFlag) {
+      await this.cleanUpExportJobResources(pageBulkExportJob, true);
+      pageBulkExportJob.restartFlag = false;
+      pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
+      pageBulkExportJob.statusOnPreviousCronExec = undefined;
+      await pageBulkExportJob.save();
+    }
+
+    if (pageBulkExportJob.status === pageBulkExportJob.statusOnPreviousCronExec) {
+      return;
+    }
+    try {
+      const user = await this.userModel.findById(getIdForRef(pageBulkExportJob.user));
+
+      // update statusOnPreviousCronExec before starting processes that updates status
+      pageBulkExportJob.statusOnPreviousCronExec = pageBulkExportJob.status;
+      await pageBulkExportJob.save();
+
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.initializing) {
+        await createPageSnapshotsAsync.bind(this)(user, pageBulkExportJob);
+      }
+      else if (pageBulkExportJob.status === PageBulkExportJobStatus.exporting) {
+        exportPagesToFsAsync.bind(this)(pageBulkExportJob);
+      }
+      else if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) {
+        await compressAndUploadAsync.bind(this)(user, pageBulkExportJob);
+      }
+    }
+    catch (err) {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob);
+    }
+  }
+
+  async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
+    if (err == null) return;
+
+    if (err instanceof BulkExportJobExpiredError) {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob);
+    }
+    else if (err instanceof BulkExportJobRestartedError) {
+      logger.info(err.message);
+      await this.cleanUpExportJobResources(pageBulkExportJob);
+    }
+    else {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob);
+    }
+  }
+
+  /**
+   * Notify the user of the export result, and clean up the resources used in the export process
+   * @param action the action type indicating whether the export succeeded or failed
+   * @param pageBulkExportJob the page bulk export job
+   */
+  async notifyExportResultAndCleanUp(
+      action: SupportedActionType,
+      pageBulkExportJob: PageBulkExportJobDocument,
+  ): Promise<void> {
+    pageBulkExportJob.status = action === SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED
+      ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
+
+    try {
+      await pageBulkExportJob.save();
+      await this.notifyExportResult(pageBulkExportJob, action);
+    }
+    catch (err) {
+      logger.error(err);
+    }
+    // execute independently of whether the notification process resolves or rejects
+    await this.cleanUpExportJobResources(pageBulkExportJob);
+  }
+
+  /**
+   * Do the following in parallel:
+   * - delete page snapshots
+   * - remove the temporal output directory
+   * - abort multipart upload
+   */
+  async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
+    const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
+    if (streamInExecution != null) {
+      if (restarted) {
+        streamInExecution.destroy(new BulkExportJobRestartedError());
+      }
+      else {
+        streamInExecution.destroy(new BulkExportJobExpiredError());
+      }
+    }
+    this.removeStreamInExecution(pageBulkExportJob._id);
+
+    const promises = [
+      PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
+      fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
+    ];
+
+    const fileUploadService: FileUploader = this.crowi.fileUploadService;
+    if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
+      promises.push(fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId));
+    }
+
+    const results = await Promise.allSettled(promises);
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+  }
+
+  private async notifyExportResult(
+      pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType,
+  ) {
+    const activity = await this.crowi.activityService.createActivity({
+      action,
+      targetModel: SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB,
+      target: pageBulkExportJob,
+      user: pageBulkExportJob.user,
+      snapshot: {
+        username: isPopulated(pageBulkExportJob.user) ? pageBulkExportJob.user.username : '',
+      },
+    });
+    const getAdditionalTargetUsers = async(activity: ActivityDocument) => [activity.user];
+    const preNotify = preNotifyService.generatePreNotify(activity, getAdditionalTargetUsers);
+    this.activityEvent.emit('updated', activity, pageBulkExportJob, preNotify);
+  }
+
+}
+
+// eslint-disable-next-line import/no-mutable-exports
+export let pageBulkExportJobCronService: PageBulkExportJobCronService | undefined; // singleton instance
+export default function instanciate(crowi): void {
+  pageBulkExportJobCronService = new PageBulkExportJobCronService(crowi);
+}

+ 117 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts

@@ -0,0 +1,117 @@
+import { Writable, pipeline } from 'stream';
+
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
+import gc from 'expose-gc/function';
+
+import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import { SupportedAction } from '~/interfaces/activity';
+import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
+import type { IAttachmentDocument } from '~/server/models/attachment';
+import { Attachment } from '~/server/models/attachment';
+import type { FileUploader } from '~/server/service/file-uploader';
+import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
+import { getBufferToFixedSizeTransform } from '~/server/util/stream';
+import loggerFactory from '~/utils/logger';
+
+import type { IPageBulkExportJobCronService } from '..';
+import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
+
+const logger = loggerFactory('growi:service:page-bulk-export-job-cron:compress-and-upload-async');
+
+function setUpPageArchiver(): Archiver {
+  const pageArchiver = archiver('tar', {
+    gzip: true,
+  });
+
+  // good practice to catch warnings (ie stat failures and other non-blocking errors)
+  pageArchiver.on('warning', (err) => {
+    if (err.code === 'ENOENT') logger.error(err);
+    else throw err;
+  });
+
+  return pageArchiver;
+}
+
+function getMultipartUploadWritable(
+    this: IPageBulkExportJobCronService,
+    multipartUploader: IMultipartUploader,
+    pageBulkExportJob: PageBulkExportJobDocument,
+    attachment: IAttachmentDocument,
+): Writable {
+  let partNumber = 1;
+
+  return new Writable({
+    write: async(part: Buffer, encoding, callback) => {
+      try {
+        await multipartUploader.uploadPart(part, partNumber);
+        partNumber += 1;
+        // First aid to prevent unexplained memory leaks
+        logger.info('global.gc() invoked.');
+        gc();
+      }
+      catch (err) {
+        await multipartUploader.abortUpload();
+        callback(err);
+        return;
+      }
+      callback();
+    },
+    final: async(callback) => {
+      try {
+        await multipartUploader.completeUpload();
+
+        const fileSize = await multipartUploader.getUploadedFileSize();
+        attachment.fileSize = fileSize;
+        await attachment.save();
+
+        pageBulkExportJob.completedAt = new Date();
+        pageBulkExportJob.attachment = attachment._id;
+        pageBulkExportJob.status = PageBulkExportJobStatus.completed;
+        await pageBulkExportJob.save();
+
+        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
+      callback();
+    },
+  });
+}
+
+
+/**
+ * Execute a pipeline that reads the page files from the temporary fs directory, compresses them, and uploads to the cloud storage
+ */
+export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  const pageArchiver = setUpPageArchiver();
+  const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
+
+  if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
+  const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
+  const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
+  const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
+
+  const fileUploadService: FileUploader = this.crowi.fileUploadService;
+  // if the process of uploading was interrupted, delete and start from the start
+  if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
+    await fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId);
+  }
+
+  // init multipart upload
+  const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
+  await multipartUploader.initUpload();
+  pageBulkExportJob.uploadKey = uploadKey;
+  pageBulkExportJob.uploadId = multipartUploader.uploadId;
+  await pageBulkExportJob.save();
+
+  const multipartUploadWritable = getMultipartUploadWritable.bind(this)(multipartUploader, pageBulkExportJob, attachment);
+
+  pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable, (err) => {
+    this.handlePipelineError(err, pageBulkExportJob);
+  });
+  pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
+  pageArchiver.finalize();
+}

+ 90 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -0,0 +1,90 @@
+import { createHash } from 'crypto';
+import { Writable, pipeline } from 'stream';
+
+import { getIdForRef, getIdStringForRef } from '@growi/core';
+
+import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import type { PageDocument } from '~/server/models/page';
+
+import type { IPageBulkExportJobCronService } from '..';
+import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
+import PageBulkExportJob from '../../../models/page-bulk-export-job';
+import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
+
+async function reuseDuplicateExportIfExists(pageBulkExportJob: PageBulkExportJobDocument) {
+  const duplicateExportJob = await PageBulkExportJob.findOne({
+    user: pageBulkExportJob.user,
+    page: pageBulkExportJob.page,
+    format: pageBulkExportJob.format,
+    status: PageBulkExportJobStatus.completed,
+    revisionListHash: pageBulkExportJob.revisionListHash,
+  });
+  if (duplicateExportJob != null) {
+    // if an upload with the exact same contents exists, re-use the same attachment of that upload
+    pageBulkExportJob.attachment = duplicateExportJob.attachment;
+    pageBulkExportJob.status = PageBulkExportJobStatus.completed;
+    await pageBulkExportJob.save();
+  }
+}
+
+/**
+ * Start a pipeline that creates a snapshot for each page that is to be exported in the pageBulkExportJob.
+ * 'revisionListHash' is calculated and saved to the pageBulkExportJob at the end of the pipeline.
+ */
+export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  // if the process of creating snapshots was interrupted, delete the snapshots and create from the start
+  await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });
+
+  const basePage = await this.pageModel.findById(getIdForRef(pageBulkExportJob.page));
+  if (basePage == null) {
+    throw new Error('Base page not found');
+  }
+
+  const revisionListHash = createHash('sha256');
+
+  // create a Readable for pages to be exported
+  const { PageQueryBuilder } = this.pageModel;
+  const builder = await new PageQueryBuilder(this.pageModel.find())
+    .addConditionToListWithDescendants(basePage.path)
+    .addViewerCondition(user);
+  const pagesReadable = builder
+    .query
+    .lean()
+    .cursor({ batchSize: this.pageBatchSize });
+
+  // create a Writable that creates a snapshot for each page
+  const pageSnapshotsWritable = new Writable({
+    objectMode: true,
+    write: async(page: PageDocument, encoding, callback) => {
+      try {
+        if (page.revision != null) {
+          revisionListHash.update(getIdStringForRef(page.revision));
+        }
+        await PageBulkExportPageSnapshot.create({
+          pageBulkExportJob,
+          path: page.path,
+          revision: page.revision,
+        });
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
+      callback();
+    },
+    final: async(callback) => {
+      pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
+      pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
+      await pageBulkExportJob.save();
+
+      await reuseDuplicateExportIfExists(pageBulkExportJob);
+      callback();
+    },
+  });
+
+  this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
+
+  pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
+    this.handlePipelineError(err, pageBulkExportJob);
+  });
+}

+ 73 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -0,0 +1,73 @@
+import fs from 'fs';
+import path from 'path';
+import { Writable, pipeline } from 'stream';
+
+import { isPopulated } from '@growi/core';
+import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
+
+import { PageBulkExportFormat, PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+
+import type { IPageBulkExportJobCronService } from '..';
+import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
+import type { PageBulkExportPageSnapshotDocument } from '../../../models/page-bulk-export-page-snapshot';
+import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
+
+/**
+ * Get a Writable that writes the page body temporarily to fs
+ */
+function getPageWritable(this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument): Writable {
+  const outputDir = this.getTmpOutputDir(pageBulkExportJob);
+  return new Writable({
+    objectMode: true,
+    write: async(page: PageBulkExportPageSnapshotDocument, encoding, callback) => {
+      try {
+        const revision = page.revision;
+
+        if (revision != null && isPopulated(revision)) {
+          const markdownBody = revision.body;
+          const pathNormalized = `${normalizePath(page.path)}.${PageBulkExportFormat.md}`;
+          const fileOutputPath = path.join(outputDir, pathNormalized);
+          const fileOutputParentPath = getParentPath(fileOutputPath);
+
+          await fs.promises.mkdir(fileOutputParentPath, { recursive: true });
+          await fs.promises.writeFile(fileOutputPath, markdownBody);
+          pageBulkExportJob.lastExportedPagePath = page.path;
+          await pageBulkExportJob.save();
+        }
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
+      callback();
+    },
+    final: async(callback) => {
+      pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
+      await pageBulkExportJob.save();
+      callback();
+    },
+  });
+}
+
+/**
+ * Export pages to the file system before compressing and uploading to the cloud storage.
+ * The export will resume from the last exported page if the process was interrupted.
+ */
+export function exportPagesToFsAsync(this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument): void {
+  const findQuery = pageBulkExportJob.lastExportedPagePath != null ? {
+    pageBulkExportJob,
+    path: { $gt: pageBulkExportJob.lastExportedPagePath },
+  } : { pageBulkExportJob };
+  const pageSnapshotsReadable = PageBulkExportPageSnapshot
+    .find(findQuery)
+    .populate('revision').sort({ path: 1 }).lean()
+    .cursor({ batchSize: this.pageBatchSize });
+
+  const pagesWritable = getPageWritable.bind(this)(pageBulkExportJob);
+
+  this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
+
+  pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
+    this.handlePipelineError(err, pageBulkExportJob);
+  });
+}

+ 7 - 1
apps/app/src/server/crowi/index.js

@@ -12,8 +12,11 @@ import pkg from '^/package.json';
 
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
-import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
+import instanciatePageBulkExportJobCleanUpCronService, {
+  pageBulkExportJobCleanUpCronService,
+} from '~/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron';
+import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import questionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import loggerFactory from '~/utils/logger';
@@ -323,6 +326,9 @@ Crowi.prototype.setupSocketIoService = async function() {
 Crowi.prototype.setupCron = function() {
   questionnaireCronService.startCron();
 
+  instanciatePageBulkExportJobCleanUpCronService(this);
+  pageBulkExportJobCleanUpCronService.startCron();
+
   instanciatePageBulkExportJobCronService(this);
   pageBulkExportJobCronService.startCron();
 

+ 6 - 0
apps/app/src/server/service/config-loader.ts

@@ -767,6 +767,12 @@ const ENV_VAR_NAME_TO_CONFIG_INFO: Record<string, EnvConfig> = {
     ns: 'crowi',
     key: 'app:pageBulkExportJobCronSchedule',
     type: ValueType.STRING,
+    default: '*/10 * * * * *', // every 10 seconds
+  },
+  BULK_EXPORT_JOB_CLEAN_UP_CRON_SCHEDULE: {
+    ns: 'crowi',
+    key: 'app:pageBulkExportJobCleanUpCronSchedule',
+    type: ValueType.STRING,
     default: '*/10 * * * *', // every 10 minutes
   },
   BULK_EXPORT_PARALLEL_EXEC_LIMIT: {