Просмотр исходного кода

Merge pull request #10366 from growilabs/fix/172102-172103-bulk-export-cleanup-notif-on-expire

fix: Bulk export cleanup and notification occasionally not working on job expire
mergify[bot] 6 месяцев назад
Родитель
Сommit
15e0ecd2ea

+ 1 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.integ.ts

@@ -31,6 +31,7 @@ vi.mock('./page-bulk-export-job-cron', () => {
   return {
   return {
     pageBulkExportJobCronService: {
     pageBulkExportJobCronService: {
       cleanUpExportJobResources: vi.fn(() => Promise.resolve()),
       cleanUpExportJobResources: vi.fn(() => Promise.resolve()),
+      notifyExportResultAndCleanUp: vi.fn(() => Promise.resolve()),
     },
     },
   };
   };
 });
 });

+ 10 - 7
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts

@@ -1,5 +1,5 @@
 import type { HydratedDocument } from 'mongoose';
 import type { HydratedDocument } from 'mongoose';
-
+import { SupportedAction } from '~/interfaces/activity';
 import type Crowi from '~/server/crowi';
 import type Crowi from '~/server/crowi';
 import { configManager } from '~/server/service/config-manager';
 import { configManager } from '~/server/service/config-manager';
 import CronService from '~/server/service/cron';
 import CronService from '~/server/service/cron';
@@ -57,13 +57,16 @@ class PageBulkExportJobCleanUpCronService extends CronService {
       },
       },
     });
     });
 
 
-    if (pageBulkExportJobCronService != null) {
-      await this.cleanUpAndDeleteBulkExportJobs(
-        expiredExportJobs,
-        pageBulkExportJobCronService.cleanUpExportJobResources.bind(
-          pageBulkExportJobCronService,
-        ),
+    const cleanUp = async (job: PageBulkExportJobDocument) => {
+      await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
+        SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
+        job,
       );
       );
+      logger.error(`Bulk export job has expired: ${job._id.toString()}`);
+    };
+
+    if (pageBulkExportJobCronService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, cleanUp);
     }
     }
   }
   }
 
 

+ 6 - 4
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts

@@ -1,11 +1,13 @@
+import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
+
 export class BulkExportJobExpiredError extends Error {
 export class BulkExportJobExpiredError extends Error {
-  constructor() {
-    super('Bulk export job has expired');
+  constructor(pageBulkExportJob: PageBulkExportJobDocument) {
+    super(`Bulk export job has expired: ${pageBulkExportJob._id.toString()}`);
   }
   }
 }
 }
 
 
-export class BulkExportJobRestartedError extends Error {
+export class BulkExportJobStreamDestroyedByCleanupError extends Error {
   constructor() {
   constructor() {
-    super('Bulk export job has restarted');
+    super('Bulk export job stream was destroyed by cleanup');
   }
   }
 }
 }

+ 38 - 24
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -1,6 +1,6 @@
 import fs from 'node:fs';
 import fs from 'node:fs';
 import path from 'node:path';
 import path from 'node:path';
-import type { Readable } from 'node:stream';
+import type { Readable, Writable } from 'node:stream';
 import type { IUser } from '@growi/core';
 import type { IUser } from '@growi/core';
 import { getIdForRef, isPopulated } from '@growi/core';
 import { getIdForRef, isPopulated } from '@growi/core';
 import mongoose from 'mongoose';
 import mongoose from 'mongoose';
@@ -26,7 +26,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps
 
 
 import {
 import {
   BulkExportJobExpiredError,
   BulkExportJobExpiredError,
-  BulkExportJobRestartedError,
+  BulkExportJobStreamDestroyedByCleanupError,
 } from './errors';
 } from './errors';
 import { requestPdfConverter } from './request-pdf-converter';
 import { requestPdfConverter } from './request-pdf-converter';
 import { compressAndUpload } from './steps/compress-and-upload';
 import { compressAndUpload } from './steps/compress-and-upload';
