Futa Arai 1 год назад
Родитель
Commit
be6cb60769

+ 2 - 7
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -7,8 +7,7 @@ import type Crowi from '~/server/crowi';
 import type { ApiV3Response } from '~/server/routes/apiv3/interfaces/apiv3-response';
 import loggerFactory from '~/utils/logger';
 
-import { pageBulkExportService } from '../../service/page-bulk-export';
-import { DuplicateBulkExportJobError } from '../../service/page-bulk-export/errors';
+import { pageBulkExportService, DuplicateBulkExportJobError } from '../../service/page-bulk-export';
 
 const logger = loggerFactory('growi:routes:apiv3:page-bulk-export');
 
@@ -36,13 +35,9 @@ module.exports = (crowi: Crowi): Router => {
     }
 
     const { path, format, restartJob } = req.body;
-    const activityParameters = {
-      ip: req.ip,
-      endpoint: req.originalUrl,
-    };
 
     try {
-      await pageBulkExportService?.createAndExecuteOrRestartBulkExportJob(path, req.user, activityParameters, restartJob);
+      await pageBulkExportService?.createOrResetBulkExportJob(path, req.user, restartJob);
       return res.apiv3({}, 204);
     }
     catch (err) {

+ 82 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -0,0 +1,82 @@
+import {
+  type IPage, SubscriptionStatusType,
+} from '@growi/core';
+import type { HydratedDocument } from 'mongoose';
+import mongoose from 'mongoose';
+
+
+import { SupportedTargetModel } from '~/interfaces/activity';
+import type { PageModel } from '~/server/models/page';
+import Subscription from '~/server/models/subscription';
+import loggerFactory from '~/utils/logger';
+
+import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
+import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
+import PageBulkExportJob from '../models/page-bulk-export-job';
+
+const logger = loggerFactory('growi:services:PageBulkExportService');
+
+export class DuplicateBulkExportJobError extends Error {
+
+  duplicateJob: HydratedDocument<PageBulkExportJobDocument>;
+
+  constructor(duplicateJob: HydratedDocument<PageBulkExportJobDocument>) {
+    super('Duplicate bulk export job is in progress');
+    this.duplicateJob = duplicateJob;
+  }
+
+}
+
+export interface IPageBulkExportService {
+  createOrResetBulkExportJob: (basePagePath: string, currentUser, restartJob?: boolean) => Promise<void>;
+}
+
+class PageBulkExportService implements IPageBulkExportService {
+
+  // temporal path of local fs to output page files before upload
+  // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
+  tmpOutputRootDir = '/tmp/page-bulk-export';
+
+  /**
+   * Create a new page bulk export job or reset the existing one
+   */
+  async createOrResetBulkExportJob(basePagePath: string, currentUser, restartJob = false): Promise<void> {
+    const Page = mongoose.model<IPage, PageModel>('Page');
+    const basePage = await Page.findByPathAndViewer(basePagePath, currentUser, null, true);
+
+    if (basePage == null) {
+      throw new Error('Base page not found or not accessible');
+    }
+
+    const format = PageBulkExportFormat.md;
+    const duplicatePageBulkExportJobInProgress: HydratedDocument<PageBulkExportJobDocument> | null = await PageBulkExportJob.findOne({
+      user: currentUser,
+      page: basePage,
+      format,
+      $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
+    });
+    if (duplicatePageBulkExportJobInProgress != null) {
+      if (restartJob) {
+        this.resetBulkExportJob(duplicatePageBulkExportJobInProgress);
+        return;
+      }
+      throw new DuplicateBulkExportJobError(duplicatePageBulkExportJobInProgress);
+    }
+    const pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument> = await PageBulkExportJob.create({
+      user: currentUser, page: basePage, format, status: PageBulkExportJobStatus.initializing,
+    });
+
+    await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
+  }
+
+  /**
+   * Reset page bulk export job in progress
+   */
+  async resetBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>): Promise<void> {
+    pageBulkExportJob.restartFlag = true;
+    await pageBulkExportJob.save();
+  }
+
+}
+
+export const pageBulkExportService: PageBulkExportService = new PageBulkExportService(); // singleton instance

+ 0 - 30
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/errors.ts

