@@ -5,7 +5,9 @@ import { pipeline as pipelinePromise } from 'stream/promises';


import type { HasObjectId } from '@growi/core';
-import { type IPage, isPopulated, SubscriptionStatusType } from '@growi/core';
+import {
+  getIdForRef, type IPage, isPopulated, SubscriptionStatusType,
+} from '@growi/core';
import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
import type { Archiver } from 'archiver';
import archiver from 'archiver';
@@ -63,7 +65,7 @@ class PageBulkExportService {

  // temporal path of local fs to output page files before upload
  // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
-  tmpOutputRootDir = '/tmp';
+  tmpOutputRootDir = '/tmp/page-bulk-export';

  pageModel: PageModel;

@@ -73,7 +75,10 @@ class PageBulkExportService {
    this.pageModel = mongoose.model<IPage, PageModel>('Page');
  }

-  async createAndStartPageBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
+  /**
+   * Create a new page bulk export job and execute it
+   */
+  async createAndExecuteBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
    const basePage = await this.pageModel.findByPathAndViewer(basePagePath, currentUser, null, true);

    if (basePage == null) {
@@ -98,15 +103,21 @@ class PageBulkExportService {

    await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);

-    this.executePageBulkExportJob(currentUser, activityParameters, pageBulkExportJob);
+    this.executePageBulkExportJob(activityParameters, pageBulkExportJob);
  }

+  /**
+   * Execute a page bulk export job. This method can also resume a previously interrupted job.
+   */
  private async executePageBulkExportJob(
-    currentUser, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
+    activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
  ): Promise<void> {
    try {
+      const User = this.crowi.model('User');
+      const user = await User.findById(getIdForRef(pageBulkExportJob.user));
+
      if (pageBulkExportJob.status === PageBulkExportJobStatus.initializing) {
-        await this.createPageSnapshots(currentUser, pageBulkExportJob);
+        await this.createPageSnapshots(user, pageBulkExportJob);
        pageBulkExportJob.status = PageBulkExportJobStatus.exporting;
        await pageBulkExportJob.save();
      }
@@ -116,7 +127,7 @@ class PageBulkExportService {
        await pageBulkExportJob.save();
      }
      if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) {
-        await this.compressAndUpload(currentUser, pageBulkExportJob);
+        await this.compressAndUpload(user, pageBulkExportJob);
      }
    }
    catch (err) {
@@ -129,11 +140,11 @@ class PageBulkExportService {
  }

  /**
-   * Handles export failure with the following:
-   * - notify the user of the failure
+   * Do the following in parallel:
+   * - notify user of the export result
+   * - update pageBulkExportJob status
   * - delete page snapshots
   * - remove the temporal output directory
-   * - set status of pageBulkExportJob to 'failed'
   */
  private async notifyExportResultAndCleanUp(
    succeeded: boolean,
@@ -153,28 +164,14 @@ class PageBulkExportService {
    });
  }

-  /**
-   * Export pages to the file system before compressing and uploading to the cloud storage.
-   * The export will resume from the last exported page if the process was interrupted.
-   */
-  private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-    const pageSnapshotsReadable = PageBulkExportPageSnapshot
-      .find({ pageBulkExportJob, path: { $gt: pageBulkExportJob.lastExportedPagePath } })
-      .populate('revision').sort({ path: 1 }).lean()
-      .cursor({ batchSize: this.pageBatchSize });
-    const pagesWritable = this.getPageWritable(pageBulkExportJob);
-
-    return pipelinePromise(pageSnapshotsReadable, pagesWritable);
-  }
-
  /**
   * Create a snapshot for each page that is to be exported in the pageBulkExportJob
   */
-  private async createPageSnapshots(currentUser, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  private async createPageSnapshots(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
    // if the process of creating snapshots was interrupted, delete the snapshots and create from the start
    await PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob });

-    const basePage = await this.pageModel.findById(pageBulkExportJob.page);
+    const basePage = await this.pageModel.findById(getIdForRef(pageBulkExportJob.page));
    if (basePage == null) {
      throw new Error('Base page not found');
    }
