Browse Source

add PageBulkExportPdfConvertCronService

Futa Arai 1 year ago
parent
commit
b98f78e501

+ 10 - 10
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.ts → apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts

@@ -10,12 +10,12 @@ import PageBulkExportJob from '../models/page-bulk-export-job';
 
 import { pageBulkExportService } from './page-bulk-export';
 
-const logger = loggerFactory('growi:service:cron');
+const logger = loggerFactory('growi:service:page-bulk-export-job-clean-up-cron');
 
 /**
  * Manages cronjob which deletes unnecessary bulk export jobs
  */
-class PageBulkExportJobCronService extends CronService {
+class PageBulkExportJobCleanUpCronService extends CronService {
 
   crowi: any;
 
@@ -25,7 +25,7 @@ class PageBulkExportJobCronService extends CronService {
   }
 
   override getCronSchedule(): string {
-    return configManager.getConfig('crowi', 'app:pageBulkExportJobCronSchedule');
+    return configManager.getConfig('crowi', 'app:pageBulkExportJobCleanUpCronSchedule');
   }
 
   override async executeJob(): Promise<void> {
@@ -63,7 +63,7 @@ class PageBulkExportJobCronService extends CronService {
       completedAt: { $lt: thresholdDate },
     });
 
-    const cleanup = async(job: PageBulkExportJobDocument) => {
+    const cleanUp = async(job: PageBulkExportJobDocument) => {
       await pageBulkExportService?.cleanUpExportJobResources(job);
 
       const hasSameAttachmentAndDownloadNotExpired = await PageBulkExportJob.findOne({
@@ -77,7 +77,7 @@ class PageBulkExportJobCronService extends CronService {
       }
     };
 
-    await this.cleanUpAndDeleteBulkExportJobs(downloadExpiredExportJobs, cleanup);
+    await this.cleanUpAndDeleteBulkExportJobs(downloadExpiredExportJobs, cleanUp);
   }
 
   /**
@@ -93,15 +93,15 @@ class PageBulkExportJobCronService extends CronService {
 
   async cleanUpAndDeleteBulkExportJobs(
       pageBulkExportJobs: HydratedDocument<PageBulkExportJobDocument>[],
-      cleanup: (job: PageBulkExportJobDocument) => Promise<void>,
+      cleanUp: (job: PageBulkExportJobDocument) => Promise<void>,
   ): Promise<void> {
-    const results = await Promise.allSettled(pageBulkExportJobs.map(job => cleanup(job)));
+    const results = await Promise.allSettled(pageBulkExportJobs.map(job => cleanUp(job)));
     results.forEach((result) => {
       if (result.status === 'rejected') logger.error(result.reason);
     });
 
     // Only batch delete jobs which have been successfully cleaned up
-    // Cleanup failed jobs will be retried in the next cron execution
+    // Clean up failed jobs will be retried in the next cron execution
     const cleanedUpJobs = pageBulkExportJobs.filter((_, index) => results[index].status === 'fulfilled');
     if (cleanedUpJobs.length > 0) {
       const cleanedUpJobIds = cleanedUpJobs.map(job => job._id);
@@ -112,7 +112,7 @@ class PageBulkExportJobCronService extends CronService {
 }
 
 // eslint-disable-next-line import/no-mutable-exports
-export let pageBulkExportJobCronService: PageBulkExportJobCronService | undefined; // singleton instance
+export let pageBulkExportJobCleanUpCronService: PageBulkExportJobCleanUpCronService | undefined; // singleton instance
 export default function instanciate(crowi): void {
-  pageBulkExportJobCronService = new PageBulkExportJobCronService(crowi);
+  pageBulkExportJobCleanUpCronService = new PageBulkExportJobCleanUpCronService(crowi);
 }

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.integ.ts

@@ -5,7 +5,7 @@ import { configManager } from '~/server/service/config-manager';
 import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
 import PageBulkExportJob from '../models/page-bulk-export-job';
 
-import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from './page-bulk-export-job-cron';
+import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from './page-bulk-export-job-clean-up-cron';
 
 // TODO: use actual user model after ~/server/models/user.js becomes importable in vitest
 // ref: https://github.com/vitest-dev/vitest/issues/846

+ 117 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-pdf-convert-cron.ts

@@ -0,0 +1,117 @@
+import type { HydratedDocument } from 'mongoose';
+import EventEmitter from 'node:events';
+
+import { configManager } from '~/server/service/config-manager';
+import CronService from '~/server/service/cron';
+import loggerFactory from '~/utils/logger';
+
+import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
+import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
+import PageBulkExportJob from '../models/page-bulk-export-job';
+
+import { BulkExportJobExpiredError } from './page-bulk-export/errors';
+import { PdfCtrlSyncJobStatus202Status, PdfCtrlSyncJobStatusBodyStatus, pdfCtrlSyncJobStatus } from '^/../pdf-converter/dist/client-library';
+import PageBulkExportPageSnapshot from '../models/page-bulk-export-page-snapshot';
+
+const logger = loggerFactory('growi:service:page-bulk-export-pdf-convert-cron');
+
+const eventEmitter = new EventEmitter();
+
+/**
+ * Start pdf export by requesting pdf-converter and keep updating/checking the status until the export is done
+ * ref) https://dev.growi.org/66ee8495830566b31e02c953#growi
+ * @param pageBulkExportJob page bulk export job in execution
+ */
+class PageBulkExportPdfConvertCronService extends CronService {
+
+  override getCronSchedule(): string {
+    return configManager.getConfig('crowi', 'app:pageBulkExportPdfConvertCronSchedule');
+  }
+
+  override async executeJob(): Promise<void> {
+    const pdfExportingJobs = await PageBulkExportJob.find({
+      format: PageBulkExportFormat.pdf,
+      status: PageBulkExportJobStatus.exporting,
+    });
+
+    const results = await Promise.allSettled(
+      pdfExportingJobs.map(async (pageBulkExportJob) => {
+        await this.requestPdfConverter(pageBulkExportJob);
+      })
+    )
+
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+  }
+
+  async requestPdfConverter(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>): Promise<void> {
+    const jobCreatedAt = pageBulkExportJob.createdAt;
+    if (jobCreatedAt == null) {
+      eventEmitter.emit('pdfExportFailed');
+      throw new Error('createdAt is not set');
+    }
+
+    const exportJobExpirationSeconds = configManager.getConfig('crowi', 'app:bulkExportJobExpirationSeconds');
+    const bulkExportJobExpirationDate = new Date(jobCreatedAt.getTime() + exportJobExpirationSeconds * 1000);
+    let pdfConvertStatus: PdfCtrlSyncJobStatusBodyStatus = PdfCtrlSyncJobStatusBodyStatus.HTML_EXPORT_IN_PROGRESS;
+
+    const lastExportPagePath = (await PageBulkExportPageSnapshot.findOne({ pageBulkExportJob }).sort({ path: -1 }))?.path;
+    if (lastExportPagePath == null) {
+      eventEmitter.emit('pdfExportFailed id:' + pageBulkExportJob._id.toString());
+      throw new Error('lastExportPagePath is missing');
+    }
+
+    if (new Date() > bulkExportJobExpirationDate) {
+      eventEmitter.emit('bulkExportJobExpired id:' + pageBulkExportJob._id.toString());
+    }
+    try {
+      if (pageBulkExportJob.lastExportedPagePath === lastExportPagePath) {
+        pdfConvertStatus = PdfCtrlSyncJobStatusBodyStatus.HTML_EXPORT_DONE;
+      }
+
+      if (pageBulkExportJob.status === PageBulkExportJobStatus.failed) {
+        pdfConvertStatus = PdfCtrlSyncJobStatusBodyStatus.FAILED;
+      }
+
+      const res = await pdfCtrlSyncJobStatus({
+        jobId: pageBulkExportJob._id.toString(), expirationDate: bulkExportJobExpirationDate.toISOString(), status: pdfConvertStatus,
+      }, { baseURL: configManager.getConfig('crowi', 'app:pageBulkExportPdfConverterUrl') });
+
+      if (res.data.status === PdfCtrlSyncJobStatus202Status.PDF_EXPORT_DONE) {
+        eventEmitter.emit('pdfExportDone id:' + pageBulkExportJob._id.toString());
+      }
+      else if (res.data.status === PdfCtrlSyncJobStatus202Status.FAILED) {
+        eventEmitter.emit('pdfExportFailed id:' + pageBulkExportJob._id.toString());
+      }
+    }
+    catch (err) {
+      // Only set as failure when host is ready but failed.
+      // If host is not ready, the request should be retried on the next cron execution.
+      if (!['ENOTFOUND', 'ECONNREFUSED'].includes(err.code)) {
+        eventEmitter.emit('pdfExportFailed id:' + pageBulkExportJob._id.toString());
+        throw err;
+      }
+    }
+  };
+}
+
+export const pageBulkExportPdfConvertCronService = Object.freeze(new PageBulkExportPdfConvertCronService());
+
+export function waitPdfExportToFs(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  return new Promise<void>((resolve, reject) => {
+    eventEmitter.on('pdfExportDone id:' + pageBulkExportJob._id.toString(), () => {
+      resolve();
+      return;
+    });
+
+    eventEmitter.on('bulkExportJobExpired id:' + pageBulkExportJob._id.toString(), () => {
+      reject(new BulkExportJobExpiredError());
+      return;
+    });
+    eventEmitter.on('pdfExportFailed id:' + pageBulkExportJob._id.toString(), () => {
+      reject(new Error('PDF export failed'));
+      return;
+    });
+  });
+}

+ 15 - 74
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/index.ts

@@ -3,14 +3,6 @@ import fs from 'fs';
 import path from 'path';
 import { Writable, pipeline } from 'stream';
 import { pipeline as pipelinePromise } from 'stream/promises';
-
-
-import type { IUser } from '@growi/core';
-import {
-  getIdForRef, getIdStringForRef, type IPage, isPopulated, SubscriptionStatusType,
-} from '@growi/core';
-import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
-import { pdfCtrlSyncJobStatus, PdfCtrlSyncJobStatus202Status, PdfCtrlSyncJobStatusBodyStatus } from '@growi/pdf-converter/dist/client-library';
 import type { Archiver } from 'archiver';
 import archiver from 'archiver';
 import gc from 'expose-gc/function';
@@ -19,6 +11,12 @@ import mongoose from 'mongoose';
 import remark from 'remark';
 import html from 'remark-html';
 
+import type { IUser } from '@growi/core';
+import {
+  getIdForRef, getIdStringForRef, type IPage, isPopulated, SubscriptionStatusType,
+} from '@growi/core';
+import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
+
 import type { SupportedActionType } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
 import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
@@ -27,7 +25,6 @@ import type { IAttachmentDocument } from '~/server/models/attachment';
 import { Attachment } from '~/server/models/attachment';
 import type { PageModel, PageDocument } from '~/server/models/page';
 import Subscription from '~/server/models/subscription';
-import { configManager } from '~/server/service/config-manager';
 import type { FileUploader } from '~/server/service/file-uploader';
 import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
 import { preNotifyService } from '~/server/service/pre-notify';
@@ -39,20 +36,21 @@ import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-jo
 import PageBulkExportJob from '../../models/page-bulk-export-job';
 import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-export-page-snapshot';
 import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
-
 import { BulkExportJobExpiredError, BulkExportJobRestartedError, DuplicateBulkExportJobError } from './errors';
-import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
+import { IPageBulkExportJobManager, PageBulkExportJobManager } from './page-bulk-export-job-manager';
+import { waitPdfExportToFs } from '../page-bulk-export-pdf-convert-cron';
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
-export type ActivityParameters ={
+export type ActivityParameters = {
   ip?: string;
   endpoint: string;
 }
 
 export interface IPageBulkExportService {
   executePageBulkExportJob: (pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters) => Promise<void>
+  pageBulkExportJobManager: IPageBulkExportJobManager;
 }
 
 class PageBulkExportService implements IPageBulkExportService {
@@ -68,7 +66,7 @@ class PageBulkExportService implements IPageBulkExportService {
 
   compressExtension = 'tar.gz';
 
-  pageBulkExportJobManager: PageBulkExportJobManager;
+  pageBulkExportJobManager: IPageBulkExportJobManager;
 
   // temporal path of local fs to output page files before upload
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
@@ -157,7 +155,7 @@ class PageBulkExportService implements IPageBulkExportService {
         await pageBulkExportJob.save();
       }
       if (pageBulkExportJob.status === PageBulkExportJobStatus.exporting) {
-        await this.exportPagesToFS(pageBulkExportJob);
+        await this.exportPagesToFs(pageBulkExportJob);
         pageBulkExportJob.status = PageBulkExportJobStatus.uploading;
         await pageBulkExportJob.save();
       }
@@ -185,7 +183,7 @@ class PageBulkExportService implements IPageBulkExportService {
   }
 
   /**
-   * Notify the user of the export result, and cleanup the resources used in the export process
+   * Notify the user of the export result, and clean up the resources used in the export process
    * @param action whether the export was successful
    * @param pageBulkExportJob the page bulk export job
    * @param activityParameters parameters to record user activity
@@ -268,7 +266,7 @@ class PageBulkExportService implements IPageBulkExportService {
    * Export pages to the file system before compressing and uploading to the cloud storage.
    * The export will resume from the last exported page if the process was interrupted.
    */
-  private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  private async exportPagesToFs(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
     const findQuery = pageBulkExportJob.lastExportedPagePath != null ? {
       pageBulkExportJob,
       path: { $gt: pageBulkExportJob.lastExportedPagePath },
@@ -284,7 +282,7 @@ class PageBulkExportService implements IPageBulkExportService {
 
     if (pageBulkExportJob.format === PageBulkExportFormat.pdf) {
       pipeline(pageSnapshotsReadable, pagesWritable, (err) => { if (err != null) logger.error(err); });
-      await this.startAndWaitPdfExportFinish(pageBulkExportJob);
+      await waitPdfExportToFs(pageBulkExportJob);
     }
     else {
       await pipelinePromise(pageSnapshotsReadable, pagesWritable);
@@ -344,63 +342,6 @@ class PageBulkExportService implements IPageBulkExportService {
     return htmlString;
   }
 
-  /**
-   * Start pdf export by requesting pdf-converter and keep updating/checking the status until the export is done
-   * ref) https://dev.growi.org/66ee8495830566b31e02c953#growi
-   * @param pageBulkExportJob page bulk export job in execution
-   */
-  private async startAndWaitPdfExportFinish(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-    const jobCreatedAt = pageBulkExportJob.createdAt;
-    if (jobCreatedAt == null) throw new Error('createdAt is not set');
-
-    const exportJobExpirationSeconds = configManager.getConfig('crowi', 'app:bulkExportJobExpirationSeconds');
-    const jobExpirationDate = new Date(jobCreatedAt.getTime() + exportJobExpirationSeconds * 1000);
-    let status: PdfCtrlSyncJobStatusBodyStatus = PdfCtrlSyncJobStatusBodyStatus.HTML_EXPORT_IN_PROGRESS;
-
-    const lastExportPagePath = (await PageBulkExportPageSnapshot.findOne({ pageBulkExportJob }).sort({ path: -1 }))?.path;
-    if (lastExportPagePath == null) throw new Error('lastExportPagePath is missing');
-
-    return new Promise<void>((resolve, reject) => {
-      // Request sync job API until the pdf export is done. If pdf export status is updated in growi, send the status to pdf-converter.
-      const interval = setInterval(async() => {
-        if (new Date() > jobExpirationDate) {
-          reject(new BulkExportJobExpiredError());
-        }
-        try {
-          const latestPageBulkExportJob = await PageBulkExportJob.findById(pageBulkExportJob._id);
-          if (latestPageBulkExportJob == null) throw new Error('pageBulkExportJob is missing');
-          if (latestPageBulkExportJob.lastExportedPagePath === lastExportPagePath) {
-            status = PdfCtrlSyncJobStatusBodyStatus.HTML_EXPORT_DONE;
-          }
-
-          if (latestPageBulkExportJob.status === PageBulkExportJobStatus.failed) {
-            status = PdfCtrlSyncJobStatusBodyStatus.FAILED;
-          }
-
-          const res = await pdfCtrlSyncJobStatus({
-            jobId: pageBulkExportJob._id.toString(), expirationDate: jobExpirationDate.toISOString(), status,
-          }, { baseURL: configManager.getConfig('crowi', 'app:pageBulkExportPdfConverterUrl') });
-
-          if (res.data.status === PdfCtrlSyncJobStatus202Status.PDF_EXPORT_DONE) {
-            clearInterval(interval);
-            resolve();
-          }
-          else if (res.data.status === PdfCtrlSyncJobStatus202Status.FAILED) {
-            clearInterval(interval);
-            reject(new Error('PDF export failed'));
-          }
-        }
-        catch (err) {
-          // continue the loop if the host is not ready
-          if (!['ENOTFOUND', 'ECONNREFUSED'].includes(err.code)) {
-            clearInterval(interval);
-            reject(err);
-          }
-        }
-      }, 60 * 1000 * 1);
-    });
-  }
-
   /**
    * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
    */

+ 12 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts

@@ -11,13 +11,24 @@ import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors
 
 import type { ActivityParameters, IPageBulkExportService } from '.';
 
+export interface IPageBulkExportJobManager {
+  jobsInProgress: {
+    [key: string]: { stream: Readable | undefined };
+  };
+  canExecuteNextJob(): boolean;
+  getJobInProgress(jobId: ObjectIdLike): { stream: Readable | undefined } | undefined;
+  addJob(job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): void;
+  updateJobStream(jobId: ObjectIdLike, stream: Readable): void;
+  removeJobInProgressAndQueueNextJob(jobId: ObjectIdLike, isJobRestarted?: boolean): void;
+}
+
 /**
  * Manage PageBulkExportJob execution.
  * - Keep track of jobs being executed and enable destroying the stream if the job is terminated
  * - Limit the number of jobs being executed in parallel
  * - Queue jobs to be executed in order
  */
-export class PageBulkExportJobManager {
+export class PageBulkExportJobManager implements IPageBulkExportJobManager {
 
   pageBulkExportService: IPageBulkExportService;
 

+ 6 - 3
apps/app/src/server/crowi/index.js

@@ -15,10 +15,11 @@ import { LdapUserGroupSyncService } from '~/features/external-user-group/server/
 import { PageBulkExportJobInProgressStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
 import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
 import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
-import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
+import instanciatePageBulkExportJobCleanUpCronService, { pageBulkExportJobCleanUpCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron';
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import questionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
+import { pageBulkExportPdfConvertCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-pdf-convert-cron';
 import loggerFactory from '~/utils/logger';
 import { projectRoot } from '~/utils/project-dir-utils';
 
@@ -331,8 +332,10 @@ Crowi.prototype.setupSocketIoService = async function() {
 Crowi.prototype.setupCron = function() {
   questionnaireCronService.startCron();
 
-  instanciatePageBulkExportJobCronService(this);
-  pageBulkExportJobCronService.startCron();
+  instanciatePageBulkExportJobCleanUpCronService(this);
+  pageBulkExportJobCleanUpCronService.startCron();
+
+  pageBulkExportPdfConvertCronService.startCron();
 
   startOpenaiCronIfEnabled();
 };

+ 8 - 2
apps/app/src/server/service/config-loader.ts

@@ -763,12 +763,18 @@ const ENV_VAR_NAME_TO_CONFIG_INFO: Record<string, EnvConfig> = {
     type: ValueType.NUMBER,
     default: 86400, // 1 day
   },
-  BULK_EXPORT_JOB_CRON_SCHEDULE: {
+  BULK_EXPORT_JOB_CLEAN_UP_CRON_SCHEDULE: {
     ns: 'crowi',
-    key: 'app:pageBulkExportJobCronSchedule',
+    key: 'app:pageBulkExportJobCleanUpCronSchedule',
     type: ValueType.STRING,
     default: '*/10 * * * *', // every 10 minutes
   },
+  BULK_EXPORT_PDF_CONVERT_CRON_SCHEDULE: {
+    ns: 'crowi',
+    key: 'app:pageBulkExportPdfConvertCronSchedule',
+    type: ValueType.STRING,
+    default: '* * * * *', // every 1 minute
+  },
   BULK_EXPORT_PARALLEL_EXEC_LIMIT: {
     ns: 'crowi',
     key: 'app:pageBulkExportParallelExecLimit',