|
|
@@ -30,7 +30,7 @@ import { preNotifyService } from '~/server/service/pre-notify';
|
|
|
import { getBufferToFixedSizeTransform } from '~/server/util/stream';
|
|
|
import loggerFactory from '~/utils/logger';
|
|
|
|
|
|
-import { PageBulkExportFormat, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
|
|
|
+import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
|
|
|
import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
|
|
|
import PageBulkExportJob from '../models/page-bulk-export-job';
|
|
|
import type { PageBulkExportPageSnapshotDocument } from '../models/page-bulk-export-page-snapshot';
|
|
|
@@ -55,6 +55,14 @@ class BulkExportJobExpiredError extends Error {
|
|
|
|
|
|
}
|
|
|
|
|
|
+class BulkExportJobRestartedError extends Error {
|
|
|
+
|
|
|
+ constructor() {
|
|
|
+ super('Bulk export job has restarted');
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* Used to keep track of streams currently being executed, and enable destroying them
|
|
|
*/
|
|
|
@@ -70,10 +78,15 @@ class PageBulkExportJobStreamManager {
|
|
|
delete this.jobStreams[jobId.toString()];
|
|
|
}
|
|
|
|
|
|
- destroyJobStream(jobId: ObjectIdLike) {
|
|
|
+ destroyJobStream(jobId: ObjectIdLike, restarted = false) {
|
|
|
const stream = this.jobStreams[jobId.toString()];
|
|
|
if (stream != null) {
|
|
|
- stream.destroy(new BulkExportJobExpiredError());
|
|
|
+ if (restarted) {
|
|
|
+ stream.destroy(new BulkExportJobRestartedError());
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ stream.destroy(new BulkExportJobExpiredError());
|
|
|
+ }
|
|
|
}
|
|
|
this.removeJobStream(jobId);
|
|
|
}
|
|
|
@@ -110,7 +123,7 @@ class PageBulkExportService {
|
|
|
/**
|
|
|
* Create a new page bulk export job and execute it
|
|
|
*/
|
|
|
- async createAndExecuteBulkExportJob(basePagePath: string, currentUser): Promise<void> {
|
|
|
+ async createAndExecuteOrRestartBulkExportJob(basePagePath: string, currentUser, restartJob = false): Promise<void> {
|
|
|
const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
|
|
|
|
|
|
if (basePage == null) {
|
|
|
@@ -122,11 +135,13 @@ class PageBulkExportService {
|
|
|
user: currentUser,
|
|
|
page: basePage,
|
|
|
format,
|
|
|
- $or: [
|
|
|
- { status: PageBulkExportJobStatus.initializing }, { status: PageBulkExportJobStatus.exporting }, { status: PageBulkExportJobStatus.uploading },
|
|
|
- ],
|
|
|
+ $or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
|
|
|
});
|
|
|
if (duplicatePageBulkExportJobInProgress != null) {
|
|
|
+ if (restartJob) {
|
|
|
+ this.restartBulkExportJob(duplicatePageBulkExportJobInProgress);
|
|
|
+ return;
|
|
|
+ }
|
|
|
throw new DuplicateBulkExportJobError();
|
|
|
}
|
|
|
|
|
|
@@ -139,6 +154,18 @@ class PageBulkExportService {
|
|
|
this.executePageBulkExportJob(pageBulkExportJob);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Restart page bulk export job in progress from the beginning
|
|
|
+ */
|
|
|
+ async restartBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument & HasObjectId): Promise<void> {
|
|
|
+ this.pageBulkExportJobStreamManager.destroyJobStream(pageBulkExportJob._id, true);
|
|
|
+ await this.cleanUpExportJobResources(pageBulkExportJob);
|
|
|
+
|
|
|
+ pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
|
|
|
+ await pageBulkExportJob.save();
|
|
|
+ this.executePageBulkExportJob(pageBulkExportJob);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Execute a page bulk export job. This method can also resume a previously inturrupted job.
|
|
|
*/
|
|
|
@@ -177,11 +204,16 @@ class PageBulkExportService {
|
|
|
}
|
|
|
}
|
|
|
catch (err) {
|
|
|
- logger.error(err);
|
|
|
if (err instanceof BulkExportJobExpiredError) {
|
|
|
+ logger.error(err);
|
|
|
await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob);
|
|
|
}
|
|
|
+ else if (err instanceof BulkExportJobRestartedError) {
|
|
|
+ logger.info(err.message);
|
|
|
+ await this.cleanUpExportJobResources(pageBulkExportJob);
|
|
|
+ }
|
|
|
else {
|
|
|
+ logger.error(err);
|
|
|
await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob);
|
|
|
}
|
|
|
return;
|