Просмотр исходного кода

Merge branch 'imprv/158220-158223-add-page-bulk-export-job-cron' into imprv/158220-158276-clean-up-page-bulk-export-service

Futa Arai 1 год назад
Родитель
Сommit
c88f6e02e0

+ 38 - 0
apps/app/src/features/page-bulk-export/server/service/check-page-bulk-export-job-in-progress-cron.ts

@@ -0,0 +1,38 @@
+import { configManager } from '~/server/service/config-manager';
+import CronService from '~/server/service/cron';
+import loggerFactory from '~/utils/logger';
+
+import { PageBulkExportJobInProgressStatus } from '../../interfaces/page-bulk-export';
+import PageBulkExportJob from '../models/page-bulk-export-job';
+
+import { pageBulkExportJobCronService } from './page-bulk-export-job-cron';
+
+const logger = loggerFactory('growi:service:check-page-bulk-export-job-in-progress-cron');
+
+/**
+ * Manages cronjob which checks if PageBulkExportJob in progress exists.
+ * If it does, and PageBulkExportJobCronService is not running, start PageBulkExportJobCronService
+ */
+class CheckPageBulkExportJobInProgressCronService extends CronService {
+
+  override getCronSchedule(): string {
+    return configManager.getConfig('crowi', 'app:checkPageBulkExportJobInProgressCronSchedule');
+  }
+
+  override async executeJob(): Promise<void> {
+    const pageBulkExportJobInProgress = await PageBulkExportJob.findOne({
+      $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
+    });
+    const pageBulkExportInProgressExists = pageBulkExportJobInProgress != null;
+
+    if (pageBulkExportInProgressExists && !pageBulkExportJobCronService?.isJobRunning()) {
+      pageBulkExportJobCronService?.startCron();
+    }
+    else if (!pageBulkExportInProgressExists) {
+      pageBulkExportJobCronService?.stopCron();
+    }
+  }
+
+}
+
+export const checkPageBulkExportJobInProgressCronService = new CheckPageBulkExportJobInProgressCronService(); // singleton instance

+ 35 - 15
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -1,16 +1,16 @@
 import fs from 'fs';
 import fs from 'fs';
 import type { Readable } from 'stream';
 import type { Readable } from 'stream';
 
 
-import type { IPage, IUser } from '@growi/core';
+import type { IUser } from '@growi/core';
 import { isPopulated, getIdForRef } from '@growi/core';
 import { isPopulated, getIdForRef } from '@growi/core';
 import mongoose from 'mongoose';
 import mongoose from 'mongoose';
 
 
 
 
 import type { SupportedActionType } from '~/interfaces/activity';
 import type { SupportedActionType } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
+import type Crowi from '~/server/crowi';
 import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
 import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
 import type { ActivityDocument } from '~/server/models/activity';
 import type { ActivityDocument } from '~/server/models/activity';
-import type { PageModel } from '~/server/models/page';
 import { configManager } from '~/server/service/config-manager';
 import { configManager } from '~/server/service/config-manager';
 import CronService from '~/server/service/cron';
 import CronService from '~/server/service/cron';
 import type { FileUploader } from '~/server/service/file-uploader';
 import type { FileUploader } from '~/server/service/file-uploader';
