Просмотр исходного кода

cancel streams on export job abort

Futa Arai 1 год назад
Родитель
Сommit
547b2d3fa7

+ 1 - 7
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.ts

@@ -33,13 +33,7 @@ class PageBulkExportJobCronService extends CronService {
       createdAt: { $lt: new Date(Date.now() - exportJobExpirationSeconds * 1000) },
     });
     for (const expiredExportJob of expiredExportJobs) {
-      try {
-        // eslint-disable-next-line no-await-in-loop
-        await pageBulkExportService?.notifyExportResult(expiredExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED);
-      }
-      catch (err) {
-        logger.error(err);
-      }
+      pageBulkExportService?.pageBulkExportJobStreamManager?.destroyJobStream(expiredExportJob._id);
       // eslint-disable-next-line no-await-in-loop
       await this.cleanUpAndDeleteBulkExportJob(expiredExportJob);
     }

+ 54 - 7
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -1,9 +1,9 @@
 import fs from 'fs';
 import path from 'path';
+import type { Readable } from 'stream';
 import { Writable } from 'stream';
 import { pipeline as pipelinePromise } from 'stream/promises';
 
-
 import type { HasObjectId } from '@growi/core';
 import {
   getIdForRef, type IPage, isPopulated, SubscriptionStatusType,
@@ -17,6 +17,7 @@ import mongoose from 'mongoose';
 import type { SupportedActionType } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
 import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
 import type { IAttachmentDocument } from '~/server/models';
 import { Attachment } from '~/server/models';
 import type { ActivityDocument } from '~/server/models/activity';
@@ -45,6 +46,39 @@ export class DuplicateBulkExportJobError extends Error {
 
 }
 
+class BulkExportJobExpiredError extends Error {
+
+  constructor() {
+    super('Bulk export job has expired');
+  }
+
+}
+
+/**
+ * Used to keep track of streams currently being executed, and enable destroying them
+ */
+class PageBulkExportJobStreamManager {
+
+  private jobStreams: Record<string, Readable> = {};
+
+  addJobStream(jobId: ObjectIdLike, stream: Readable) {
+    this.jobStreams[jobId.toString()] = stream;
+  }
+
+  removeJobStream(jobId: ObjectIdLike) {
+    delete this.jobStreams[jobId.toString()];
+  }
+
+  destroyJobStream(jobId: ObjectIdLike) {
+    const stream = this.jobStreams[jobId.toString()];
+    if (stream != null) {
+      stream.destroy(new BulkExportJobExpiredError());
+    }
+    this.removeJobStream(jobId);
+  }
+
+}
+
 class PageBulkExportService {
 
   crowi: any;
@@ -58,6 +92,8 @@ class PageBulkExportService {
 
   compressExtension = 'tar.gz';
 
+  pageBulkExportJobStreamManager: PageBulkExportJobStreamManager = new PageBulkExportJobStreamManager();
+
   // temporal path of local fs to output page files before upload
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
   tmpOutputRootDir = '/tmp/page-bulk-export';
@@ -125,24 +161,29 @@ class PageBulkExportService {
     }
     catch (err) {
       logger.error(err);
-      await this.notifyExportResultAndCleanUp(false, pageBulkExportJob);
+      if (err instanceof BulkExportJobExpiredError) {
+        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob);
+      }
+      else {
+        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob);
+      }
       return;
     }
 
-    await this.notifyExportResultAndCleanUp(true, pageBulkExportJob);
+    await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
   }
 
   /**
    * Notify the user of the export result, and cleanup the resources used in the export process
-   * @param succeeded whether the export was successful
+   * @param action whether the export was successful
    * @param pageBulkExportJob the page bulk export job
    */
   private async notifyExportResultAndCleanUp(
-      succeeded: boolean,
+      action: SupportedActionType,
       pageBulkExportJob: PageBulkExportJobDocument,
   ): Promise<void> {
-    const action = succeeded ? SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED : SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED;
-    pageBulkExportJob.status = succeeded ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
+    pageBulkExportJob.status = action === SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED
+      ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
 
     try {
       await pageBulkExportJob.save();
@@ -196,6 +237,8 @@ class PageBulkExportService {
       },
     });
 
+    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pagesReadable);
+
     await pipelinePromise(pagesReadable, pageSnapshotsWritable);
   }
 
@@ -215,6 +258,8 @@ class PageBulkExportService {
 
     const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
+    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
+
     return pipelinePromise(pageSnapshotsReadable, pagesWritable);
   }
 
@@ -372,6 +417,8 @@ class PageBulkExportService {
    * - abort multipart upload
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument) {
+    this.pageBulkExportJobStreamManager.removeJobStream(pageBulkExportJob._id);
+
     const promises = [
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),

+ 1 - 1
apps/app/src/server/service/file-uploader/file-uploader.ts

@@ -164,7 +164,7 @@ export abstract class AbstractFileUploader implements FileUploader {
     throw new Error('Multipart upload not available for file upload type');
   }
 
- abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>;
+  abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>;
 
   /**
    * Abort an existing multipart upload without creating a MultipartUploader instance