Răsfoiți Sursa

limit parallel bulk export execution

Futa Arai 1 an în urmă
părinte
comite
93a2a7cb56

+ 16 - 11
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/index.ts

@@ -35,17 +35,21 @@ import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-
 import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
 
 import { BulkExportJobExpiredError, BulkExportJobRestartedError, DuplicateBulkExportJobError } from './errors';
-import { PageBulkExportJobStreamManager } from './page-bulk-export-job-stream-manager';
+import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
-type ActivityParameters ={
+export type ActivityParameters ={
   ip?: string;
   endpoint: string;
 }
 
-class PageBulkExportService {
+export interface IPageBulkExportService {
+  executePageBulkExportJob: (pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters) => Promise<void>
+}
+
+class PageBulkExportService implements IPageBulkExportService {
 
   crowi: any;
 
@@ -58,7 +62,7 @@ class PageBulkExportService {
 
   compressExtension = 'tar.gz';
 
-  pageBulkExportJobStreamManager: PageBulkExportJobStreamManager = new PageBulkExportJobStreamManager();
+  pageBulkExportJobManager: PageBulkExportJobManager;
 
   // temporal path of local fs to output page files before upload
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
@@ -70,6 +74,7 @@ class PageBulkExportService {
     this.crowi = crowi;
     this.activityEvent = crowi.event('activity');
     this.pageModel = mongoose.model<IPage, PageModel>('Page');
+    this.pageBulkExportJobManager = new PageBulkExportJobManager(this);
   }
 
   /**
@@ -91,7 +96,7 @@ class PageBulkExportService {
     });
     if (duplicatePageBulkExportJobInProgress != null) {
       if (restartJob) {
-        this.restartBulkExportJob(duplicatePageBulkExportJobInProgress);
+        this.restartBulkExportJob(duplicatePageBulkExportJobInProgress, activityParameters);
         return;
       }
       throw new DuplicateBulkExportJobError();
@@ -102,18 +107,18 @@ class PageBulkExportService {
 
     await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
 
-    this.executePageBulkExportJob(pageBulkExportJob, activityParameters);
+    this.pageBulkExportJobManager.addJob(pageBulkExportJob, activityParameters);
   }
 
   /**
    * Restart page bulk export job in progress from the beginning
    */
-  async restartBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>): Promise<void> {
+  async restartBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters: ActivityParameters): Promise<void> {
     await this.cleanUpExportJobResources(pageBulkExportJob, true);
 
     pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
     await pageBulkExportJob.save();
-    this.executePageBulkExportJob(pageBulkExportJob);
+    this.pageBulkExportJobManager.addJob(pageBulkExportJob, activityParameters);
   }
 
   /**
@@ -244,7 +249,7 @@ class PageBulkExportService {
       },
     });
 
-    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pagesReadable);
+    this.pageBulkExportJobManager.updateJobStream(pageBulkExportJob._id, pagesReadable);
 
     await pipelinePromise(pagesReadable, pageSnapshotsWritable);
 
@@ -268,7 +273,7 @@ class PageBulkExportService {
 
     const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
-    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
+    this.pageBulkExportJobManager.updateJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
 
     return pipelinePromise(pageSnapshotsReadable, pagesWritable);
   }
@@ -429,7 +434,7 @@ class PageBulkExportService {
    * - abort multipart upload
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
-    this.pageBulkExportJobStreamManager?.destroyJobStream(pageBulkExportJob._id, restarted);
+    this.pageBulkExportJobManager.removeJobInProgress(pageBulkExportJob._id, restarted);
 
     const promises = [
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),

+ 85 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts

@@ -0,0 +1,85 @@
+import type { Readable } from 'stream';
+
+import type { HydratedDocument } from 'mongoose';
+
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
+
+import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
+
+import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
+
+import type { ActivityParameters, IPageBulkExportService } from '.';
+
+/**
+ * Manages page bulk export job execution so that at most `parallelExecLimit`
+ * jobs run concurrently. Jobs submitted beyond the limit wait in a FIFO queue
+ * and are started as running jobs are removed.
+ */
+export class PageBulkExportJobManager {
+
+  // service used to actually execute a job (injected to avoid a circular import)
+  pageBulkExportService: IPageBulkExportService;
+
+  // maximum number of export jobs allowed to execute at the same time
+  parallelExecLimit = 5;
+
+  // jobs currently executing, keyed by stringified job id.
+  // `stream` is the Readable currently driving the job; it stays undefined
+  // until the job registers one via updateJobStream.
+  jobsInProgress: {
+    [key: string]: { stream: Readable | undefined };
+  } = {};
+
+  // FIFO queue of jobs waiting for a free execution slot
+  jobQueue: HydratedDocument<PageBulkExportJobDocument>[] = [];
+
+  constructor(pageBulkExportService: IPageBulkExportService) {
+    this.pageBulkExportService = pageBulkExportService;
+  }
+
+  /** Whether a free execution slot exists under parallelExecLimit. */
+  canExecuteNextJob(): boolean {
+    return Object.keys(this.jobsInProgress).length < this.parallelExecLimit;
+  }
+
+  /** Look up the in-progress entry for a job id, or undefined if not running. */
+  getJobInProgress(jobId: ObjectIdLike): { stream: Readable | undefined } | undefined {
+    return this.jobsInProgress[jobId.toString()];
+  }
+
+  /**
+   * Start the job immediately if a slot is free, otherwise enqueue it.
+   * Execution is fire-and-forget: the promise returned by
+   * executePageBulkExportJob is not awaited here.
+   * NOTE(review): only the job itself is pushed to the queue, so
+   * activityParameters are dropped for jobs that get queued (the dequeue in
+   * removeJobInProgressAndQueueNextJob starts them without parameters) —
+   * confirm this is intended.
+   */
+  addJob(job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): void {
+    if (this.canExecuteNextJob()) {
+      this.jobsInProgress[job.id.toString()] = { stream: undefined };
+      this.pageBulkExportService.executePageBulkExportJob(job, activityParameters);
+    }
+    else {
+      this.jobQueue.push(job);
+    }
+  }
+
+  /**
+   * Register (or replace) the stream driving a running job.
+   * If the job is no longer tracked, it was terminated beforehand, so the
+   * incoming stream is destroyed with BulkExportJobExpiredError instead.
+   */
+  updateJobStream(jobId: ObjectIdLike, stream: Readable): void {
+    const jobInProgress = this.getJobInProgress(jobId);
+    if (jobInProgress != null) {
+      jobInProgress.stream = stream;
+    }
+    else {
+      // job was terminated beforehand, so destroy the stream
+      stream.destroy(new BulkExportJobExpiredError());
+    }
+  }
+
+  /**
+   * Stop tracking a running job, destroying its stream (if any) with an error
+   * reflecting why it ended: restarted vs. expired. No-op if the job is not
+   * currently in progress.
+   */
+  removeJobInProgress(jobId: ObjectIdLike, isJobRestarted = false): void {
+    const jobInProgress = this.getJobInProgress(jobId);
+    if (jobInProgress == null) return;
+
+    if (jobInProgress.stream != null) {
+      if (isJobRestarted) {
+        jobInProgress.stream.destroy(new BulkExportJobRestartedError());
+      }
+      else {
+        jobInProgress.stream.destroy(new BulkExportJobExpiredError());
+      }
+    }
+    delete this.jobsInProgress[jobId.toString()];
+  }
+
+  /**
+   * Remove a job from the in-progress map, then start the next queued job if
+   * the removal opened a slot. Only one job is dequeued per call, since at
+   * most one slot is freed.
+   */
+  removeJobInProgressAndQueueNextJob(jobId: ObjectIdLike, isJobRestarted = false): void {
+    this.removeJobInProgress(jobId, isJobRestarted);
+
+    if (this.jobQueue.length > 0 && this.canExecuteNextJob()) {
+      const nextJob = this.jobQueue.shift();
+      if (nextJob != null) {
+        this.jobsInProgress[nextJob.id.toString()] = { stream: undefined };
+        this.pageBulkExportService.executePageBulkExportJob(nextJob);
+      }
+    }
+  }
+
+}

+ 1 - 2
apps/app/src/server/crowi/index.js

@@ -797,12 +797,11 @@ Crowi.prototype.setupExternalUserGroupSyncService = function() {
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
 };
 
-// TODO: Limit the number of jobs to execute in parallel (https://redmine.weseek.co.jp/issues/143599)
 Crowi.prototype.resumeIncompletePageBulkExportJobs = async function() {
   const jobs = await PageBulkExportJob.find({
     $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
   });
-  Promise.all(jobs.map(job => pageBulkExportService.executePageBulkExportJob(job)));
+  jobs.forEach(job => pageBulkExportService?.pageBulkExportJobManager?.addJob(job));
 };
 
 export default Crowi;