@@ -32,8 +32,7 @@ import { exportPagesToFsAsync } from './steps/export-pages-to-fs-async';
 const logger = loggerFactory('growi:service:page-bulk-export-job-cron');
 const logger = loggerFactory('growi:service:page-bulk-export-job-cron');
 
 
 export interface IPageBulkExportJobCronService {
 export interface IPageBulkExportJobCronService {
-  crowi: any;
-  pageModel: PageModel;
+  crowi: Crowi;
   pageBatchSize: number;
   pageBatchSize: number;
   maxPartSize: number;
   maxPartSize: number;
   compressExtension: string;
   compressExtension: string;
@@ -43,9 +42,13 @@ export interface IPageBulkExportJobCronService {
   getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
   getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
 }
 }
 
 
+/**
+ * Manages cronjob which proceeds PageBulkExportJobs in progress.
+ * If PageBulkExportJob finishes the current step, the next step will be started on the next cron execution.
+ */
 class PageBulkExportJobCronService extends CronService implements IPageBulkExportJobCronService {
 class PageBulkExportJobCronService extends CronService implements IPageBulkExportJobCronService {
 
 
-  crowi: any;
+  crowi: Crowi;
 
 
   activityEvent: any;
   activityEvent: any;
 
 
@@ -60,22 +63,18 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
   tmpOutputRootDir = '/tmp/page-bulk-export';
   tmpOutputRootDir = '/tmp/page-bulk-export';
 
 
-  pageModel: PageModel;
-
-  userModel: mongoose.Model<IUser>;
-
+  // Keep track of the stream executed for PageBulkExportJob to destroy it on job failure.
+  // The key is the id of a PageBulkExportJob.
   private streamInExecutionMemo: {
   private streamInExecutionMemo: {
     [key: string]: Readable;
     [key: string]: Readable;
   } = {};
   } = {};
 
 
   private parallelExecLimit: number;
   private parallelExecLimit: number;
 
 
-  constructor(crowi) {
+  constructor(crowi: Crowi) {
     super();
     super();
     this.crowi = crowi;
     this.crowi = crowi;
     this.activityEvent = crowi.event('activity');
     this.activityEvent = crowi.event('activity');
-    this.pageModel = mongoose.model<IPage, PageModel>('Page');
-    this.userModel = mongoose.model<IUser>('User');
     this.parallelExecLimit = configManager.getConfig('crowi', 'app:pageBulkExportParallelExecLimit');
     this.parallelExecLimit = configManager.getConfig('crowi', 'app:pageBulkExportParallelExecLimit');
   }
   }
 
 
@@ -91,6 +90,10 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
     pageBulkExportJobsInProgress.forEach((pageBulkExportJob) => {
     pageBulkExportJobsInProgress.forEach((pageBulkExportJob) => {
       this.proceedBulkExportJob(pageBulkExportJob);
       this.proceedBulkExportJob(pageBulkExportJob);
     });
     });
+
+    if (pageBulkExportJobsInProgress.length === 0) {
+      this.stopCron();
+    }
   }
   }
 
 
   /**
   /**
@@ -101,21 +104,31 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
   }
   }
 
 
   /**
   /**
-   * Get the stream in execution of a job.
+   * Get the stream in execution for a job.
    * A getter method that includes "undefined" in the return type
    * A getter method that includes "undefined" in the return type
    */
    */
   getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
   getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
     return this.streamInExecutionMemo[jobId.toString()];
     return this.streamInExecutionMemo[jobId.toString()];
   }
   }
 
 
+  /**
+   * Set the stream in execution for a job
+   */
   setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
   setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
     this.streamInExecutionMemo[jobId.toString()] = stream;
     this.streamInExecutionMemo[jobId.toString()] = stream;
   }
   }
 
 
+  /**
+   * Remove the stream in execution for a job
+   */
   removeStreamInExecution(jobId: ObjectIdLike) {
   removeStreamInExecution(jobId: ObjectIdLike) {
     delete this.streamInExecutionMemo[jobId.toString()];
     delete this.streamInExecutionMemo[jobId.toString()];
   }
   }
 
 
+  /**
+   * Proceed the page bulk export job if the next step is executable
+   * @param pageBulkExportJob PageBulkExportJob in progress
+   */
   async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
   async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
     if (pageBulkExportJob.restartFlag) {
     if (pageBulkExportJob.restartFlag) {
       await this.cleanUpExportJobResources(pageBulkExportJob, true);
       await this.cleanUpExportJobResources(pageBulkExportJob, true);
@@ -125,11 +138,13 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
       await pageBulkExportJob.save();
       await pageBulkExportJob.save();
     }
     }
 
 
+    // return if job is still the same status as the previous cron exec
     if (pageBulkExportJob.status === pageBulkExportJob.statusOnPreviousCronExec) {
     if (pageBulkExportJob.status === pageBulkExportJob.statusOnPreviousCronExec) {
       return;
       return;
     }
     }
+    const User = mongoose.model<IUser>('User');
     try {
     try {
-      const user = await this.userModel.findById(getIdForRef(pageBulkExportJob.user));
+      const user = await User.findById(getIdForRef(pageBulkExportJob.user));
 
 
       // update statusOnPreviousCronExec before starting processes that updates status
       // update statusOnPreviousCronExec before starting processes that updates status
       pageBulkExportJob.statusOnPreviousCronExec = pageBulkExportJob.status;
       pageBulkExportJob.statusOnPreviousCronExec = pageBulkExportJob.status;
@@ -151,6 +166,11 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
     }
     }
   }
   }
 
 
+  /**
+   * Handle errors that occurred inside a stream pipeline
+   * @param err error
+   * @param pageBulkExportJob PageBulkExportJob executed in the pipeline
+   */
   async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
   async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
     if (err == null) return;
     if (err == null) return;
 
 
