|
|
@@ -1,10 +1,11 @@
|
|
|
+import { createHash } from 'crypto';
|
|
|
import fs from 'fs';
|
|
|
import path from 'path';
|
|
|
import { Writable } from 'stream';
|
|
|
import { pipeline as pipelinePromise } from 'stream/promises';
|
|
|
|
|
|
import {
|
|
|
- getIdForRef, type IPage, isPopulated, SubscriptionStatusType,
|
|
|
+ getIdForRef, getIdStringForRef, type IPage, isPopulated, SubscriptionStatusType,
|
|
|
} from '@growi/core';
|
|
|
import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
|
|
|
import type { Archiver } from 'archiver';
|
|
|
@@ -33,7 +34,7 @@ import PageBulkExportJob from '../../models/page-bulk-export-job';
|
|
|
import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-export-page-snapshot';
|
|
|
import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';
|
|
|
|
|
|
-import { BulkExportJobExpiredError, DuplicateBulkExportJobError } from './errors';
|
|
|
+import { BulkExportJobExpiredError, BulkExportJobRestartedError, DuplicateBulkExportJobError } from './errors';
|
|
|
import { PageBulkExportJobStreamManager } from './page-bulk-export-job-stream-manager';
|
|
|
|
|
|
|
|
|
@@ -74,7 +75,7 @@ class PageBulkExportService {
|
|
|
/**
|
|
|
* Create a new page bulk export job and execute it
|
|
|
*/
|
|
|
- async createAndExecuteBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
|
|
|
+ async createAndExecuteOrRestartBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters, restartJob = false): Promise<void> {
|
|
|
const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);
|
|
|
|
|
|
if (basePage == null) {
|
|
|
@@ -89,6 +90,10 @@ class PageBulkExportService {
|
|
|
$or: Object.values(PageBulkExportJobInProgressStatus).map(status => ({ status })),
|
|
|
});
|
|
|
if (duplicatePageBulkExportJobInProgress != null) {
|
|
|
+ if (restartJob) {
|
|
|
+ this.restartBulkExportJob(duplicatePageBulkExportJobInProgress);
|
|
|
+ return;
|
|
|
+ }
|
|
|
throw new DuplicateBulkExportJobError();
|
|
|
}
|
|
|
const pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument> = await PageBulkExportJob.create({
|
|
|
@@ -100,6 +105,17 @@ class PageBulkExportService {
|
|
|
this.executePageBulkExportJob(pageBulkExportJob, activityParameters);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Restart page bulk export job in progress from the beginning
|
|
|
+ */
|
|
|
+ async restartBulkExportJob(pageBulkExportJob: HydratedDocument<PageBulkExportJobDocument>): Promise<void> {
|
|
|
+ await this.cleanUpExportJobResources(pageBulkExportJob, true);
|
|
|
+
|
|
|
+ pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
|
|
|
+ await pageBulkExportJob.save();
|
|
|
+ this.executePageBulkExportJob(pageBulkExportJob);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Execute a page bulk export job. This method can also resume a previously inturrupted job.
|
|
|
*/
|
|
|
@@ -110,7 +126,22 @@ class PageBulkExportService {
|
|
|
|
|
|
if (pageBulkExportJob.status === PageBulkExportJobStatus.initializing) {
|
|
|
await this.createPageSnapshots(user, pageBulkExportJob);
|
|
|
- pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
|
|
|
+
|
|
|
+ const duplicateExportJob = await PageBulkExportJob.findOne({
|
|
|
+ user: pageBulkExportJob.user,
|
|
|
+ page: pageBulkExportJob.page,
|
|
|
+ format: pageBulkExportJob.format,
|
|
|
+ status: PageBulkExportJobStatus.completed,
|
|
|
+ revisionListHash: pageBulkExportJob.revisionListHash,
|
|
|
+ });
|
|
|
+ if (duplicateExportJob != null) {
|
|
|
+ // if an upload with the exact same contents exists, re-use the same attachment of that upload
|
|
|
+ pageBulkExportJob.attachment = duplicateExportJob.attachment;
|
|
|
+ pageBulkExportJob.status = PageBulkExportJobStatus.completed;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
|
|
|
+ }
|
|
|
await pageBulkExportJob.save();
|
|
|
}
|
|
|
if (pageBulkExportJob.status === PageBulkExportJobStatus.exporting) {
|
|
|
@@ -123,11 +154,16 @@ class PageBulkExportService {
|
|
|
}
|
|
|
}
|
|
|
catch (err) {
|
|
|
- logger.error(err);
|
|
|
if (err instanceof BulkExportJobExpiredError) {
|
|
|
+ logger.error(err);
|
|
|
await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob, activityParameters);
|
|
|
}
|
|
|
+ else if (err instanceof BulkExportJobRestartedError) {
|
|
|
+ logger.info(err.message);
|
|
|
+ await this.cleanUpExportJobResources(pageBulkExportJob);
|
|
|
+ }
|
|
|
else {
|
|
|
+ logger.error(err);
|
|
|
await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED, pageBulkExportJob, activityParameters);
|
|
|
}
|
|
|
return;
|
|
|
@@ -162,7 +198,8 @@ class PageBulkExportService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Create a snapshot for each page that is to be exported in the pageBulkExportJob
|
|
|
+ * Create a snapshot for each page that is to be exported in the pageBulkExportJob.
|
|
|
+ * Also calulate revisionListHash and save it to the pageBulkExportJob.
|
|
|
*/
|
|
|
private async createPageSnapshots(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
|
|
|
// if the process of creating snapshots was interrupted, delete the snapshots and create from the start
|
|
|
@@ -173,6 +210,8 @@ class PageBulkExportService {
|
|
|
throw new Error('Base page not found');
|
|
|
}
|
|
|
|
|
|
+ const revisionListHash = createHash('sha256');
|
|
|
+
|
|
|
// create a Readable for pages to be exported
|
|
|
const { PageQueryBuilder } = this.pageModel;
|
|
|
const builder = await new PageQueryBuilder(this.pageModel.find())
|
|
|
@@ -188,6 +227,9 @@ class PageBulkExportService {
|
|
|
objectMode: true,
|
|
|
write: async(page: PageDocument, encoding, callback) => {
|
|
|
try {
|
|
|
+ if (page.revision != null) {
|
|
|
+ revisionListHash.update(getIdStringForRef(page.revision));
|
|
|
+ }
|
|
|
await PageBulkExportPageSnapshot.create({
|
|
|
pageBulkExportJob,
|
|
|
path: page.path,
|
|
|
@@ -205,6 +247,9 @@ class PageBulkExportService {
|
|
|
this.pageBulkExportJobStreamManager.addJobStream(pageBulkExportJob._id, pagesReadable);
|
|
|
|
|
|
await pipelinePromise(pagesReadable, pageSnapshotsWritable);
|
|
|
+
|
|
|
+ pageBulkExportJob.revisionListHash = revisionListHash.digest('hex');
|
|
|
+ await pageBulkExportJob.save();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -267,7 +312,8 @@ class PageBulkExportService {
|
|
|
const pageArchiver = this.setUpPageArchiver();
|
|
|
const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
|
|
|
|
|
|
- const originalName = `${pageBulkExportJob._id}.${this.compressExtension}`;
|
|
|
+ if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
|
|
|
+ const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
|
|
|
const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
|
|
|
const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
|
|
|
|
|
|
@@ -382,8 +428,8 @@ class PageBulkExportService {
|
|
|
* - remove the temporal output directory
|
|
|
* - abort multipart upload
|
|
|
*/
|
|
|
- async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument) {
|
|
|
- this.pageBulkExportJobStreamManager.destroyJobStream(pageBulkExportJob._id);
|
|
|
+ async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
|
|
|
+ this.pageBulkExportJobStreamManager.destroyJobStream(pageBulkExportJob._id, restarted);
|
|
|
|
|
|
const promises = [
|
|
|
PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
|