Procházet zdrojové kódy

enable resuming page bulk export job on app restart

Futa Arai před 1 rokem
rodič
revize
6b6898bbb8

+ 1 - 5
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -34,13 +34,9 @@ module.exports = (crowi: Crowi): Router => {
     }
     }
 
 
     const { path, format } = req.body;
     const { path, format } = req.body;
-    const activityParameters = {
-      ip: req.ip,
-      endpoint: req.originalUrl,
-    };
 
 
     try {
     try {
-      await pageBulkExportService?.createAndExecuteBulkExportJob(path, req.user, activityParameters);
+      await pageBulkExportService?.createAndExecuteBulkExportJob(path, req.user);
       return res.apiv3({}, 204);
       return res.apiv3({}, 204);
     }
     }
     catch (err) {
     catch (err) {

+ 13 - 17
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -37,11 +37,6 @@ import PageBulkExportPageSnapshot from '../models/page-bulk-export-page-snapshot
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
 
-type ActivityParameters ={
-  ip: string | undefined;
-  endpoint: string;
-}
-
 export class DuplicateBulkExportJobError extends Error {
 export class DuplicateBulkExportJobError extends Error {
 
 
   constructor() {
   constructor() {
@@ -78,7 +73,7 @@ class PageBulkExportService {
   /**
   /**
    * Create a new page bulk export job and execute it
    * Create a new page bulk export job and execute it
    */
    */
-  async createAndExecuteBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
+  async createAndExecuteBulkExportJob(basePagePath: string, currentUser): Promise<void> {
     const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
     const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
 
 
     if (basePage == null) {
     if (basePage == null) {
@@ -103,15 +98,13 @@ class PageBulkExportService {
 
 
     await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
     await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
 
 
-    this.executePageBulkExportJob(activityParameters, pageBulkExportJob);
+    this.executePageBulkExportJob(pageBulkExportJob);
   }
   }
 
 
   /**
   /**
    * Execute a page bulk export job. This method can also resume a previously inturrupted job.
    * Execute a page bulk export job. This method can also resume a previously inturrupted job.
    */
    */
-  private async executePageBulkExportJob(
-      activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
-  ): Promise<void> {
+  async executePageBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument & HasObjectId): Promise<void> {
     try {
     try {
       const User = this.crowi.model('User');
       const User = this.crowi.model('User');
       const user = await User.findById(getIdForRef(pageBulkExportJob.user));
       const user = await User.findById(getIdForRef(pageBulkExportJob.user));
@@ -132,11 +125,11 @@ class PageBulkExportService {
     }
     }
     catch (err) {
     catch (err) {
       logger.error(err);
       logger.error(err);
-      await this.notifyExportResultAndCleanUp(false, activityParameters, pageBulkExportJob);
+      await this.notifyExportResultAndCleanUp(false, pageBulkExportJob);
       return;
       return;
     }
     }
 
 
-    await this.notifyExportResultAndCleanUp(true, activityParameters, pageBulkExportJob);
+    await this.notifyExportResultAndCleanUp(true, pageBulkExportJob);
   }
   }
 
 
   /**
   /**
@@ -148,13 +141,12 @@ class PageBulkExportService {
    */
    */
   private async notifyExportResultAndCleanUp(
   private async notifyExportResultAndCleanUp(
       succeeded: boolean,
       succeeded: boolean,
-      activityParameters: ActivityParameters,
       pageBulkExportJob: PageBulkExportJobDocument,
       pageBulkExportJob: PageBulkExportJobDocument,
   ): Promise<void> {
   ): Promise<void> {
     const action = succeeded ? SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED : SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED;
     const action = succeeded ? SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED : SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED;
     pageBulkExportJob.status = succeeded ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
     pageBulkExportJob.status = succeeded ? PageBulkExportJobStatus.completed : PageBulkExportJobStatus.failed;
     const results = await Promise.allSettled([
     const results = await Promise.allSettled([
-      this.notifyExportResult(activityParameters, pageBulkExportJob, action),
+      this.notifyExportResult(pageBulkExportJob, action),
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
       pageBulkExportJob.save(),
       pageBulkExportJob.save(),
@@ -213,10 +205,15 @@ class PageBulkExportService {
    * The export will resume from the last exported page if the process was interrupted.
    * The export will resume from the last exported page if the process was interrupted.
    */
    */
   private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
   private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const findQuery = pageBulkExportJob.lastExportedPagePath != null ? {
+      pageBulkExportJob,
+      path: { $gt: pageBulkExportJob.lastExportedPagePath },
+    } : { pageBulkExportJob };
     const pageSnapshotsReadable = PageBulkExportPageSnapshot
     const pageSnapshotsReadable = PageBulkExportPageSnapshot
-      .find({ pageBulkExportJob, path: { $gt: pageBulkExportJob.lastExportedPagePath } })
+      .find(findQuery)
       .populate('revision').sort({ path: 1 }).lean()
       .populate('revision').sort({ path: 1 }).lean()
       .cursor({ batchSize: this.pageBatchSize });
       .cursor({ batchSize: this.pageBatchSize });
+
     const pagesWritable = this.getPageWritable(pageBulkExportJob);
     const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
 
     return pipelinePromise(pageSnapshotsReadable, pagesWritable);
     return pipelinePromise(pageSnapshotsReadable, pagesWritable);
@@ -352,10 +349,9 @@ class PageBulkExportService {
   }
   }
 
 
   private async notifyExportResult(
   private async notifyExportResult(
-      activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType,
+      pageBulkExportJob: PageBulkExportJobDocument, action: SupportedActionType,
   ) {
   ) {
     const activity = await this.crowi.activityService.createActivity({
     const activity = await this.crowi.activityService.createActivity({
-      ...activityParameters,
       action,
       action,
       targetModel: SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB,
       targetModel: SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB,
       target: pageBulkExportJob,
       target: pageBulkExportJob,

+ 14 - 1
apps/app/src/server/crowi/index.js

@@ -12,7 +12,9 @@ import pkg from '^/package.json';
 
 
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
-import instanciatePageBulkExportService from '~/features/page-bulk-export/server/service/page-bulk-export';
+import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
+import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
 import QuestionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import QuestionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
@@ -176,6 +178,8 @@ Crowi.prototype.init = async function() {
   ]);
   ]);
 
 
   await normalizeData();
   await normalizeData();
+
+  this.resumeIncompletePageBulkExportJobs();
 };
 };
 
 
 /**
 /**
@@ -791,4 +795,13 @@ Crowi.prototype.setupExternalUserGroupSyncService = function() {
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
 };
 };
 
 
+Crowi.prototype.resumeIncompletePageBulkExportJobs = async function() {
+  const jobs = await PageBulkExportJob.find({
+    $or: [
+      { status: PageBulkExportJobStatus.initializing }, { status: PageBulkExportJobStatus.exporting }, { status: PageBulkExportJobStatus.uploading },
+    ],
+  });
+  Promise.all(jobs.map(job => pageBulkExportService.executePageBulkExportJob(job)));
+};
+
 export default Crowi;
 export default Crowi;