@@ -246,6 +266,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
 
 
 // eslint-disable-next-line import/no-mutable-exports
 // eslint-disable-next-line import/no-mutable-exports
 export let pageBulkExportJobCronService: PageBulkExportJobCronService | undefined; // singleton instance
 export let pageBulkExportJobCronService: PageBulkExportJobCronService | undefined; // singleton instance
-export default function instanciate(crowi): void {
+export default function instanciate(crowi: Crowi): void {
   pageBulkExportJobCronService = new PageBulkExportJobCronService(crowi);
   pageBulkExportJobCronService = new PageBulkExportJobCronService(crowi);
 }
 }

+ 22 - 9
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -2,16 +2,19 @@ import { createHash } from 'crypto';
 import { Writable, pipeline } from 'stream';
 import { Writable, pipeline } from 'stream';
 
 
 import { getIdForRef, getIdStringForRef } from '@growi/core';
 import { getIdForRef, getIdStringForRef } from '@growi/core';
+import type { IPage } from '@growi/core';
+import mongoose from 'mongoose';
 
 
 import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
 import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
-import type { PageDocument } from '~/server/models/page';
+import { SupportedAction } from '~/interfaces/activity';
+import type { PageDocument, PageModel } from '~/server/models/page';
 
 
 import type { IPageBulkExportJobCronService } from '..';
 import type { IPageBulkExportJobCronService } from '..';
 import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
 import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
 import PageBulkExportJob from '../../../models/page-bulk-export-job';
 import PageBulkExportJob from '../../../models/page-bulk-export-job';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
 
 
-async function reuseDuplicateExportIfExists(pageBulkExportJob: PageBulkExportJobDocument) {
+async function reuseDuplicateExportIfExists(this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument) {
   const duplicateExportJob = await PageBulkExportJob.findOne({
   const duplicateExportJob = await PageBulkExportJob.findOne({
     user: pageBulkExportJob.user,
     user: pageBulkExportJob.user,
     page: pageBulkExportJob.page,
     page: pageBulkExportJob.page,
@@ -24,6 +27,8 @@ async function reuseDuplicateExportIfExists(pageBulkExportJob: PageBulkExportJob
     pageBulkExportJob.attachment = duplicateExportJob.attachment;
     pageBulkExportJob.attachment = duplicateExportJob.attachment;
     pageBulkExportJob.status = PageBulkExportJobStatus.completed;
     pageBulkExportJob.status = PageBulkExportJobStatus.completed;
     await pageBulkExportJob.save();
     await pageBulkExportJob.save();
+
+    await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
   }
   }
 }
 }
 
 
@@ -32,10 +37,12 @@ async function reuseDuplicateExportIfExists(pageBulkExportJob: PageBulkExportJob
  * 'revisionListHash' is calulated and saved to the pageBulkExportJob at the end of the pipeline.
  * 'revisionListHash' is calulated and saved to the pageBulkExportJob at the end of the pipeline.
  */
  */
 export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
 export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  const Page = mongoose.model<IPage, PageModel>('Page');
+
   // if the process of creating snapshots was interrupted, delete the snapshots and create from the start
   // if the process of creating snapshots was interrupted, delete the snapshots and create from the start
   await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });
   await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });
 
 
-  const basePage = await this.pageModel.findById(getIdForRef(pageBulkExportJob.page));
+  const basePage = await Page.findById(getIdForRef(pageBulkExportJob.page));
   if (basePage == null) {
   if (basePage == null) {
     throw new Error('Base page not found');
     throw new Error('Base page not found');
   }
   }
@@ -43,8 +50,8 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi
   const revisionListHash = createHash('sha256');
   const revisionListHash = createHash('sha256');
 
 
   // create a Readable for pages to be exported
   // create a Readable for pages to be exported
-  const { PageQueryBuilder } = this.pageModel;
-  const builder = await new PageQueryBuilder(this.pageModel.find())
+  const { PageQueryBuilder } = Page;
+  const builder = await new PageQueryBuilder(Page.find())
     .addConditionToListWithDescendants(basePage.path)
     .addConditionToListWithDescendants(basePage.path)
     .addViewerCondition(user);
     .addViewerCondition(user);
   const pagesReadable = builder
   const pagesReadable = builder