@@ -1,30 +0,0 @@
-import type { HydratedDocument } from 'mongoose';
-
-import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
-
-export class DuplicateBulkExportJobError extends Error {
-
-  duplicateJob: HydratedDocument<PageBulkExportJobDocument>;
-
-  constructor(duplicateJob: HydratedDocument<PageBulkExportJobDocument>) {
-    super('Duplicate bulk export job is in progress');
-    this.duplicateJob = duplicateJob;
-  }
-
-}
-
-export class BulkExportJobExpiredError extends Error {
-
-  constructor() {
-    super('Bulk export job has expired');
-  }
-
-}
-
-export class BulkExportJobRestartedError extends Error {
-
-  constructor() {
-    super('Bulk export job has restarted');
-  }
-
-}

+ 0 - 462
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/index.ts

@@ -1,462 +0,0 @@
-import { createHash } from 'crypto';
-import fs from 'fs';
-import path from 'path';
-import { Writable } from 'stream';
-import { pipeline as pipelinePromise } from 'stream/promises';
-
-import type { IUser } from '@growi/core';
-import {
-  getIdForRef, getIdStringForRef, type IPage, isPopulated, SubscriptionStatusType,
-} from '@growi/core';
-import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
-import type { Archiver } from 'archiver';
-import archiver from 'archiver';
-import gc from 'expose-gc/function';
-import type { HydratedDocument } from 'mongoose';
-import mongoose from 'mongoose';
-
-import type { SupportedActionType } from '~/interfaces/activity';
-import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
-import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
-import type { ActivityDocument } from '~/server/models/activity';
-import type { IAttachmentDocument } from '~/server/models/attachment';
-import { Attachment } from '~/server/models/attachment';
-import type { PageModel, PageDocument } from '~/server/models/page';
-import Subscription from '~/server/models/subscription';
-import type { FileUploader } from '~/server/service/file-uploader';
-import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
-import { preNotifyService } from '~/server/service/pre-notify';
-import { getBufferToFixedSizeTransform } from '~/server/util/stream';
-import loggerFactory from '~/utils/logger';
-
-import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../../interfaces/page-bulk-export';
-import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
-import PageBulkExportJob from '../../models/page-bulk-export-job';
-import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-export-page-snapshot';
-import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
-
-import { BulkExportJobExpiredError, BulkExportJobRestartedError, DuplicateBulkExportJobError } from './errors';
-import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
-
-
-const logger = loggerFactory('growi:services:PageBulkExportService');
-
-export type ActivityParameters ={
-  ip?: string;
-  endpoint: string;
-}
-
-export interface IPageBulkExportService {
-  executePageBulkExportJob: (pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters) => Promise<void>
-}
-
-class PageBulkExportService implements IPageBulkExportService {
-
-  crowi: any;
-
-  activityEvent: any;
-
-  // multipart upload max part size
-  maxPartSize = 5 * 1024 * 1024; // 5MB
-
-  pageBatchSize = 100;
-
-  compressExtension = 'tar.gz';
-
-  pageBulkExportJobManager: PageBulkExportJobManager;
-
-  // temporal path of local fs to output page files before upload
-  // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
-  tmpOutputRootDir = '/tmp/page-bulk-export';
-
-  pageModel: PageModel;
-
-  constructor(crowi) {
-    this.crowi = crowi;
-    this.activityEvent = crowi.event('activity');
-    this.pageModel = mongoose.model<IPage, PageModel>('Page');
-    this.pageBulkExportJobManager = new PageBulkExportJobManager(this);
-  }
-
-  /**
-   * Create a new page bulk export job and execute it
-   */
-  async createAndExecuteOrRestartBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters, restartJob = false): Promise<void> {
-    const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
-
-    if (basePage == null) {
-      throw new Error('Base page not found or not accessible');
-    }
-
-    const format = PageBulkExportFormat.md;
-    const duplicatePageBulkExportJobInProgress: HydratedDocument<PageBulkExportJobDocument> | null = await PageBulkExportJob.findOne({
-      user: currentUser,
-      page: basePage,
-      format,
-      $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
-    });
-    if (duplicatePageBulkExportJobInProgress != null) {
-      if (restartJob) {
-        this.restartBulkExportJob(duplicatePageBulkExportJobInProgress, activityParameters);
-        return;
-      }
-      throw new DuplicateBulkExportJobError(duplicatePageBulkExportJobInProgress);
-    }
-    const pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument> = await PageBulkExportJob.create({
-      user: currentUser, page: basePage, format, status: PageBulkExportJobStatus.initializing,
-    });
-
-    await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
-
-    this.pageBulkExportJobManager.addJob(pageBulkExportJob, activityParameters);
-  }
-
-  /**
-   * Restart page bulk export job in progress from the beginning
-   */
-  async restartBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters: ActivityParameters): Promise<void> {
-    await this.cleanUpExportJobResources(pageBulkExportJob, true);
-
-    pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
-    await pageBulkExportJob.save();
-    this.pageBulkExportJobManager.addJob(pageBulkExportJob, activityParameters);
-  }
-
-  /**
-   * Execute a page bulk export job. This method can also resume a previously inturrupted job.
-   */
-  async executePageBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): Promise<void> {
-    try {
-      const User = mongoose.model<IUser>('User');
-      const user = await User.findById(getIdForRef(pageBulkExportJob.user));
-
-      if (pageBulkExportJob.status === PageBulkExportJobStatus.initializing) {
-        await this.createPageSnapshots(user, pageBulkExportJob);
-
-        const duplicateExportJob = await PageBulkExportJob.findOne({
-          user: pageBulkExportJob.user,
-          page: pageBulkExportJob.page,
-          format: pageBulkExportJob.format,
-          status: PageBulkExportJobStatus.completed,
-          revisionListHash: pageBulkExportJob.revisionListHash,
-        });
-        if (duplicateExportJob != null) {
-          // if an upload with the exact same contents exists, re-use the same attachment of that upload
-          pageBulkExportJob.attachment = duplicateExportJob.attachment;
-          pageBulkExportJob.status = PageBulkExportJobStatus.completed;
-        }
-        else {
-          pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
-        }
-        await pageBulkExportJob.save();
-      }
-      if (pageBulkExportJob.status === PageBulkExportJobStatus.exporting) {
-        await this.exportPagesToFS(pageBulkExportJob);
-        pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
-        await pageBulkExportJob.save();
-      }
-      if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) {
-        await this.compressAndUpload(user, pageBulkExportJob);
-      }
-    }
-    catch (err) {
-      if (err instanceof BulkExportJobExpiredError) {
-        logger.error(err);
-        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob, activityParameters);
-      }
-      else if (err instanceof BulkExportJobRestartedError) {
-        logger.info(err.message);
-        await this.cleanUpExportJobResources(pageBulkExportJob);
-      }
-      else {
-        logger.error(err);
-        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob, activityParameters);
-      }
-      return;
-    }
-
-    await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob, activityParameters);
-  }
-
-  /**
-   * Notify the user of the export result, and cleanup the resources used in the export process
-   * @param action whether the export was successful
-   * @param pageBulkExportJob the page bulk export job
-   * @param activityParameters parameters to record user activity
-   */
-  private async notifyExportResultAndCleanUp(
-      action: SupportedActionType,
-      pageBulkExportJob: PageBulkExportJobDocument,
-      activityParameters?: ActivityParameters,
-  ): Promise<void> {
-    pageBulkExportJob.status = action === SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED
-      ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
-
-    try {
-      await pageBulkExportJob.save();
-      await this.notifyExportResult(pageBulkExportJob, action, activityParameters);
-    }
-    catch (err) {
-      logger.error(err);
-    }
-    // execute independently of notif process resolve/reject
-    await this.cleanUpExportJobResources(pageBulkExportJob);
-  }
-
-  /**
-   * Create a snapshot for each page that is to be exported in the pageBulkExportJob.
-   * Also calulate revisionListHash and save it to the pageBulkExportJob.
-   */
-  private async createPageSnapshots(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-    // if the process of creating snapshots was interrupted, delete the snapshots and create from the start
-    await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });
-
-    const basePage = await this.pageModel.findById(getIdForRef(pageBulkExportJob.page));
-    if (basePage == null) {
-      throw new Error('Base page not found');
-    }
-
-    const revisionListHash = createHash('sha256');
-
-    // create a Readable for pages to be exported
-    const { PageQueryBuilder } = this.pageModel;
-    const builder = await new PageQueryBuilder(this.pageModel.find())
-      .addConditionToListWithDescendants(basePage.path)
-      .addViewerCondition(user);
-    const pagesReadable = builder
-      .query
-      .lean()
-      .cursor({ batchSize: this.pageBatchSize });
-
-    // create a Writable that creates a snapshot for each page
-    const pageSnapshotsWritable = new Writable({
-      objectMode: true,
-      write: async(page: PageDocument, encoding, callback) => {
-        try {
-          if (page.revision != null) {
-            revisionListHash.update(getIdStringForRef(page.revision));
-          }
-          await PageBulkExportPageSnapshot.create({
-            pageBulkExportJob,
-            path: page.path,
-            revision: page.revision,
-          });
-        }
-        catch (err) {
-          callback(err);
-          return;
-        }
-        callback();
-      },
-    });
-
-    this.pageBulkExportJobManager.updateJobStream(pageBulkExportJob._id, pagesReadable);
-
-    await pipelinePromise(pagesReadable, pageSnapshotsWritable);
-
-    pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
-    await pageBulkExportJob.save();
-  }
-
-  /**
-   * Export pages to the file system before compressing and uploading to the cloud storage.
-   * The export will resume from the last exported page if the process was interrupted.
-   */
-  private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-    const findQuery = pageBulkExportJob.lastExportedPagePath != null ? {
-      pageBulkExportJob,
-      path: { $gt: pageBulkExportJob.lastExportedPagePath },
-    } : { pageBulkExportJob };
-    const pageSnapshotsReadable = PageBulkExportPageSnapshot
-      .find(findQuery)
-      .populate('revision').sort({ path: 1 }).lean()
-      .cursor({ batchSize: this.pageBatchSize });
-
-    const pagesWritable = this.getPageWritable(pageBulkExportJob);
-
-    this.pageBulkExportJobManager.updateJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
-
-    return pipelinePromise(pageSnapshotsReadable, pagesWritable);
-  }
-
-  /**
-   * Get a Writable that writes the page body temporarily to fs
-   */
-  private getPageWritable(pageBulkExportJob: PageBulkExportJobDocument): Writable {
-    const outputDir = this.getTmpOutputDir(pageBulkExportJob);
-    return new Writable({
-      objectMode: true,
-      write: async(page: PageBulkExportPageSnapshotDocument, encoding, callback) => {
-        try {
-          const revision = page.revision;
-
-          if (revision != null && isPopulated(revision)) {
-            const markdownBody = revision.body;
-            const pathNormalized = `${normalizePath(page.path)}.${PageBulkExportFormat.md}`;
-            const fileOutputPath = path.join(outputDir, pathNormalized);
-            const fileOutputParentPath = getParentPath(fileOutputPath);
-
-            await fs.promises.mkdir(fileOutputParentPath, { recursive: true });
-            await fs.promises.writeFile(fileOutputPath, markdownBody);
-            pageBulkExportJob.lastExportedPagePath = page.path;
-            await pageBulkExportJob.save();
-          }
-        }
-        catch (err) {
-          callback(err);
-          return;
-        }
-        callback();
-      },
-    });
-  }
-
-  /**
-   * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
-   */
-  private async compressAndUpload(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-    const pageArchiver = this.setUpPageArchiver();
-    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
-
-    if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
-    const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
-    const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
-    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
-
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    // if the process of uploading was interrupted, delete and start from the start
-    if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
-      await fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId);
-    }
-
-    // init multipart upload
-    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
-    await multipartUploader.initUpload();
-    pageBulkExportJob.uploadKey = uploadKey;
-    pageBulkExportJob.uploadId = multipartUploader.uploadId;
-    await pageBulkExportJob.save();
-
-    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment);
-
-    const compressAndUploadPromise = pipelinePromise(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable);
-    pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
-    pageArchiver.finalize();
-
-    await compressAndUploadPromise;
-  }
-
-  private setUpPageArchiver(): Archiver {
-    const pageArchiver = archiver('tar', {
-      gzip: true,
-    });
-
-    // good practice to catch warnings (ie stat failures and other non-blocking errors)
-    pageArchiver.on('warning', (err) => {
-      if (err.code === 'ENOENT') logger.error(err);
-      else throw err;
-    });
-
-    return pageArchiver;
-  }
-
-  private getMultipartUploadWritable(
-      multipartUploader: IMultipartUploader,
-      pageBulkExportJob: PageBulkExportJobDocument,
-      attachment: IAttachmentDocument,
-  ): Writable {
-    let partNumber = 1;
-
-    return new Writable({
-      write: async(part: Buffer, encoding, callback) => {
-        try {
-          await multipartUploader.uploadPart(part, partNumber);
-          partNumber += 1;
-          // First aid to prevent unexplained memory leaks
-          logger.info('global.gc() invoked.');
-          gc();
-        }
-        catch (err) {
-          await multipartUploader.abortUpload();
-          callback(err);
-          return;
-        }
-        callback();
-      },
-      final: async(callback) => {
-        try {
-          await multipartUploader.completeUpload();
-
-          const fileSize = await multipartUploader.getUploadedFileSize();
-          attachment.fileSize = fileSize;
-          await attachment.save();
-
-          pageBulkExportJob.completedAt = new Date();
-          pageBulkExportJob.attachment = attachment._id;
-          await pageBulkExportJob.save();
-        }
-        catch (err) {
-          callback(err);
-          return;
-        }
-        callback();
-      },
-    });
-  }
-
-  /**
-   * Get the output directory on the fs to temporarily store page files before compressing and uploading
-   */
-  private getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string {
-    return `${this.tmpOutputRootDir}/${pageBulkExportJob._id}`;
-  }
-
-  async notifyExportResult(
-      pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType, activityParameters?: ActivityParameters,
-  ) {
-    const activity = await this.crowi.activityService.createActivity({
-      ...activityParameters,
-      action,
-      targetModel: SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB,
-      target: pageBulkExportJob,
-      user: pageBulkExportJob.user,
-      snapshot: {
-        username: isPopulated(pageBulkExportJob.user) ? pageBulkExportJob.user.username : '',
-      },
-    });
-    const getAdditionalTargetUsers = async(activity: ActivityDocument) => [activity.user];
-    const preNotify = preNotifyService.generatePreNotify(activity, getAdditionalTargetUsers);
-    this.activityEvent.emit('updated', activity, pageBulkExportJob, preNotify);
-  }
-
-  /**
-   * Do the following in parallel:
-   * - delete page snapshots
-   * - remove the temporal output directory
-   * - abort multipart upload
-   */
-  async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
-    this.pageBulkExportJobManager.removeJobInProgressAndQueueNextJob(pageBulkExportJob._id, restarted);
-
-    const promises = [
-      PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
-      fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
-    ];
-
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
-      promises.push(fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId));
-    }
-
-    const results = await Promise.allSettled(promises);
-    results.forEach((result) => {
-      if (result.status === 'rejected') logger.error(result.reason);
-    });
-  }
-
-}
-
-// eslint-disable-next-line import/no-mutable-exports
-export let pageBulkExportService: PageBulkExportService | undefined; // singleton instance
-export default function instanciate(crowi): void {
-  pageBulkExportService = new PageBulkExportService(crowi);
-}

