Просмотр исходного кода

fix bulk export cleanup and notification on expire

Futa Arai 6 месяцев назад
Родитель
Commit
31a3c72780

+ 9 - 7
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts

@@ -1,5 +1,5 @@
 import type { HydratedDocument } from 'mongoose';
-
+import { SupportedAction } from '~/interfaces/activity';
 import type Crowi from '~/server/crowi';
 import { configManager } from '~/server/service/config-manager';
 import CronService from '~/server/service/cron';
@@ -57,13 +57,15 @@ class PageBulkExportJobCleanUpCronService extends CronService {
       },
     });
 
-    if (pageBulkExportJobCronService != null) {
-      await this.cleanUpAndDeleteBulkExportJobs(
-        expiredExportJobs,
-        pageBulkExportJobCronService.cleanUpExportJobResources.bind(
-          pageBulkExportJobCronService,
-        ),
+    const cleanUp = async (job: PageBulkExportJobDocument) => {
+      await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
+        SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
+        job,
       );
+    };
+
+    if (pageBulkExportJobCronService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, cleanUp);
     }
   }
 

+ 2 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts

@@ -4,8 +4,8 @@ export class BulkExportJobExpiredError extends Error {
   }
 }
 
-export class BulkExportJobRestartedError extends Error {
+export class BulkExportJobStreamDestroyedByCleanupError extends Error {
   constructor() {
-    super('Bulk export job has restarted');
+    super('Bulk export job stream was destroyed by cleanup');
   }
 }

+ 31 - 24
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -1,6 +1,6 @@
 import fs from 'node:fs';
 import path from 'node:path';
-import type { Readable } from 'node:stream';
+import type { Readable, Writable } from 'node:stream';
 import type { IUser } from '@growi/core';
 import { getIdForRef, isPopulated } from '@growi/core';
 import mongoose from 'mongoose';
@@ -26,7 +26,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps
 
 import {
   BulkExportJobExpiredError,
-  BulkExportJobRestartedError,
+  BulkExportJobStreamDestroyedByCleanupError,
 } from './errors';
 import { requestPdfConverter } from './request-pdf-converter';
 import { compressAndUpload } from './steps/compress-and-upload';
@@ -40,7 +40,10 @@ export interface IPageBulkExportJobCronService {
   pageBatchSize: number;
   maxPartSize: number;
   compressExtension: string;
-  setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
+  setStreamsInExecution(
+    jobId: ObjectIdLike,
+    ...streams: (Readable | Writable)[]
+  ): void;
   removeStreamInExecution(jobId: ObjectIdLike): void;
   handleError(
     err: Error | null,
@@ -78,10 +81,10 @@ class PageBulkExportJobCronService
   // temporal path of local fs to output page files before upload
   tmpOutputRootDir = '/tmp/page-bulk-export';
 
-  // Keep track of the stream executed for PageBulkExportJob to destroy it on job failure.
-  // The key is the id of a PageBulkExportJob.
+  // Keep track of all streams executed for PageBulkExportJob to destroy them on job failure.
+  // The key is the id of a PageBulkExportJob, value is array of streams.
   private streamInExecutionMemo: {
-    [key: string]: Readable;
+    [key: string]: (Readable | Writable)[];
   } = {};
 
   private parallelExecLimit: number;
@@ -133,22 +136,27 @@ class PageBulkExportJobCronService
   }
 
   /**
-   * Get the stream in execution for a job.
+   * Get all streams in execution for a job.
    * A getter method that includes "undefined" in the return type
    */
-  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
+  getStreamsInExecution(
+    jobId: ObjectIdLike,
+  ): (Readable | Writable)[] | undefined {
     return this.streamInExecutionMemo[jobId.toString()];
   }
 
   /**
-   * Set the stream in execution for a job
+   * Set streams in execution for a job
    */
-  setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
-    this.streamInExecutionMemo[jobId.toString()] = stream;
+  setStreamsInExecution(
+    jobId: ObjectIdLike,
+    ...streams: (Readable | Writable)[]
+  ) {
+    this.streamInExecutionMemo[jobId.toString()] = streams;
   }
 
   /**
-   * Remove the stream in execution for a job
+   * Remove all streams in execution for a job
    */
   removeStreamInExecution(jobId: ObjectIdLike) {
     delete this.streamInExecutionMemo[jobId.toString()];
@@ -161,7 +169,7 @@ class PageBulkExportJobCronService
   async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
     try {
       if (pageBulkExportJob.restartFlag) {
-        await this.cleanUpExportJobResources(pageBulkExportJob, true);
+        await this.cleanUpExportJobResources(pageBulkExportJob);
         pageBulkExportJob.restartFlag = false;
         pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
         pageBulkExportJob.statusOnPreviousCronExec = undefined;
@@ -226,9 +234,6 @@ class PageBulkExportJobCronService
         SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
         pageBulkExportJob,
       );
-    } else if (err instanceof BulkExportJobRestartedError) {
-      logger.info(err.message);
-      await this.cleanUpExportJobResources(pageBulkExportJob);
     } else {
       logger.error(err);
       await this.notifyExportResultAndCleanUp(
@@ -269,15 +274,17 @@ class PageBulkExportJobCronService
    */
   async cleanUpExportJobResources(
     pageBulkExportJob: PageBulkExportJobDocument,
-    restarted = false,
   ) {
-    const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
-    if (streamInExecution != null) {
-      if (restarted) {
-        streamInExecution.destroy(new BulkExportJobRestartedError());
-      } else {
-        streamInExecution.destroy(new BulkExportJobExpiredError());
-      }
+    const streamsInExecution = this.getStreamsInExecution(
+      pageBulkExportJob._id,
+    );
+    if (streamsInExecution != null && streamsInExecution.length > 0) {
+      streamsInExecution.forEach((stream) => {
+        if (!stream.destroyed) {
+          stream.destroy(new BulkExportJobStreamDestroyedByCleanupError());
+        }
+      });
+
       this.removeStreamInExecution(pageBulkExportJob._id);
     }
 

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -76,7 +76,7 @@ export async function compressAndUpload(
 
   pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
   pageArchiver.finalize();
-  this.setStreamInExecution(pageBulkExportJob._id, pageArchiver);
+  this.setStreamsInExecution(pageBulkExportJob._id, pageArchiver);
 
   try {
     await fileUploadService.uploadAttachment(pageArchiver, attachment);

+ 10 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -11,6 +11,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
 import PageBulkExportJob from '../../../models/page-bulk-export-job';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import type { IPageBulkExportJobCronService } from '..';
+import { BulkExportJobStreamDestroyedByCleanupError } from '../errors';
 
 async function reuseDuplicateExportIfExists(
   this: IPageBulkExportJobCronService,
@@ -100,9 +101,16 @@ export async function createPageSnapshotsAsync(
     },
   });
 
-  this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
+  this.setStreamsInExecution(
+    pageBulkExportJob._id,
+    pagesReadable,
+    pageSnapshotsWritable,
+  );
 
   pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
-    this.handleError(err, pageBulkExportJob);
+    // prevent overlapping cleanup
+    if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) {
+      this.handleError(err, pageBulkExportJob);
+    }
   });
 }

+ 10 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -20,6 +20,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
 import type { PageBulkExportPageSnapshotDocument } from '../../../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import type { IPageBulkExportJobCronService } from '..';
+import { BulkExportJobStreamDestroyedByCleanupError } from '../errors';
 
 async function convertMdToHtml(
   md: string,
@@ -132,9 +133,16 @@ export async function exportPagesToFsAsync(
 
   const pagesWritable = await getPageWritable.bind(this)(pageBulkExportJob);
 
-  this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
+  this.setStreamsInExecution(
+    pageBulkExportJob._id,
+    pageSnapshotsReadable,
+    pagesWritable,
+  );
 
   pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
-    this.handleError(err, pageBulkExportJob);
+    // prevent overlapping cleanup
+    if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) {
+      this.handleError(err, pageBulkExportJob);
+    }
   });
 }