@@ -73,11 +80,17 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi
       callback();
       callback();
     },
     },
     final: async(callback) => {
     final: async(callback) => {
-      pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
-      pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
-      await pageBulkExportJob.save();
+      try {
+        pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
+        pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
+        await pageBulkExportJob.save();
 
 
-      await reuseDuplicateExportIfExists(pageBulkExportJob);
+        await reuseDuplicateExportIfExists.bind(this)(pageBulkExportJob);
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
       callback();
       callback();
     },
     },
   });
   });

+ 8 - 2
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -42,8 +42,14 @@ function getPageWritable(this: IPageBulkExportJobCronService, pageBulkExportJob:
       callback();
       callback();
     },
     },
     final: async(callback) => {
     final: async(callback) => {
-      pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
-      await pageBulkExportJob.save();
+      try {
+        pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
+        await pageBulkExportJob.save();
+      }
+      catch (err) {
+        callback(err);
+        return;
+      }
       callback();
       callback();
     },
     },
   });
   });

+ 5 - 4
apps/app/src/server/crowi/index.js

@@ -13,10 +13,11 @@ import pkg from '^/package.json';
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
+import { checkPageBulkExportJobInProgressCronService } from '~/features/page-bulk-export/server/service/check-page-bulk-export-job-in-progress-cron';
 import instanciatePageBulkExportJobCleanUpCronService, {
 import instanciatePageBulkExportJobCleanUpCronService, {
   pageBulkExportJobCleanUpCronService,
   pageBulkExportJobCleanUpCronService,
 } from '~/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron';
 } from '~/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron';
-import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
+import instanciatePageBulkExportJobCronService from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import questionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import questionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
@@ -326,12 +327,12 @@ Crowi.prototype.setupSocketIoService = async function() {
 Crowi.prototype.setupCron = function() {
 Crowi.prototype.setupCron = function() {
   questionnaireCronService.startCron();
   questionnaireCronService.startCron();
 
 
+  instanciatePageBulkExportJobCronService(this);
+  checkPageBulkExportJobInProgressCronService.startCron();
+
   instanciatePageBulkExportJobCleanUpCronService(this);
   instanciatePageBulkExportJobCleanUpCronService(this);
   pageBulkExportJobCleanUpCronService.startCron();
   pageBulkExportJobCleanUpCronService.startCron();
 
 
-  instanciatePageBulkExportJobCronService(this);
-  pageBulkExportJobCronService.startCron();
-
   startOpenaiCronIfEnabled();
   startOpenaiCronIfEnabled();
 };
 };
 
 

+ 6 - 0
apps/app/src/server/service/config-loader.ts

@@ -769,6 +769,12 @@ const ENV_VAR_NAME_TO_CONFIG_INFO: Record<string, EnvConfig> = {
     type: ValueType.STRING,
     type: ValueType.STRING,
     default: '*/10 * * * * *', // every 10 seconds
     default: '*/10 * * * * *', // every 10 seconds
   },
   },
+  CHECK_PAGE_BULK_EXPORT_JOB_IN_PROGRESS_CRON_SCHEDULE: {
+    ns: 'crowi',
+    key: 'app:checkPageBulkExportJobInProgressCronSchedule',
+    type: ValueType.STRING,
+    default: '*/3 * * * *', // every 3 minutes
+  },
   BULK_EXPORT_JOB_CLEAN_UP_CRON_SCHEDULE: {
   BULK_EXPORT_JOB_CLEAN_UP_CRON_SCHEDULE: {
     ns: 'crowi',
     ns: 'crowi',
     key: 'app:pageBulkExportJobCleanUpCronSchedule',
     key: 'app:pageBulkExportJobCleanUpCronSchedule',

+ 7 - 2
apps/app/src/server/service/cron.ts

@@ -11,7 +11,7 @@ const logger = loggerFactory('growi:service:cron');
 abstract class CronService {
 abstract class CronService {
 
 
   // The current cronjob to manage
   // The current cronjob to manage
-  cronJob: ScheduledTask;
+  cronJob: ScheduledTask | undefined;
 
 
   /**
   /**
    * Create and start a new cronjob
    * Create and start a new cronjob
@@ -26,7 +26,12 @@ abstract class CronService {
    * Stop the current cronjob
    * Stop the current cronjob
    */
    */
   stopCron(): void {
   stopCron(): void {
-    this.cronJob.stop();
+    this.cronJob?.stop();
+    this.cronJob = undefined;
+  }
+
+  isJobRunning(): boolean {
+    return this.cronJob != null;
   }
   }
 
 
   /**
   /**