+ 0 - 231
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.spec.ts

@@ -1,231 +0,0 @@
-import { Readable } from 'stream';
-import { finished } from 'stream/promises';
-
-import type { HydratedDocument } from 'mongoose';
-
-import { configManager } from '~/server/service/config-manager';
-
-import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
-
-import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
-import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
-
-describe('PageBulkExportJobManager', () => {
-  let pageBulkExportServiceMock;
-  let jobManager: PageBulkExportJobManager;
-
-  beforeAll(() => {
-    vi.spyOn(configManager, 'getConfig').mockImplementation((namespace, key) => {
-      if (namespace === 'crowi' && key === 'app:pageBulkExportParallelExecLimit') {
-        return 3;
-      }
-      return undefined; // or whatever the default return value should be
-    });
-  });
-
-  beforeEach(() => {
-    pageBulkExportServiceMock = {
-      executePageBulkExportJob: vi.fn(),
-    };
-    jobManager = new PageBulkExportJobManager(pageBulkExportServiceMock);
-  });
-
-  describe('canExecuteNextJob', () => {
-    it('should return true if jobs in progress are less than the limit', () => {
-      // act, assert
-      expect(jobManager.canExecuteNextJob()).toBe(true);
-    });
-
-    it('should return false if jobs in progress exceed the limit', () => {
-      // arrange
-      jobManager.jobsInProgress = {
-        job1: { stream: undefined },
-        job2: { stream: undefined },
-        job3: { stream: undefined },
-      };
-
-      // act, assert
-      expect(jobManager.canExecuteNextJob()).toBe(false);
-    });
-  });
-
-  describe('getJobInProgress', () => {
-    it('should return the info of job in progress', () => {
-      // arrange
-      const jobId = 'job1';
-      jobManager.jobsInProgress[jobId] = { stream: undefined };
-
-      // act, assert
-      expect(jobManager.getJobInProgress(jobId)).toEqual({ stream: undefined });
-    });
-
-    it('should return undefined if job is not in progress', () => {
-      // arrange
-      const jobId = 'job1';
-
-      // act, assert
-      expect(jobManager.getJobInProgress(jobId)).toBeUndefined();
-    });
-  });
-
-  describe('addJob', () => {
-    it('should add the job to jobsInProgress if under the parallelExecLimit', () => {
-      // arrange
-      const job = { _id: 'job1' } as HydratedDocument<PageBulkExportJobDocument>;
-      expect(jobManager.jobQueue.length).toBe(0);
-
-      // act
-      jobManager.addJob(job, { endpoint: '/test/endpoint' });
-
-      // assert
-      expect(jobManager.jobQueue.length).toBe(0);
-      expect(jobManager.jobsInProgress[job._id.toString()]).toEqual({ stream: undefined });
-      expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(job, { endpoint: '/test/endpoint' });
-    });
-
-    it('should queue the job if the parallelExecLimit is reached', () => {
-      // arrange
-      jobManager.jobsInProgress = {
-        job1: { stream: undefined },
-        job2: { stream: undefined },
-        job3: { stream: undefined },
-      };
-      const job = { _id: 'job2' } as HydratedDocument<PageBulkExportJobDocument>;
-      expect(jobManager.jobQueue.length).toBe(0);
-
-      // act
-      jobManager.addJob(job);
-
-      // assert
-      expect(jobManager.jobQueue.length).toBe(1);
-      expect(jobManager.jobQueue[0]).toEqual({ job });
-      expect(pageBulkExportServiceMock.executePageBulkExportJob).not.toHaveBeenCalled();
-    });
-  });
-
-  describe('updateJobStream', () => {
-    it('should set a new stream when there are no streams executing for the job', () => {
-      // arrange
-      const jobId = 'job1';
-      const mockStream = new Readable();
-      jobManager.jobsInProgress[jobId] = { stream: undefined };
-
-      // act
-      jobManager.updateJobStream(jobId, mockStream);
-
-      // assert
-      expect(jobManager.jobsInProgress[jobId].stream).toBe(mockStream);
-    });
-
-    it('should set a new stream when previous stream is finished', async() => {
-      // arrange
-      const jobId = 'job1';
-      const oldStream = new Readable({
-        read(size) {
-          // End the stream immediately
-          this.push(null);
-        },
-      });
-      oldStream.read();
-      await finished(oldStream);
-      const newStream = vi.fn().mockImplementation(() => {
-        const stream = new Readable();
-        stream.destroy = vi.fn();
-        return stream;
-      })() as unknown as Readable;
-      jobManager.addJob({ _id: jobId } as HydratedDocument<PageBulkExportJobDocument>);
-
-      // act
-      jobManager.updateJobStream(jobId, oldStream);
-
-      // assert
-      expect(oldStream.readableEnded).toBe(true);
-      jobManager.updateJobStream(jobId, newStream);
-      expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream);
-    });
-
-    it('should destroy non-finished stream with an error before setting a new stream', () => {
-      // arrange
-      const jobId = 'job1';
-      const oldStream = vi.fn().mockImplementation(() => {
-        const stream = new Readable();
-        stream.destroy = vi.fn();
-        return stream;
-      })();
-      const newStream = new Readable();
-      const destroySpy = vi.spyOn(oldStream, 'destroy');
-      jobManager.addJob({ _id: jobId } as HydratedDocument<PageBulkExportJobDocument>);
-      jobManager.updateJobStream(jobId, oldStream);
-
-      // act
-      jobManager.updateJobStream(jobId, newStream);
-      expect(destroySpy).toHaveBeenCalledWith(expect.any(Error));
-
-      // assert
-      expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream);
-    });
-
-    it('should destroy the new stream with BulkExportJobExpiredError if job is not in progress', () => {
-      // arrange
-      const jobId = 'job1';
-      const newStream = vi.fn().mockImplementation(() => {
-        const stream = new Readable();
-        stream.destroy = vi.fn();
-        return stream;
-      })();
-      const destroySpy = vi.spyOn(newStream, 'destroy');
-
-      // act
-      jobManager.updateJobStream(jobId, newStream);
-
-      // assert
-      expect(destroySpy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
-    });
-  });
-
-  describe('removeJobInProgressAndQueueNextJob', () => {
-    it('should remove the job in progress and queue the next job', () => {
-      // arrange
-      const jobId = 'job1';
-      const mockStream = vi.fn().mockImplementation(() => {
-        const stream = new Readable();
-        stream.destroy = vi.fn();
-        return stream;
-      })();
-      vi.spyOn(mockStream, 'destroy');
-      const nextJob = { _id: 'job2' } as HydratedDocument<PageBulkExportJobDocument>;
-      jobManager.jobsInProgress[jobId] = { stream: mockStream };
-      jobManager.jobQueue.push({ job: nextJob });
-      expect(jobManager.jobQueue.length).toBe(1);
-
-      // act
-      jobManager.removeJobInProgressAndQueueNextJob(jobId);
-
-      // assert
-      expect(jobManager.jobQueue.length).toBe(0);
-      expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
-      expect(jobManager.jobsInProgress[jobId]).toBeUndefined();
-      expect(jobManager.jobsInProgress[nextJob._id.toString()]).toEqual({ stream: undefined });
-      expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(nextJob, undefined);
-    });
-
-    it('should destroy the stream with a BulkExportJobRestartedError if job was restarted', () => {
-      // arrange
-      const jobId = 'job1';
-      const mockStream = vi.fn().mockImplementation(() => {
-        const stream = new Readable();
-        stream.destroy = vi.fn();
-        return stream;
-      })();
-      vi.spyOn(mockStream, 'destroy');
-      jobManager.jobsInProgress[jobId] = { stream: mockStream };
-
-      // act
-      jobManager.removeJobInProgressAndQueueNextJob(jobId, true);
-
-      // assert
-      expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobRestartedError));
-      expect(jobManager.jobsInProgress[jobId]).toBeUndefined();
-    });
-  });
-});

