|
|
@@ -43,6 +43,10 @@ export interface IPageBulkExportJobCronService {
|
|
|
getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Manages cronjob which proceeds PageBulkExportJobs in progress.
|
|
|
+ * If PageBulkExportJob finishes the current step, the next step will be started on the next cron execution.
|
|
|
+ */
|
|
|
class PageBulkExportJobCronService extends CronService implements IPageBulkExportJobCronService {
|
|
|
|
|
|
crowi: any;
|
|
|
@@ -64,6 +68,8 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
|
|
|
|
|
|
userModel: mongoose.Model<IUser>;
|
|
|
|
|
|
+ // Keep track of the stream executed for PageBulkExportJob to destroy it on job failure.
|
|
|
+ // The key is the id of a PageBulkExportJob.
|
|
|
private streamInExecutionMemo: {
|
|
|
[key: string]: Readable;
|
|
|
} = {};
|
|
|
@@ -101,21 +107,31 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the stream in execution of a job.
|
|
|
+ * Get the stream in execution for a job.
|
|
|
* A getter method that includes "undefined" in the return type
|
|
|
*/
|
|
|
getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
|
|
|
return this.streamInExecutionMemo[jobId.toString()];
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Set the stream in execution for a job
|
|
|
+ */
|
|
|
setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
|
|
|
this.streamInExecutionMemo[jobId.toString()] = stream;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Remove the stream in execution for a job
|
|
|
+ */
|
|
|
removeStreamInExecution(jobId: ObjectIdLike) {
|
|
|
delete this.streamInExecutionMemo[jobId.toString()];
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Proceed the page bulk export job if the next step is executable
|
|
|
+ * @param pageBulkExportJob PageBulkExportJob in progress
|
|
|
+ */
|
|
|
async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
|
|
|
if (pageBulkExportJob.restartFlag) {
|
|
|
await this.cleanUpExportJobResources(pageBulkExportJob, true);
|
|
|
@@ -125,6 +141,7 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
|
|
|
await pageBulkExportJob.save();
|
|
|
}
|
|
|
|
|
|
+ // return if job is still the same status as the previous cron exec
|
|
|
if (pageBulkExportJob.status === pageBulkExportJob.statusOnPreviousCronExec) {
|
|
|
return;
|
|
|
}
|
|
|
@@ -151,6 +168,11 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Handle errors that occurred inside a stream pipeline
|
|
|
+ * @param err error
|
|
|
+ * @param pageBulkExportJob PageBulkExportJob executed in the pipeline
|
|
|
+ */
|
|
|
async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
|
|
|
if (err == null) return;
|
|
|
|