Futa Arai 1 год назад
Родитель
Сommit
366614a989

+ 41 - 15
apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts

@@ -16,10 +16,13 @@ export class PageBulkExportJobManager {
 
   parallelExecLimit = 5;
 
+  // contains jobs being executed and it's information
+  // the key is the _id of PageBulkExportJob and the value contains the stream of the job
   jobsInProgress: {
     [key: string]: { stream: Readable | undefined };
   } = {};
 
+  // jobs waiting to be executed in order
   jobQueue: HydratedDocument<PageBulkExportJobDocument>[] = [];
 
   constructor(pageBulkExportService: IPageBulkExportService) {
@@ -34,6 +37,11 @@ export class PageBulkExportJobManager {
     return this.jobsInProgress[jobId.toString()];
   }
 
+  /**
+   * Add a job to the queue or execute it if the number of jobs in progress is less than the limit
+   * @param job job to add or execute
+   * @param activityParameters parameters to record user activity
+   */
   addJob(job: HydratedDocument<PageBulkExportJobDocument>, activityParameters?: ActivityParameters): void {
     if (this.canExecuteNextJob()) {
       this.jobsInProgress[job.id.toString()] = { stream: undefined };
@@ -44,9 +52,17 @@ export class PageBulkExportJobManager {
     }
   }
 
+  /**
+   * Update the info of which stream is being executed for a job
+   * @param jobId id of job to update
+   * @param stream the new stream being executed for the job
+   */
   updateJobStream(jobId: ObjectIdLike, stream: Readable): void {
     const jobInProgress = this.getJobInProgress(jobId);
     if (jobInProgress != null) {
+      if (jobInProgress.stream != null && !jobInProgress.stream.readableEnded) {
+        jobInProgress.stream.destroy(new Error('Stream not finished before next stream started'));
+      }
       jobInProgress.stream = stream;
     }
     else {
@@ -55,21 +71,11 @@ export class PageBulkExportJobManager {
     }
   }
 
-  removeJobInProgress(jobId: ObjectIdLike, isJobRestarted = false): void {
-    const jobInProgress = this.getJobInProgress(jobId);
-    if (jobInProgress == null) return;
-
-    if (jobInProgress.stream != null) {
-      if (isJobRestarted) {
-        jobInProgress.stream.destroy(new BulkExportJobRestartedError());
-      }
-      else {
-        jobInProgress.stream.destroy(new BulkExportJobExpiredError());
-      }
-    }
-    delete this.jobsInProgress[jobId.toString()];
-  }
-
+  /**
+   * Remove a job in execution and queue the next job if there are any
+   * @param jobId id of job to remove
+   * @param isJobRestarted whether or not the job was restarted
+   */
   removeJobInProgressAndQueueNextJob(jobId: ObjectIdLike, isJobRestarted = false): void {
     this.removeJobInProgress(jobId, isJobRestarted);
 
@@ -84,4 +90,24 @@ export class PageBulkExportJobManager {
     }
   }
 
+  /**
+   * Remove a job in execution and destroy it's stream process
+   * @param jobId id of job to remove
+   * @param isJobRestarted whether or not the job was restarted
+   */
+  private removeJobInProgress(jobId: ObjectIdLike, isJobRestarted = false): void {
+    const jobInProgress = this.getJobInProgress(jobId);
+    if (jobInProgress == null) return;
+
+    if (jobInProgress.stream != null) {
+      if (isJobRestarted) {
+        jobInProgress.stream.destroy(new BulkExportJobRestartedError());
+      }
+      else {
+        jobInProgress.stream.destroy(new BulkExportJobExpiredError());
+      }
+    }
+    delete this.jobsInProgress[jobId.toString()];
+  }
+
 }