+ 0 - 125
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts

@@ -1,125 +0,0 @@
-import type { Readable } from 'stream';
-
-import type { HydratedDocument } from 'mongoose';
-
-import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
-import { configManager } from '~/server/service/config-manager';
-
-import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
-
-import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
-
-import type { ActivityParameters, IPageBulkExportService } from '.';
-
-/**
- * Manage PageBulkExportJob execution.
- * - Keep track of jobs being executed and enable destroying the stream if the job is terminated
- * - Limit the number of jobs being executed in parallel
- * - Queue jobs to be executed in order
- */
-export class PageBulkExportJobManager {
-
-  pageBulkExportService: IPageBulkExportService;
-
-  private parallelExecLimit: number;
-
-  // contains jobs being executed and it's information
-  // the key is the _id of PageBulkExportJob and the value contains the stream of the job
-  jobsInProgress: {
-    [key: string]: { stream: Readable | undefined };
-  } = {};
-
-  // jobs waiting to be executed in order
-  jobQueue: { job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters }[] = [];
-
-  constructor(pageBulkExportService: IPageBulkExportService) {
-    this.pageBulkExportService = pageBulkExportService;
-    this.parallelExecLimit = configManager.getConfig('crowi', 'app:pageBulkExportParallelExecLimit');
-  }
-
-  canExecuteNextJob(): boolean {
-    return Object.keys(this.jobsInProgress).length < this.parallelExecLimit;
-  }
-
-  /**
-   * Get the information of a job in progress.
-   * A getter method that includes "undefined" in the return type
-   */
-  getJobInProgress(jobId: ObjectIdLike): { stream: Readable | undefined } | undefined {
-    return this.jobsInProgress[jobId.toString()];
-  }
-
-  /**
-   * Add a job to the queue or execute it if the number of jobs in progress is less than the limit
-   * @param job job to add or execute
-   * @param activityParameters parameters to record user activity
-   */
-  addJob(job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): void {
-    if (this.canExecuteNextJob()) {
-      this.jobsInProgress[job._id.toString()] = { stream: undefined };
-      this.pageBulkExportService.executePageBulkExportJob(job, activityParameters);
-    }
-    else {
-      this.jobQueue.push({ job, activityParameters });
-    }
-  }
-
-  /**
-   * Update the info of which stream is being executed for a job
-   * @param jobId id of job to update
-   * @param stream the new stream being executed for the job
-   */
-  updateJobStream(jobId: ObjectIdLike, stream: Readable): void {
-    const jobInProgress = this.getJobInProgress(jobId);
-    if (jobInProgress != null) {
-      if (jobInProgress.stream != null && !jobInProgress.stream.readableEnded) {
-        jobInProgress.stream.destroy(new Error('Stream not finished before next stream started'));
-      }
-      jobInProgress.stream = stream;
-    }
-    else {
-      // job was terminated beforehand, so destroy the stream
-      stream.destroy(new BulkExportJobExpiredError());
-    }
-  }
-
-  /**
-   * Remove a job in execution and queue the next job if there are any
-   * @param jobId id of job to remove
-   * @param isJobRestarted whether or not the job was restarted
-   */
-  removeJobInProgressAndQueueNextJob(jobId: ObjectIdLike, isJobRestarted = false): void {
-    this.removeJobInProgress(jobId, isJobRestarted);
-
-    if (this.jobQueue.length > 0) {
-      while (this.canExecuteNextJob() && this.jobQueue.length > 0) {
-        const nextJob = this.jobQueue.shift();
-        if (nextJob != null) {
-          this.jobsInProgress[nextJob.job._id.toString()] = { stream: undefined };
-          this.pageBulkExportService.executePageBulkExportJob(nextJob.job, nextJob.activityParameters);
-        }
-      }
-    }
-  }
-
-  /**
-   * Remove a job in execution and destroy it's stream process
-   * @param jobId id of job to remove
-   * @param isJobRestarted whether or not the job was restarted
-   */
-  private removeJobInProgress(jobId: ObjectIdLike, isJobRestarted = false): void {
-    const jobInProgress = this.getJobInProgress(jobId);
-    if (jobInProgress == null) return;
-
-    if (jobInProgress.stream != null) {
-      if (isJobRestarted) {
-        jobInProgress.stream.destroy(new BulkExportJobRestartedError());
-      }
-      else {
-        jobInProgress.stream.destroy(new BulkExportJobExpiredError());
-      }
-    }
-    delete this.jobsInProgress[jobId.toString()];
-  }
-
-}

