Forráskód Böngészése

add CheckPageBulkExportJobInProgressCronService

Futa Arai 1 éve
szülő
commit
79f0706260

+ 34 - 0
apps/app/src/features/page-bulk-export/server/service/check-page-bulk-export-job-in-progress-cron.ts

@@ -0,0 +1,34 @@
+import { configManager } from '~/server/service/config-manager';
+import CronService from '~/server/service/cron';
+import loggerFactory from '~/utils/logger';
+
+import { PageBulkExportJobInProgressStatus } from '../../interfaces/page-bulk-export';
+import PageBulkExportJob from '../models/page-bulk-export-job';
+
+import { pageBulkExportJobCronService } from './page-bulk-export-job-cron';
+
+const logger = loggerFactory('growi:service:check-page-bulk-export-job-in-progress-cron');
+
+class CheckPageBulkExportJobInProgressCronService extends CronService {
+
+  override getCronSchedule(): string {
+    return configManager.getConfig('crowi', 'app:checkPageBulkExportJobInProgressCronSchedule');
+  }
+
+  override async executeJob(): Promise<void> {
+    const pageBulkExportJobInProgress = await PageBulkExportJob.findOne({
+      $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
+    });
+    const pageBulkExportInProgressExists = pageBulkExportJobInProgress != null;
+
+    if (pageBulkExportInProgressExists && !pageBulkExportJobCronService?.isJobRunning()) {
+      pageBulkExportJobCronService?.startCron();
+    }
+    else if (!pageBulkExportInProgressExists) {
+      pageBulkExportJobCronService?.stopCron();
+    }
+  }
+
+}
+
+export const checkPageBulkExportJobInProgressCronService = new CheckPageBulkExportJobInProgressCronService(); // singleton instance

+ 4 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -97,6 +97,10 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
     pageBulkExportJobsInProgress.forEach((pageBulkExportJob) => {
       this.proceedBulkExportJob(pageBulkExportJob);
     });
+
+    if (pageBulkExportJobsInProgress.length === 0) {
+      this.stopCron();
+    }
   }
 
   /**

+ 14 - 5
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -4,6 +4,7 @@ import { Writable, pipeline } from 'stream';
 import { getIdForRef, getIdStringForRef } from '@growi/core';
 
 import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import { SupportedAction } from '~/interfaces/activity';
 import type { PageDocument } from '~/server/models/page';
 
 import type { IPageBulkExportJobCronService } from '..';
@@ -11,7 +12,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
 import PageBulkExportJob from '../../../models/page-bulk-export-job';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 
-async function reuseDuplicateExportIfExists(pageBulkExportJob: PageBulkExportJobDocument) {
+async function reuseDuplicateExportIfExists(this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument) {
   const duplicateExportJob = await PageBulkExportJob.findOne({
     user: pageBulkExportJob.user,
     page: pageBulkExportJob.page,
@@ -24,6 +25,8 @@ async function reuseDuplicateExportIfExists(pageBulkExportJob: PageBulkExportJob
     pageBulkExportJob.attachment = duplicateExportJob.attachment;
     pageBulkExportJob.status = PageBulkExportJobStatus.completed;
     await pageBulkExportJob.save();
+
+    await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
   }
 }
 
@@ -73,11 +76,17 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi
       callback();
     },
     final: async(callback) => {
-      pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
-      pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
-      await pageBulkExportJob.save();
+      try {
+        pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
+        pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
+        await pageBulkExportJob.save();
 
-      await reuseDuplicateExportIfExists(pageBulkExportJob);
+        await reuseDuplicateExportIfExists.bind(this)(pageBulkExportJob);
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
       callback();
     },
   });

+ 8 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -42,8 +42,14 @@ function getPageWritable(this: IPageBulkExportJobCronService, pageBulkExportJob:
       callback();
     },
     final: async(callback) => {
-      pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
-      await pageBulkExportJob.save();
+      try {
+        pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
+        await pageBulkExportJob.save();
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
       callback();
     },
   });

+ 5 - 0
apps/app/src/server/crowi/index.js

@@ -15,10 +15,12 @@ import { LdapUserGroupSyncService } from '~/features/external-user-group/server/
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
 import { PageBulkExportJobInProgressStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
 import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
+import { checkPageBulkExportJobInProgressCronService } from '~/features/page-bulk-export/server/service/check-page-bulk-export-job-in-progress-cron';
 import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
 import instanciatePageBulkExportJobCleanUpCronService, {
   pageBulkExportJobCleanUpCronService,
 } from '~/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron';
+import instanciatePageBulkExportJobCronService from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import questionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import loggerFactory from '~/utils/logger';
@@ -331,6 +333,9 @@ Crowi.prototype.setupSocketIoService = async function() {
 Crowi.prototype.setupCron = function() {
   questionnaireCronService.startCron();
 
+  instanciatePageBulkExportJobCronService(this);
+  checkPageBulkExportJobInProgressCronService.startCron();
+
   instanciatePageBulkExportJobCleanUpCronService(this);
   pageBulkExportJobCleanUpCronService.startCron();
 

+ 6 - 0
apps/app/src/server/service/config-loader.ts

@@ -769,6 +769,12 @@ const ENV_VAR_NAME_TO_CONFIG_INFO: Record<string, EnvConfig> = {
     type: ValueType.STRING,
     default: '*/10 * * * * *', // every 10 seconds
   },
+  CHECK_PAGE_BULK_EXPORT_JOB_IN_PROGRESS_CRON_SCHEDULE: {
+    ns: 'crowi',
+    key: 'app:checkPageBulkExportJobInProgressCronSchedule',
+    type: ValueType.STRING,
+    default: '*/3 * * * *', // every 3 minutes
+  },
   BULK_EXPORT_JOB_CLEAN_UP_CRON_SCHEDULE: {
     ns: 'crowi',
     key: 'app:pageBulkExportJobCleanUpCronSchedule',

+ 7 - 2
apps/app/src/server/service/cron.ts

@@ -11,7 +11,7 @@ const logger = loggerFactory('growi:service:cron');
 abstract class CronService {
 
   // The current cronjob to manage
-  cronJob: ScheduledTask;
+  cronJob: ScheduledTask | undefined;
 
   /**
    * Create and start a new cronjob
@@ -26,7 +26,12 @@ abstract class CronService {
    * Stop the current cronjob
    */
   stopCron(): void {
-    this.cronJob.stop();
+    this.cronJob?.stop();
+    this.cronJob = undefined;
+  }
+
+  isJobRunning(): boolean {
+    return this.cronJob != null;
   }
 
   /**