@@ -183,7 +180,7 @@ class PageBulkExportService {
    const { PageQueryBuilder } = this.pageModel;
    const builder = await new PageQueryBuilder(this.pageModel.find())
      .addConditionToListWithDescendants(basePage.path)
-      .addViewerCondition(currentUser);
+      .addViewerCondition(user);
    const pagesReadable = builder
      .query
      .lean()
@@ -211,6 +208,20 @@ class PageBulkExportService {
    await pipelinePromise(pagesReadable, pageSnapshotsWritable);
  }

+  /**
+   * Export pages to the file system before compressing and uploading to the cloud storage.
+   * The export will resume from the last exported page if the process was interrupted.
+   */
+  private async exportPagesToFS(pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const pageSnapshotsReadable = PageBulkExportPageSnapshot
+      .find({ pageBulkExportJob, path: { $gt: pageBulkExportJob.lastExportedPagePath } })
+      .populate('revision').sort({ path: 1 }).lean()
+      .cursor({ batchSize: this.pageBatchSize });
+    const pagesWritable = this.getPageWritable(pageBulkExportJob);
+
+    return pipelinePromise(pageSnapshotsReadable, pagesWritable);
+  }
+
  /**
   * Get a Writable that writes the page body temporarily to fs
   */
@@ -224,7 +235,7 @@ class PageBulkExportService {

        if (revision != null && isPopulated(revision)) {
          const markdownBody = revision.body;
-          const pathNormalized = `${normalizePath(page.path)}.md`;
+          const pathNormalized = `${normalizePath(page.path)}.${PageBulkExportFormat.md}`;
          const fileOutputPath = path.join(outputDir, pathNormalized);
          const fileOutputParentPath = getParentPath(fileOutputPath);

@@ -243,6 +254,38 @@ class PageBulkExportService {
    });
  }

+  /**
+   * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
+   */
+  private async compressAndUpload(user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+    const pageArchiver = this.setUpPageArchiver();
+    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
+
+    const originalName = `${pageBulkExportJob._id}.${this.compressExtension}`;
+    const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
+    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
+
+    const fileUploadService: FileUploader = this.crowi.fileUploadService;
+    // if the process of uploading was interrupted, delete and start from the start
+    if (pageBulkExportJob.uploadId != null) {
+      await fileUploadService.abortExistingMultipartUpload(uploadKey, pageBulkExportJob.uploadId);
+    }
+
+    // init multipart upload
+    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
+    await multipartUploader.initUpload();
+    pageBulkExportJob.uploadId = multipartUploader.uploadId;
+    await pageBulkExportJob.save();
+
+    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment);
+
+    const compressAndUploadPromise = pipelinePromise(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable);
+    pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
+    pageArchiver.finalize();
+
+    await compressAndUploadPromise;
+  }
+
  private setUpPageArchiver(): Archiver {
    const pageArchiver = archiver('tar', {
      gzip: true,
@@ -301,37 +344,11 @@ class PageBulkExportService {
    });
  }

-  private async compressAndUpload(currentUser, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-    const pageArchiver = this.setUpPageArchiver();
-    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
-
-    const originalName = `${pageBulkExportJob._id}.${this.compressExtension}`;
-    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
-    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
-
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    // if the process of uploading was interrupted, delete and start from the start
-    if (pageBulkExportJob.uploadId != null) {
-      await fileUploadService.abortExistingMultipartUpload(uploadKey, pageBulkExportJob.uploadId);
-    }
-
-    // init multipart upload
-    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
-    await multipartUploader.initUpload();
-    pageBulkExportJob.uploadId = multipartUploader.uploadId;
-    await pageBulkExportJob.save();
-
-    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment);
-
-    const compressAndUploadPromise = pipelinePromise(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable);
-    pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
-    pageArchiver.finalize();
-
-    await compressAndUploadPromise;
-  }
-
+  /**
+   * Get the output directory on the fs to temporarily store page files before compressing and uploading
+   */
  private getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string {
-    return `${this.tmpOutputRootDir}/page-bulk-export/${pageBulkExportJob._id}`;
+    return `${this.tmpOutputRootDir}/${pageBulkExportJob._id}`;
  }

  private async notifyExportResult(