@@ -40,7 +40,10 @@ export interface IPageBulkExportJobCronService {
   pageBatchSize: number;
   pageBatchSize: number;
   maxPartSize: number;
   maxPartSize: number;
   compressExtension: string;
   compressExtension: string;
-  setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
+  setStreamsInExecution(
+    jobId: ObjectIdLike,
+    ...streams: (Readable | Writable)[]
+  ): void;
   removeStreamInExecution(jobId: ObjectIdLike): void;
   removeStreamInExecution(jobId: ObjectIdLike): void;
   handleError(
   handleError(
     err: Error | null,
     err: Error | null,
@@ -78,10 +81,10 @@ class PageBulkExportJobCronService
   // temporal path of local fs to output page files before upload
   // temporal path of local fs to output page files before upload
   tmpOutputRootDir = '/tmp/page-bulk-export';
   tmpOutputRootDir = '/tmp/page-bulk-export';
 
 
-  // Keep track of the stream executed for PageBulkExportJob to destroy it on job failure.
-  // The key is the id of a PageBulkExportJob.
+  // Keep track of all streams executed for PageBulkExportJob to destroy them on job failure.
+  // The key is the id of a PageBulkExportJob, value is array of streams.
   private streamInExecutionMemo: {
   private streamInExecutionMemo: {
-    [key: string]: Readable;
+    [key: string]: (Readable | Writable)[];
   } = {};
   } = {};
 
 
   private parallelExecLimit: number;
   private parallelExecLimit: number;
@@ -133,22 +136,27 @@ class PageBulkExportJobCronService
   }
   }
 
 
   /**
   /**
-   * Get the stream in execution for a job.
+   * Get all streams in execution for a job.
    * A getter method that includes "undefined" in the return type
    * A getter method that includes "undefined" in the return type
    */
    */
-  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
+  getStreamsInExecution(
+    jobId: ObjectIdLike,
+  ): (Readable | Writable)[] | undefined {
     return this.streamInExecutionMemo[jobId.toString()];
     return this.streamInExecutionMemo[jobId.toString()];
   }
   }
 
 
   /**
   /**
-   * Set the stream in execution for a job
+   * Set streams in execution for a job
    */
    */
