فهرست منبع

Merge branch 'feat/78039-151411-cancel-streams-on-export-job-abort' into feat/78039-150777-page-bulk-export-job-cron-test

Futa Arai 1 سال پیش
والد
کامیت
7af47cdd73

+ 40 - 22
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.ts

@@ -1,3 +1,6 @@
+import type { HydratedDocument } from 'mongoose';
+
+import { SupportedAction } from '~/interfaces/activity';
 import { configManager } from '~/server/service/config-manager';
 import { configManager } from '~/server/service/config-manager';
 import CronService from '~/server/service/cron';
 import CronService from '~/server/service/cron';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
@@ -22,6 +25,10 @@ class PageBulkExportJobCronService extends CronService {
     this.crowi = crowi;
     this.crowi = crowi;
   }
   }
 
 
+  override getCronSchedule(): string {
+    return configManager.getConfig('crowi', 'app:pageBulkExportJobCronSchedule');
+  }
+
   override async executeJob(): Promise<void> {
   override async executeJob(): Promise<void> {
     await this.deleteExpiredExportJobs();
     await this.deleteExpiredExportJobs();
     await this.deleteDownloadExpiredExportJobs();
     await this.deleteDownloadExpiredExportJobs();
@@ -37,11 +44,13 @@ class PageBulkExportJobCronService extends CronService {
       $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
       $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
       createdAt: { $lt: new Date(Date.now() - exportJobExpirationSeconds * 1000) },
       createdAt: { $lt: new Date(Date.now() - exportJobExpirationSeconds * 1000) },
     });
     });
-    for (const expiredExportJob of expiredExportJobs) {
-      pageBulkExportService?.pageBulkExportJobStreamManager?.destroyJobStream(expiredExportJob._id);
-      // eslint-disable-next-line no-await-in-loop
-      await this.cleanUpAndDeleteBulkExportJob(expiredExportJob);
-    }
+
+    const cleanup = async(job: PageBulkExportJobDocument) => {
+      await pageBulkExportService?.cleanUpExportJobResources(job);
+      await pageBulkExportService?.notifyExportResult(job, SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED);
+    };
+
+    await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, cleanup);
   }
   }
 
 
   /**
   /**
@@ -53,17 +62,13 @@ class PageBulkExportJobCronService extends CronService {
       status: PageBulkExportJobStatus.completed,
       status: PageBulkExportJobStatus.completed,
       completedAt: { $lt: new Date(Date.now() - downloadExpirationSeconds * 1000) },
       completedAt: { $lt: new Date(Date.now() - downloadExpirationSeconds * 1000) },
     });
     });
-    for (const downloadExpiredExportJob of downloadExpiredExportJobs) {
-      try {
-        // eslint-disable-next-line no-await-in-loop
-        await this.crowi.attachmentService?.removeAttachment(downloadExpiredExportJob.attachment);
-      }
-      catch (err) {
-        logger.error(err);
-      }
-      // eslint-disable-next-line no-await-in-loop
-      await this.cleanUpAndDeleteBulkExportJob(downloadExpiredExportJob);
-    }
+
+    const cleanup = async(job: PageBulkExportJobDocument) => {
+      await pageBulkExportService?.cleanUpExportJobResources(job);
+      await this.crowi.attachmentService?.removeAttachment(job.attachment);
+    };
+
+    await this.cleanUpAndDeleteBulkExportJobs(downloadExpiredExportJobs, cleanup);
   }
   }
 
 
   /**
   /**
@@ -71,15 +76,28 @@ class PageBulkExportJobCronService extends CronService {
    */
    */
   async deleteFailedExportJobs() {
   async deleteFailedExportJobs() {
     const failedExportJobs = await PageBulkExportJob.find({ status: PageBulkExportJobStatus.failed });
     const failedExportJobs = await PageBulkExportJob.find({ status: PageBulkExportJobStatus.failed });
-    for (const failedExportJob of failedExportJobs) {
-      // eslint-disable-next-line no-await-in-loop
-      await this.cleanUpAndDeleteBulkExportJob(failedExportJob);
+
+    if (pageBulkExportService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(failedExportJobs, pageBulkExportService.cleanUpExportJobResources);
     }
     }
   }
   }
 
 
-  async cleanUpAndDeleteBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
-    await pageBulkExportService?.cleanUpExportJobResources(pageBulkExportJob);
-    await pageBulkExportJob.delete();
+  async cleanUpAndDeleteBulkExportJobs(
+      pageBulkExportJobs: HydratedDocument<PageBulkExportJobDocument>[],
+      cleanup: (job: PageBulkExportJobDocument) => Promise<void>,
+  ): Promise<void> {
+    const results = await Promise.allSettled(pageBulkExportJobs.map(job => cleanup(job)));
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+
+    // Only batch delete jobs which have been successfully cleaned up
+    // Cleanup failed jobs will be retried in the next cron execution
+    const cleanedUpJobs = pageBulkExportJobs.filter((_, index) => results[index].status === 'fulfilled');
+    if (cleanedUpJobs.length > 0) {
+      const cleanedUpJobIds = cleanedUpJobs.map(job => job._id);
+      await PageBulkExportJob.deleteMany({ _id: { $in: cleanedUpJobIds } });
+    }
   }
   }
 
 
 }
 }

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/index.ts