+ 0 - 17
apps/app/src/server/crowi/index.js

@@ -12,9 +12,6 @@ import pkg from '^/package.json';
 
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
-import { PageBulkExportJobInProgressStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
-import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
-import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
 import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
@@ -173,7 +170,6 @@ Crowi.prototype.init = async function() {
     this.setupUserGroupService(),
     this.setupExport(),
     this.setupImport(),
-    this.setupPageBulkExportService(),
     this.setupGrowiPluginService(),
     this.setupPageService(),
     this.setupInAppNotificationService(),
@@ -194,8 +190,6 @@ Crowi.prototype.init = async function() {
   ]);
 
   await normalizeData();
-
-  this.resumeIncompletePageBulkExportJobs();
 };
 
 /**
@@ -683,10 +677,6 @@ Crowi.prototype.setupExport = async function() {
   instanciateExportService(this);
 };
 
-Crowi.prototype.setupPageBulkExportService = async function() {
-  instanciatePageBulkExportService(this);
-};
-
 Crowi.prototype.setupImport = async function() {
   initializeImportService(this);
 };
@@ -779,11 +769,4 @@ Crowi.prototype.setupExternalUserGroupSyncService = function() {
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
 };
 
-Crowi.prototype.resumeIncompletePageBulkExportJobs = async function() {
-  const jobs = await PageBulkExportJob.find({
-    $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
-  });
-  jobs.forEach(job => pageBulkExportService?.pageBulkExportJobManager?.addJob(job));
-};
-
 export default Crowi;