-  setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
-    this.streamInExecutionMemo[jobId.toString()] = stream;
+  setStreamsInExecution(
+    jobId: ObjectIdLike,
+    ...streams: (Readable | Writable)[]
+  ) {
+    this.streamInExecutionMemo[jobId.toString()] = streams;
   }
   }
 
 
   /**
   /**
-   * Remove the stream in execution for a job
+   * Remove all streams in execution for a job
    */
    */
   removeStreamInExecution(jobId: ObjectIdLike) {
   removeStreamInExecution(jobId: ObjectIdLike) {
     delete this.streamInExecutionMemo[jobId.toString()];
     delete this.streamInExecutionMemo[jobId.toString()];
@@ -161,7 +169,7 @@ class PageBulkExportJobCronService
   async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
   async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
     try {
     try {
       if (pageBulkExportJob.restartFlag) {
       if (pageBulkExportJob.restartFlag) {
-        await this.cleanUpExportJobResources(pageBulkExportJob, true);
+        await this.cleanUpExportJobResources(pageBulkExportJob);
         pageBulkExportJob.restartFlag = false;
         pageBulkExportJob.restartFlag = false;
         pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
         pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
         pageBulkExportJob.statusOnPreviousCronExec = undefined;
         pageBulkExportJob.statusOnPreviousCronExec = undefined;
@@ -226,9 +234,6 @@ class PageBulkExportJobCronService
         SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
         SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
         pageBulkExportJob,
         pageBulkExportJob,
       );
       );
-    } else if (err instanceof BulkExportJobRestartedError) {
-      logger.info(err.message);
-      await this.cleanUpExportJobResources(pageBulkExportJob);
     } else {
     } else {
       logger.error(err);
       logger.error(err);
       await this.notifyExportResultAndCleanUp(
       await this.notifyExportResultAndCleanUp(
@@ -269,15 +274,24 @@ class PageBulkExportJobCronService
    */
    */
   async cleanUpExportJobResources(
   async cleanUpExportJobResources(
     pageBulkExportJob: PageBulkExportJobDocument,
     pageBulkExportJob: PageBulkExportJobDocument,
-    restarted = false,
   ) {
   ) {
-    const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
-    if (streamInExecution != null) {
-      if (restarted) {
-        streamInExecution.destroy(new BulkExportJobRestartedError());
-      } else {
-        streamInExecution.destroy(new BulkExportJobExpiredError());
-      }
+    const streamsInExecution = this.getStreamsInExecution(
+      pageBulkExportJob._id,
+    );
+    if (streamsInExecution != null && streamsInExecution.length > 0) {
+      // Wait for all streams to be destroyed before proceeding with cleanup
+      await Promise.allSettled(
+        streamsInExecution.map((stream) => {
+          if (!stream.destroyed) {
+            return new Promise<void>((resolve) => {
+              stream.destroy(new BulkExportJobStreamDestroyedByCleanupError());
+              stream.once('close', () => resolve());
+            });
+          }
+          return Promise.resolve();
+        }),
+      );
+
       this.removeStreamInExecution(pageBulkExportJob._id);
       this.removeStreamInExecution(pageBulkExportJob._id);
     }
     }
 
 

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/request-pdf-converter.ts

@@ -49,7 +49,7 @@ export async function requestPdfConverter(
   }
   }
 
 
   if (new Date() > bulkExportJobExpirationDate) {
   if (new Date() > bulkExportJobExpirationDate) {
-    throw new BulkExportJobExpiredError();
+    throw new BulkExportJobExpiredError(pageBulkExportJob);
   }
   }
 
 
   try {
   try {

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -76,7 +76,7 @@ export async function compressAndUpload(
 
 
   pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
   pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
   pageArchiver.finalize();
   pageArchiver.finalize();
-  this.setStreamInExecution(pageBulkExportJob._id, pageArchiver);
+  this.setStreamsInExecution(pageBulkExportJob._id, pageArchiver);
 
 
   try {
   try {
     await fileUploadService.uploadAttachment(pageArchiver, attachment);
     await fileUploadService.uploadAttachment(pageArchiver, attachment);

+ 10 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -11,6 +11,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
 import PageBulkExportJob from '../../../models/page-bulk-export-job';
 import PageBulkExportJob from '../../../models/page-bulk-export-job';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import type { IPageBulkExportJobCronService } from '..';
 import type { IPageBulkExportJobCronService } from '..';
+import { BulkExportJobStreamDestroyedByCleanupError } from '../errors';
 
 
 async function reuseDuplicateExportIfExists(
 async function reuseDuplicateExportIfExists(
   this: IPageBulkExportJobCronService,
   this: IPageBulkExportJobCronService,
@@ -100,9 +101,16 @@ export async function createPageSnapshotsAsync(
     },
     },
   });
   });
 
 
-  this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
+  this.setStreamsInExecution(
+    pageBulkExportJob._id,
+    pagesReadable,
+    pageSnapshotsWritable,
+  );
 
 
   pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
   pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
-    this.handleError(err, pageBulkExportJob);
+    // prevent overlapping cleanup
+    if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) {
+      this.handleError(err, pageBulkExportJob);
+    }
   });
   });
 }
 }

+ 10 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -20,6 +20,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
 import type { PageBulkExportPageSnapshotDocument } from '../../../models/page-bulk-export-page-snapshot';
 import type { PageBulkExportPageSnapshotDocument } from '../../../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import type { IPageBulkExportJobCronService } from '..';
 import type { IPageBulkExportJobCronService } from '..';
+import { BulkExportJobStreamDestroyedByCleanupError } from '../errors';
 
 
 async function convertMdToHtml(
 async function convertMdToHtml(
   md: string,
   md: string,
@@ -132,9 +133,16 @@ export async function exportPagesToFsAsync(
 
 
   const pagesWritable = await getPageWritable.bind(this)(pageBulkExportJob);
   const pagesWritable = await getPageWritable.bind(this)(pageBulkExportJob);
 
 
-  this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
+  this.setStreamsInExecution(
+    pageBulkExportJob._id,
+    pageSnapshotsReadable,
+    pagesWritable,
+  );
 
 
   pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
   pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
-    this.handleError(err, pageBulkExportJob);
+    // prevent overlapping cleanup
+    if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) {
+      this.handleError(err, pageBulkExportJob);
+    }
   });
   });
 }
 }