Просмотр исходного кода

Merge pull request #9031 from weseek/imprv/143599-152623-limit-parallel-bulk-export-execution

limit parallel bulk export execution
Futa Arai 1 год назад
Родитель
Коммит
59b92337c3

+ 0 - 3
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron.integ.ts

@@ -84,7 +84,6 @@ describe('PageBulkExportJobCronService', () => {
     });
 
     test('should delete expired jobs', async() => {
-      // assert
       expect(await PageBulkExportJob.find()).toHaveLength(4);
 
       // act
@@ -133,7 +132,6 @@ describe('PageBulkExportJobCronService', () => {
     });
 
     test('should delete download expired jobs', async() => {
-      // assert
       expect(await PageBulkExportJob.find()).toHaveLength(4);
 
       // act
@@ -166,7 +164,6 @@ describe('PageBulkExportJobCronService', () => {
     });
 
     test('should delete failed export jobs', async() => {
-      // assert
       expect(await PageBulkExportJob.find()).toHaveLength(3);
 
       // act

+ 16 - 11
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/index.ts

@@ -36,17 +36,21 @@ import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-
 import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
 
 import { BulkExportJobExpiredError, BulkExportJobRestartedError, DuplicateBulkExportJobError } from './errors';
-import { PageBulkExportJobStreamManager } from './page-bulk-export-job-stream-manager';
+import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
 
 
 const logger = loggerFactory('growi:services:PageBulkExportService');
 
-type ActivityParameters ={
+export type ActivityParameters ={
   ip?: string;
   endpoint: string;
 }
 
-class PageBulkExportService {
+export interface IPageBulkExportService {
+  executePageBulkExportJob: (pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters) => Promise<void>
+}
+
+class PageBulkExportService implements IPageBulkExportService {
 
   crowi: any;
 
@@ -59,7 +63,7 @@ class PageBulkExportService {
 
   compressExtension = 'tar.gz';
 
-  pageBulkExportJobStreamManager: PageBulkExportJobStreamManager = new PageBulkExportJobStreamManager();
+  pageBulkExportJobManager: PageBulkExportJobManager;
 
   // temporal path of local fs to output page files before upload
   // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
@@ -71,6 +75,7 @@ class PageBulkExportService {
     this.crowi = crowi;
     this.activityEvent = crowi.event('activity');
     this.pageModel = mongoose.model<IPage, PageModel>('Page');
+    this.pageBulkExportJobManager = new PageBulkExportJobManager(this);
   }
 
   /**
@@ -92,7 +97,7 @@ class PageBulkExportService {
     });
     if (duplicatePageBulkExportJobInProgress != null) {
       if (restartJob) {
-        this.restartBulkExportJob(duplicatePageBulkExportJobInProgress);
+        this.restartBulkExportJob(duplicatePageBulkExportJobInProgress, activityParameters);
         return;
       }
       throw new DuplicateBulkExportJobError();
@@ -103,18 +108,18 @@ class PageBulkExportService {
 
     await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
 
-    this.executePageBulkExportJob(pageBulkExportJob, activityParameters);
+    this.pageBulkExportJobManager.addJob(pageBulkExportJob, activityParameters);
   }
 
   /**
    * Restart page bulk export job in progress from the beginning
    */
-  async restartBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>): Promise<void> {
+  async restartBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>, activityParameters: ActivityParameters): Promise<void> {
     await this.cleanUpExportJobResources(pageBulkExportJob, true);
 
     pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
     await pageBulkExportJob.save();
-    this.executePageBulkExportJob(pageBulkExportJob);
+    this.pageBulkExportJobManager.addJob(pageBulkExportJob, activityParameters);
   }
 
   /**
@@ -245,7 +250,7 @@ class PageBulkExportService {
       },
     });
 
-    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pagesReadable);
+    this.pageBulkExportJobManager.updateJobStream(pageBulkExportJob._id, pagesReadable);
 
     await pipelinePromise(pagesReadable, pageSnapshotsWritable);
 
@@ -269,7 +274,7 @@ class PageBulkExportService {
 
     const pagesWritable = this.getPageWritable(pageBulkExportJob);
 
-    this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
+    this.pageBulkExportJobManager.updateJobStream(pageBulkExportJob._id, pageSnapshotsReadable);
 
     return pipelinePromise(pageSnapshotsReadable, pagesWritable);
   }
@@ -430,7 +435,7 @@ class PageBulkExportService {
    * - abort multipart upload
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
-    this.pageBulkExportJobStreamManager.destroyJobStream(pageBulkExportJob._id, restarted);
+    this.pageBulkExportJobManager.removeJobInProgressAndQueueNextJob(pageBulkExportJob._id, restarted);
 
     const promises = [
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),

+ 231 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.spec.ts

@@ -0,0 +1,231 @@
+import { Readable } from 'stream';
+import { finished } from 'stream/promises';
+
+import type { HydratedDocument } from 'mongoose';
+
+import { configManager } from '~/server/service/config-manager';
+
+import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
+
+import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
+import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
+
+describe('PageBulkExportJobManager', () => {
+  let pageBulkExportServiceMock;
+  let jobManager: PageBulkExportJobManager;
+
+  beforeAll(() => {
+    vi.spyOn(configManager, 'getConfig').mockImplementation((namespace, key) => {
+      if (namespace === 'crowi' && key === 'app:pageBulkExportParallelExecLimit') {
+        return 3;
+      }
+      return undefined; // default for any config key not relevant to this test
+    });
+  });
+
+  beforeEach(() => {
+    pageBulkExportServiceMock = {
+      executePageBulkExportJob: vi.fn(),
+    };
+    jobManager = new PageBulkExportJobManager(pageBulkExportServiceMock);
+  });
+
+  describe('canExecuteNextJob', () => {
+    it('should return true if jobs in progress are less than the limit', () => {
+      // act, assert
+      expect(jobManager.canExecuteNextJob()).toBe(true);
+    });
+
+    it('should return false if jobs in progress exceed the limit', () => {
+      // arrange
+      jobManager.jobsInProgress = {
+        job1: { stream: undefined },
+        job2: { stream: undefined },
+        job3: { stream: undefined },
+      };
+
+      // act, assert
+      expect(jobManager.canExecuteNextJob()).toBe(false);
+    });
+  });
+
+  describe('getJobInProgress', () => {
+    it('should return the info of job in progress', () => {
+      // arrange
+      const jobId = 'job1';
+      jobManager.jobsInProgress[jobId] = { stream: undefined };
+
+      // act, assert
+      expect(jobManager.getJobInProgress(jobId)).toEqual({ stream: undefined });
+    });
+
+    it('should return undefined if job is not in progress', () => {
+      // arrange
+      const jobId = 'job1';
+
+      // act, assert
+      expect(jobManager.getJobInProgress(jobId)).toBeUndefined();
+    });
+  });
+
+  describe('addJob', () => {
+    it('should add the job to jobsInProgress if under the parallelExecLimit', () => {
+      // arrange
+      const job = { _id: 'job1' } as HydratedDocument<PageBulkExportJobDocument>;
+      expect(jobManager.jobQueue.length).toBe(0);
+
+      // act
+      jobManager.addJob(job, { endpoint: '/test/endpoint' });
+
+      // assert
+      expect(jobManager.jobQueue.length).toBe(0);
+      expect(jobManager.jobsInProgress[job._id.toString()]).toEqual({ stream: undefined });
+      expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(job, { endpoint: '/test/endpoint' });
+    });
+
+    it('should queue the job if the parallelExecLimit is reached', () => {
+      // arrange
+      jobManager.jobsInProgress = {
+        job1: { stream: undefined },
+        job2: { stream: undefined },
+        job3: { stream: undefined },
+      };
+      const job = { _id: 'job2' } as HydratedDocument<PageBulkExportJobDocument>;
+      expect(jobManager.jobQueue.length).toBe(0);
+
+      // act
+      jobManager.addJob(job);
+
+      // assert
+      expect(jobManager.jobQueue.length).toBe(1);
+      expect(jobManager.jobQueue[0]).toEqual({ job });
+      expect(pageBulkExportServiceMock.executePageBulkExportJob).not.toHaveBeenCalled();
+    });
+  });
+
+  describe('updateJobStream', () => {
+    it('should set a new stream when there are no streams executing for the job', () => {
+      // arrange
+      const jobId = 'job1';
+      const mockStream = new Readable();
+      jobManager.jobsInProgress[jobId] = { stream: undefined };
+
+      // act
+      jobManager.updateJobStream(jobId, mockStream);
+
+      // assert
+      expect(jobManager.jobsInProgress[jobId].stream).toBe(mockStream);
+    });
+
+    it('should set a new stream when previous stream is finished', async() => {
+      // arrange
+      const jobId = 'job1';
+      const oldStream = new Readable({
+        read(size) {
+          // End the stream immediately
+          this.push(null);
+        },
+      });
+      oldStream.read();
+      await finished(oldStream);
+      const newStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })() as unknown as Readable;
+      jobManager.addJob({ _id: jobId } as HydratedDocument<PageBulkExportJobDocument>);
+
+      // act
+      jobManager.updateJobStream(jobId, oldStream);
+
+      // assert
+      expect(oldStream.readableEnded).toBe(true);
+      jobManager.updateJobStream(jobId, newStream);
+      expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream);
+    });
+
+    it('should destroy non-finished stream with an error before setting a new stream', () => {
+      // arrange
+      const jobId = 'job1';
+      const oldStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
+      const newStream = new Readable();
+      const destroySpy = vi.spyOn(oldStream, 'destroy');
+      jobManager.addJob({ _id: jobId } as HydratedDocument<PageBulkExportJobDocument>);
+      jobManager.updateJobStream(jobId, oldStream);
+
+      // act
+      jobManager.updateJobStream(jobId, newStream);
+      expect(destroySpy).toHaveBeenCalledWith(expect.any(Error));
+
+      // assert
+      expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream);
+    });
+
+    it('should destroy the new stream with BulkExportJobExpiredError if job is not in progress', () => {
+      // arrange
+      const jobId = 'job1';
+      const newStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
+      const destroySpy = vi.spyOn(newStream, 'destroy');
+
+      // act
+      jobManager.updateJobStream(jobId, newStream);
+
+      // assert
+      expect(destroySpy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
+    });
+  });
+
+  describe('removeJobInProgressAndQueueNextJob', () => {
+    it('should remove the job in progress and queue the next job', () => {
+      // arrange
+      const jobId = 'job1';
+      const mockStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
+      vi.spyOn(mockStream, 'destroy');
+      const nextJob = { _id: 'job2' } as HydratedDocument<PageBulkExportJobDocument>;
+      jobManager.jobsInProgress[jobId] = { stream: mockStream };
+      jobManager.jobQueue.push({ job: nextJob });
+      expect(jobManager.jobQueue.length).toBe(1);
+
+      // act
+      jobManager.removeJobInProgressAndQueueNextJob(jobId);
+
+      // assert
+      expect(jobManager.jobQueue.length).toBe(0);
+      expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
+      expect(jobManager.jobsInProgress[jobId]).toBeUndefined();
+      expect(jobManager.jobsInProgress[nextJob._id.toString()]).toEqual({ stream: undefined });
+      expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(nextJob, undefined);
+    });
+
+    it('should destroy the stream with a BulkExportJobRestartedError if job was restarted', () => {
+      // arrange
+      const jobId = 'job1';
+      const mockStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
+      vi.spyOn(mockStream, 'destroy');
+      jobManager.jobsInProgress[jobId] = { stream: mockStream };
+
+      // act
+      jobManager.removeJobInProgressAndQueueNextJob(jobId, true);
+
+      // assert
+      expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobRestartedError));
+      expect(jobManager.jobsInProgress[jobId]).toBeUndefined();
+    });
+  });
+});

+ 125 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts

@@ -0,0 +1,125 @@
+import type { Readable } from 'stream';
+
+import type { HydratedDocument } from 'mongoose';
+
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
+import { configManager } from '~/server/service/config-manager';
+
+import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
+
+import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
+
+import type { ActivityParameters, IPageBulkExportService } from '.';
+
+/**
+ * Manage PageBulkExportJob execution.
+ * - Keep track of jobs being executed and enable destroying the stream if the job is terminated
+ * - Limit the number of jobs being executed in parallel
+ * - Queue jobs to be executed in order
+ */
+export class PageBulkExportJobManager {
+
+  pageBulkExportService: IPageBulkExportService;
+
+  private parallelExecLimit: number;
+
+  // contains jobs being executed and their information
+  // the key is the _id of PageBulkExportJob and the value contains the stream of the job
+  jobsInProgress: {
+    [key: string]: { stream: Readable | undefined };
+  } = {};
+
+  // jobs waiting to be executed in order
+  jobQueue: { job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters }[] = [];
+
+  constructor(pageBulkExportService: IPageBulkExportService) {
+    this.pageBulkExportService = pageBulkExportService;
+    this.parallelExecLimit = configManager.getConfig('crowi', 'app:pageBulkExportParallelExecLimit');
+  }
+
+  canExecuteNextJob(): boolean {
+    return Object.keys(this.jobsInProgress).length < this.parallelExecLimit;
+  }
+
+  /**
+   * Get the information of a job in progress.
+   * A getter method that includes "undefined" in the return type
+   */
+  getJobInProgress(jobId: ObjectIdLike): { stream: Readable | undefined } | undefined {
+    return this.jobsInProgress[jobId.toString()];
+  }
+
+  /**
+   * Add a job to the queue or execute it if the number of jobs in progress is less than the limit
+   * @param job job to add or execute
+   * @param activityParameters parameters to record user activity
+   */
+  addJob(job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): void {
+    if (this.canExecuteNextJob()) {
+      this.jobsInProgress[job._id.toString()] = { stream: undefined };
+      this.pageBulkExportService.executePageBulkExportJob(job, activityParameters);
+    }
+    else {
+      this.jobQueue.push({ job, activityParameters });
+    }
+  }
+
+  /**
+   * Update the info of which stream is being executed for a job
+   * @param jobId id of job to update
+   * @param stream the new stream being executed for the job
+   */
+  updateJobStream(jobId: ObjectIdLike, stream: Readable): void {
+    const jobInProgress = this.getJobInProgress(jobId);
+    if (jobInProgress != null) {
+      if (jobInProgress.stream != null && !jobInProgress.stream.readableEnded) {
+        jobInProgress.stream.destroy(new Error('Stream not finished before next stream started'));
+      }
+      jobInProgress.stream = stream;
+    }
+    else {
+      // job was terminated beforehand, so destroy the stream
+      stream.destroy(new BulkExportJobExpiredError());
+    }
+  }
+
+  /**
+   * Remove a job in execution and queue the next job if there are any
+   * @param jobId id of job to remove
+   * @param isJobRestarted whether or not the job was restarted
+   */
+  removeJobInProgressAndQueueNextJob(jobId: ObjectIdLike, isJobRestarted = false): void {
+    this.removeJobInProgress(jobId, isJobRestarted);
+
+    if (this.jobQueue.length > 0) {
+      while (this.canExecuteNextJob() && this.jobQueue.length > 0) {
+        const nextJob = this.jobQueue.shift();
+        if (nextJob != null) {
+          this.jobsInProgress[nextJob.job._id.toString()] = { stream: undefined };
+          this.pageBulkExportService.executePageBulkExportJob(nextJob.job, nextJob.activityParameters);
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove a job in execution and destroy its stream process
+   * @param jobId id of job to remove
+   * @param isJobRestarted whether or not the job was restarted
+   */
+  private removeJobInProgress(jobId: ObjectIdLike, isJobRestarted = false): void {
+    const jobInProgress = this.getJobInProgress(jobId);
+    if (jobInProgress == null) return;
+
+    if (jobInProgress.stream != null) {
+      if (isJobRestarted) {
+        jobInProgress.stream.destroy(new BulkExportJobRestartedError());
+      }
+      else {
+        jobInProgress.stream.destroy(new BulkExportJobExpiredError());
+      }
+    }
+    delete this.jobsInProgress[jobId.toString()];
+  }
+
+}

+ 0 - 38
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-stream-manager.ts

@@ -1,38 +0,0 @@
-import type { Readable } from 'stream';
-
-import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
-
-import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
-
-/**
- * Used to keep track of streams currently being executed, and enable destroying them
- */
-export class PageBulkExportJobStreamManager {
-
-  private jobStreams: Record<string, Readable> = {};
-
-  addJobStream(jobId: ObjectIdLike, stream: Readable): void {
-    if (this.jobStreams[jobId.toString()] != null) {
-      this.destroyJobStream(jobId);
-    }
-    this.jobStreams[jobId.toString()] = stream;
-  }
-
-  removeJobStream(jobId: ObjectIdLike): void {
-    delete this.jobStreams[jobId.toString()];
-  }
-
-  destroyJobStream(jobId: ObjectIdLike, restarted = false): void {
-    const stream = this.jobStreams[jobId.toString()];
-    if (stream != null) {
-      if (restarted) {
-        stream.destroy(new BulkExportJobRestartedError());
-      }
-      else {
-        stream.destroy(new BulkExportJobExpiredError());
-      }
-    }
-    this.removeJobStream(jobId);
-  }
-
-}

+ 1 - 2
apps/app/src/server/crowi/index.js

@@ -782,12 +782,11 @@ Crowi.prototype.setupExternalUserGroupSyncService = function() {
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
 };
 
-// TODO: Limit the number of jobs to execute in parallel (https://redmine.weseek.co.jp/issues/143599)
 Crowi.prototype.resumeIncompletePageBulkExportJobs = async function() {
   const jobs = await PageBulkExportJob.find({
     $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
   });
-  Promise.all(jobs.map(job => pageBulkExportService.executePageBulkExportJob(job)));
+  jobs.forEach(job => pageBulkExportService?.pageBulkExportJobManager?.addJob(job));
 };
 
 export default Crowi;

+ 6 - 0
apps/app/src/server/service/config-loader.ts

@@ -754,6 +754,12 @@ const ENV_VAR_NAME_TO_CONFIG_INFO = {
     type: ValueType.STRING,
     default: '*/10 * * * *', // every 10 minutes
   },
+  BULK_EXPORT_PARALLEL_EXEC_LIMIT: {
+    ns: 'crowi',
+    key: 'app:pageBulkExportParallelExecLimit',
+    type: ValueType.NUMBER,
+    default: 5,
+  },
 };