@@ -383,7 +383,7 @@ class PageBulkExportService {
    * - abort multipart upload
    * - abort multipart upload
    */
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument) {
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument) {
-    this.pageBulkExportJobStreamManager.removeJobStream(pageBulkExportJob._id);
+    this.pageBulkExportJobStreamManager?.destroyJobStream(pageBulkExportJob._id);
 
 
     const promises = [
     const promises = [
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),

+ 1 - 1
apps/app/src/features/questionnaire/server/service/questionnaire-cron.integ.ts

@@ -327,7 +327,7 @@ describe('QuestionnaireCronService', () => {
       validProactiveQuestionnaireAnswer,
       validProactiveQuestionnaireAnswer,
     ]);
     ]);
 
 
-    questionnaireCronService.startCron('0 22 * * *');
+    questionnaireCronService.startCron();
 
 
     vi.spyOn(axios, 'get').mockResolvedValue(mockResponse);
     vi.spyOn(axios, 'get').mockResolvedValue(mockResponse);
     vi.spyOn(axios, 'post').mockResolvedValue({ data: { result: 'success' } });
     vi.spyOn(axios, 'post').mockResolvedValue({ data: { result: 'success' } });

+ 2 - 2
apps/app/src/features/questionnaire/server/service/questionnaire-cron.ts

@@ -26,8 +26,8 @@ class QuestionnaireCronService extends CronService {
 
 
   sleep = (msec: number): Promise<void> => new Promise(resolve => setTimeout(resolve, msec));
   sleep = (msec: number): Promise<void> => new Promise(resolve => setTimeout(resolve, msec));
 
 
-  override startCron(cronSchedule: string): void {
-    super.startCron(cronSchedule);
+  override getCronSchedule(): string {
+    return configManager.getConfig('crowi', 'app:questionnaireCronSchedule');
   }
   }
 
 
   override async executeJob(): Promise<void> {
   override async executeJob(): Promise<void> {

+ 2 - 4
apps/app/src/server/crowi/index.js

@@ -327,12 +327,10 @@ Crowi.prototype.setupModels = async function() {
 };
 };
 
 
 Crowi.prototype.setupCron = function() {
 Crowi.prototype.setupCron = function() {
-  const questionnaireCronSchedule = this.configManager.getConfig('crowi', 'app:questionnaireCronSchedule');
-  questionnaireCronService.startCron(questionnaireCronSchedule);
+  questionnaireCronService.startCron();
 
 
   instanciatePageBulkExportJobCronService(this);
   instanciatePageBulkExportJobCronService(this);
-  const pageBulkExportJobCronSchedule = this.configManager.getConfig('crowi', 'app:pageBulkExportJobCronSchedule');
-  pageBulkExportJobCronService.startCron(pageBulkExportJobCronSchedule);
+  pageBulkExportJobCronService.startCron();
 };
 };
 
 
 Crowi.prototype.setupQuestionnaireService = function() {
 Crowi.prototype.setupQuestionnaireService = function() {

+ 1 - 1
apps/app/src/server/service/config-loader.ts

@@ -752,7 +752,7 @@ const ENV_VAR_NAME_TO_CONFIG_INFO = {
     ns: 'crowi',
     ns: 'crowi',
     key: 'app:pageBulkExportJobCronSchedule',
     key: 'app:pageBulkExportJobCronSchedule',
     type: ValueType.STRING,
     type: ValueType.STRING,
-    default: '0 * * * *',
+    default: '*/10 * * * *', // every 10 minutes
   },
   },
 };
 };
 
 

+ 8 - 3
apps/app/src/server/service/cron.ts

@@ -15,11 +15,10 @@ abstract class CronService {
 
 
   /**
   /**
    * Create and start a new cronjob
    * Create and start a new cronjob
-   * @param cronSchedule e.g. '0 1 * * *'
    */
    */
-  startCron(cronSchedule: string): void {
+  startCron(): void {
     this.cronJob?.stop();
     this.cronJob?.stop();
-    this.cronJob = this.generateCronJob(cronSchedule);
+    this.cronJob = this.generateCronJob(this.getCronSchedule());
     this.cronJob.start();
     this.cronJob.start();
   }
   }
 
 
@@ -30,6 +29,12 @@ abstract class CronService {
     this.cronJob.stop();
     this.cronJob.stop();
   }
   }
 
 
+  /**
+   * Get the cron schedule
+   * e.g. '0 1 * * *'
+   */
+  abstract getCronSchedule(): string;
+
   /**
   /**
    * Execute the job. Define the job process in the subclass.
    * Execute the job. Define the job process in the subclass.
    */
    */