Просмотр исходного кода

Merge pull request #8997 from weseek/feat/78039-151411-cancel-streams-on-export-job-abort

Feat/78039 151411 cancel streams on export job abort
Futa Arai 1 год назад
Родитель
Сommit
1c39b0012a

+ 2 - 1
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -7,7 +7,8 @@ import type Crowi from '~/server/crowi';
 import type { ApiV3Response } from '~/server/routes/apiv3/interfaces/apiv3-response';
 import loggerFactory from '~/utils/logger';
 
-import { DuplicateBulkExportJobError, pageBulkExportService } from '../../service/page-bulk-export';
+import { pageBulkExportService } from '../../service/page-bulk-export';
+import { DuplicateBulkExportJobError } from '../../service/page-bulk-export/errors';
 
 const logger = loggerFactory('growi:routes:apiv3:page-bulk-export');
 

+ 3 - 6
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.ts

@@ -45,12 +45,9 @@ class PageBulkExportJobCronService extends CronService {
       createdAt: { $lt: new Date(Date.now() - exportJobExpirationSeconds * 1000) },
     });
 
-    const cleanup = async(job: PageBulkExportJobDocument) => {
-      await pageBulkExportService?.cleanUpExportJobResources(job);
-      await pageBulkExportService?.notifyExportResult(job, SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED);
-    };
-
-    await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, cleanup);
+    if (pageBulkExportService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, pageBulkExportService?.cleanUpExportJobResources);
+    }
   }
 
   /**

+ 15 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/errors.ts

@@ -0,0 +1,15 @@
+export class DuplicateBulkExportJobError extends Error {
+
+  constructor() {
+    super('Duplicate bulk export job is in progress');
+  }
+
+}
+
+export class BulkExportJobExpiredError extends Error {
+
+  constructor() {
+    super('Bulk export job has expired');
+  }
+
+}

+ 29 - 21
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts → apps/app/src/features/page-bulk-export/server/service/page-bulk-export/index.ts

@@ -16,9 +16,9 @@ import mongoose from 'mongoose';
 import type { SupportedActionType } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
 import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
-import type { IAttachmentDocument } from '~/server/models';
-import { Attachment } from '~/server/models';
 import type { ActivityDocument } from '~/server/models/activity';
+import type { IAttachmentDocument } from '~/server/models/attachment';
+import { Attachment } from '~/server/models/attachment';
 import type { PageModel, PageDocument } from '~/server/models/page';
 import Subscription from '~/server/models/subscription';
 import type { FileUploader } from '~/server/service/file-uploader';
@@ -27,11 +27,14 @@ import { preNotifyService } from '~/server/service/pre-notify';
 import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
 
-import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
-import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
-import PageBulkExportJob from '../models/page-bulk-export-job';
-import type { PageBulkExportPageSnapshotDocument } from '../models/page-bulk-export-page-snapshot';
-import PageBulkExportPageSnapshot from '../models/page-bulk-export-page-snapshot';
+import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../../interfaces/page-bulk-export';
+import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
+import PageBulkExportJob from '../../models/page-bulk-export-job';
+import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-export-page-snapshot';
+import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
+
+import { BulkExportJobExpiredError, DuplicateBulkExportJobError } from './errors';
+import { PageBulkExportJobStreamManager } from './page-bulk-export-job-stream-manager';
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
@@ -41,14 +44,6 @@ type ActivityParameters ={
   endpoint: string;
 }
 
-export class DuplicateBulkExportJobError extends Error {
-
-  constructor() {
-    super('Duplicate bulk export job is in progress');
-  }
-
-}
-
 class PageBulkExportService {
 
   crowi: any;
@@ -62,6 +57,8 @@ class PageBulkExportService {
 
   compressExtension = 'tar.gz';
 
+  pageBulkExportJobStreamManager: PageBulkExportJobStreamManager = new PageBulkExportJobStreamManager();
+
   // temporal path of local fs to output page files before upload
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
   tmpOutputRootDir = '/tmp/page-bulk-export';
@@ -127,26 +124,31 @@ class PageBulkExportService {
     }
     catch (err) {
       logger.error(err);
-      await this.notifyExportResultAndCleanUp(false, pageBulkExportJob, activityParameters);
+      if (err instanceof BulkExportJobExpiredError) {
+        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob, activityParameters);
+      }
+      else {
+        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob, activityParameters);
+      }
       return;
     }
 
-    await this.notifyExportResultAndCleanUp(true, pageBulkExportJob, activityParameters);
+    await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob, activityParameters);
   }
 
   /**
    * Notify the user of the export result, and cleanup the resources used in the export process
-   * @param succeeded whether the export was successful
+   * @param action whether the export was successful
    * @param pageBulkExportJob the page bulk export job
    * @param activityParameters parameters to record user activity
    */
   private async notifyExportResultAndCleanUp(
-      succeeded: boolean,
+      action: SupportedActionType,
       pageBulkExportJob: PageBulkExportJobDocument,
       activityParameters?: ActivityParameters,
   ): Promise<void> {
-    const action = succeeded ? SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED : SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED;
-    pageBulkExportJob.status = succeeded ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
+    pageBulkExportJob.status = action === SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED
+      ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
 
     try {
       await pageBulkExportJob.save();
@@ -200,6 +202,8 @@ class PageBulkExportService {
       },
     });
 
+    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pagesReadable);
+
     await pipelinePromise(pagesReadable, pageSnapshotsWritable);
   }
 
@@ -219,6 +223,8 @@ class PageBulkExportService {
 
     const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
+    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
+
     return pipelinePromise(pageSnapshotsReadable, pagesWritable);
   }
 
@@ -377,6 +383,8 @@ class PageBulkExportService {
    * - abort multipart upload
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument) {
+    this.pageBulkExportJobStreamManager.destroyJobStream(pageBulkExportJob._id);
+
     const promises = [
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),

+ 33 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-stream-manager.ts

@@ -0,0 +1,33 @@
+import type { Readable } from 'stream';
+
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
+
+import { BulkExportJobExpiredError } from './errors';
+
+/**
+ * Used to keep track of streams currently being executed, and enable destroying them
+ */
+export class PageBulkExportJobStreamManager {
+
+  private jobStreams: Record<string, Readable> = {};
+
+  addJobStream(jobId: ObjectIdLike, stream: Readable): void {
+    if (this.jobStreams[jobId.toString()] != null) {
+      this.destroyJobStream(jobId);
+    }
+    this.jobStreams[jobId.toString()] = stream;
+  }
+
+  removeJobStream(jobId: ObjectIdLike): void {
+    delete this.jobStreams[jobId.toString()];
+  }
+
+  destroyJobStream(jobId: ObjectIdLike): void {
+    const stream = this.jobStreams[jobId.toString()];
+    if (stream != null) {
+      stream.destroy(new BulkExportJobExpiredError());
+    }
+    this.removeJobStream(jobId);
+  }
+
+}

+ 1 - 1
apps/app/src/server/crowi/index.js

@@ -12,7 +12,7 @@ import pkg from '^/package.json';
 
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
-import { PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import { PageBulkExportJobInProgressStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
 import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
 import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
 import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';