@@ -1,12 +1,15 @@
+import fs from 'fs';
+import path from 'path';
import type { Readable } from 'stream';
import { Writable, pipeline } from 'stream';
+import { pipeline as pipelinePromise } from 'stream/promises';
+
import type { HasObjectId } from '@growi/core';
import { type IPage, isPopulated, SubscriptionStatusType } from '@growi/core';
-import { normalizePath } from '@growi/core/dist/utils/path-utils';
+import { getParentPath, normalizePath } from '@growi/core/dist/utils/path-utils';
import type { Archiver } from 'archiver';
import archiver from 'archiver';
-import type { QueueObject } from 'async';
import gc from 'expose-gc/function';
import mongoose from 'mongoose';
@@ -28,12 +31,8 @@ import { PageBulkExportFormat } from '../../interfaces/page-bulk-export';
import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
import PageBulkExportJob from '../models/page-bulk-export-job';

-const logger = loggerFactory('growi:services:PageBulkExportService');

-// Custom type for back pressure workaround
-interface ArchiverWithQueue extends Archiver {
-  _queue?: QueueObject<any>;
-}
+const logger = loggerFactory('growi:services:PageBulkExportService');

type ActivityParameters ={
  ip: string;
@@ -51,12 +50,18 @@ class PageBulkExportService {

  pageBatchSize = 100;

+  compressExtension = 'tar.gz';
+
+  // temporary path on the local fs to output page files before upload
+  // TODO: If necessary, change to a proper path in https://redmine.weseek.co.jp/issues/149512
+  tmpOutputRootDir = '/tmp';
+
  constructor(crowi) {
    this.crowi = crowi;
    this.activityEvent = crowi.event('activity');
  }

-  async bulkExportWithBasePagePath(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
+  async createAndStartPageBulkExportJob(basePagePath: string, currentUser, activityParameters: ActivityParameters): Promise<void> {
    const Page = mongoose.model<IPage, PageModel>('Page');
    const basePage = await Page.findByPathAndViewer(basePagePath, currentUser, null, true);
@@ -64,54 +69,96 @@ class PageBulkExportService {
      throw new Error('Base page not found or not accessible');
    }

+    const pageBulkExportJob: PageBulkExportJobDocument & HasObjectId = await PageBulkExportJob.create({
+      user: currentUser,
+      page: basePage,
+      format: PageBulkExportFormat.markdown,
+    });
+
+    await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
+
+    this.bulkExportWithBasePagePath(basePagePath, currentUser, activityParameters, pageBulkExportJob);
+  }
+
+  async bulkExportWithBasePagePath(
+    basePagePath: string, currentUser, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument & HasObjectId,
+  ): Promise<void> {
    const timeStamp = (new Date()).getTime();
-    const originalName = `page-bulk-export-${timeStamp}.zip`;
-    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, 'zip', 0, AttachmentType.PAGE_BULK_EXPORT);
-    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
+    const exportName = `page-bulk-export-${timeStamp}`;

-    const pagesReadable = this.getPageReadable(basePagePath);
-    const zipArchiver = this.setUpZipArchiver();
-    const pagesWritable = this.getPageWritable(zipArchiver);
+    // export pages to fs temporarily
+    const tmpOutputDir = `${this.tmpOutputRootDir}/${exportName}`;
+    try {
+      await this.exportPagesToFS(basePagePath, tmpOutputDir);
+    }
+    catch (err) {
+      await this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir);
+      return;
+    }
+
+    const pageArchiver = this.setUpPageArchiver();
    const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);

+    const originalName = `${exportName}.${this.compressExtension}`;
+    const attachment = Attachment.createWithoutSave(null, currentUser, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
+    const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
+
    // init multipart upload
    const fileUploadService: FileUploader = this.crowi.fileUploadService;
    const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
-    let pageBulkExportJob: PageBulkExportJobDocument & HasObjectId;
    try {
      await multipartUploader.initUpload();
-      pageBulkExportJob = await PageBulkExportJob.create({
-        user: currentUser,
-        page: basePage,
-        uploadId: multipartUploader.uploadId,
-        format: PageBulkExportFormat.markdown,
-      });
-      await Subscription.upsertSubscription(currentUser, SupportedTargetModel.MODEL_PAGE_BULK_EXPORT_JOB, pageBulkExportJob, SubscriptionStatusType.SUBSCRIBE);
+      pageBulkExportJob.uploadId = multipartUploader.uploadId;
+      await pageBulkExportJob.save();
    }
    catch (err) {
-      logger.error(err);
-      await multipartUploader.abortUpload();
-      throw err;
+      await this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir, multipartUploader);
+      return;
    }

-    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment, activityParameters);
+    const multipartUploadWritable = this.getMultipartUploadWritable(multipartUploader, pageBulkExportJob, attachment, activityParameters, tmpOutputDir);

-    // Cannot directly pipe from pagesWritable to zipArchiver due to how the 'append' method works.
-    // Hence, execution of two pipelines is required.
-    pipeline(pagesReadable, pagesWritable, err => this.handleExportErrorInStream(err, activityParameters, pageBulkExportJob, multipartUploader));
-    pipeline(zipArchiver, bufferToPartSizeTransform, multipartUploadWritable,
-      err => this.handleExportErrorInStream(err, activityParameters, pageBulkExportJob, multipartUploader));
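+    // stream the tar.gz archive in fixed-size parts into the multipart uploader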
+    pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable,
+      err => this.handleExportError(err, activityParameters, pageBulkExportJob, tmpOutputDir, multipartUploader));
+    pageArchiver.directory(tmpOutputDir, false);
+    pageArchiver.finalize();
  }

-  private async handleExportErrorInStream(
-    err: Error | null, activityParameters: ActivityParameters, pageBulkExportJob: PageBulkExportJobDocument, multipartUploader: IMultipartUploader,
+  /**
+   * Handles export failure with the following:
+   * - notify the user of the failure
+   * - remove the temporary output directory
+   * - abort multipart upload
+   */
+  // TODO: update completedAt of pageBulkExportJob, or add a failed status flag to it (https://redmine.weseek.co.jp/issues/78040)
+  private async handleExportError(
+    err: Error | null,
+    activityParameters: ActivityParameters,
+    pageBulkExportJob: PageBulkExportJobDocument,
+    tmpOutputDir: string,
+    multipartUploader?: IMultipartUploader,
  ): Promise<void> {
    if (err != null) {
-      await multipartUploader.abortUpload();
-      await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED);
+      logger.error(err);
+
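+      // run the cleanup steps concurrently; failures are logged but not rethrown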
+      const results = await Promise.allSettled([
+        this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED),
+        fs.promises.rm(tmpOutputDir, { recursive: true, force: true }),
+        multipartUploader?.abortUpload(),
+      ]);
+      results.forEach((result) => {
+        if (result.status === 'rejected') logger.error(result.reason);
+      });
    }
  }

+  private async exportPagesToFS(basePagePath: string, outputDir: string): Promise<void> {
+    const pagesReadable = this.getPageReadable(basePagePath);
+    const pagesWritable = this.getPageWritable(outputDir);
+
+    return pipelinePromise(pagesReadable, pagesWritable);
+  }
+
  /**
   * Get a Readable of all the pages under the specified path, including the root page.
   */
@@ -130,9 +177,9 @@ class PageBulkExportService {
  }

  /**
-   * Get a Writable that writes the page body to a zip file
+   * Get a Writable that writes the page body temporarily to fs
   */
-  private getPageWritable(zipArchiver: Archiver): Writable {
+  private getPageWritable(outputDir: string): Writable {
    return new Writable({
      objectMode: true,
      write: async(page: PageDocument, encoding, callback) => {
@@ -141,16 +188,12 @@ class PageBulkExportService {
          if (revision != null && isPopulated(revision)) {
            const markdownBody = revision.body;
-            const pathNormalized = normalizePath(page.path);
-            // Since archiver does not provide a proper way to back pressure at the moment, use the _queue property as a workaround
-            // ref: https://github.com/archiverjs/node-archiver/issues/611
-            const { _queue } = zipArchiver.append(markdownBody, { name: `${pathNormalized}.md` }) as ArchiverWithQueue;
-            if (_queue == null) {
-              throw Error('Cannot back pressure the export pipeline. Aborting the export.');
-            }
-            if (_queue.length() > this.pageBatchSize) {
-              await _queue.drain();
-            }
+            const pathNormalized = `${normalizePath(page.path)}.md`;
+            const fileOutputPath = path.join(outputDir, pathNormalized);
+            const fileOutputParentPath = getParentPath(fileOutputPath);
+
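+            // create the parent directories for the page path, then write the markdown body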
+            await fs.promises.mkdir(fileOutputParentPath, { recursive: true });
+            await fs.promises.writeFile(fileOutputPath, markdownBody);
          }
        }
        catch (err) {
@@ -159,25 +202,21 @@ class PageBulkExportService {
        }
        callback();
      },
-      final: (callback) => {
-        zipArchiver.finalize();
-        callback();
-      },
    });
  }

-  private setUpZipArchiver(): Archiver {
-    const zipArchiver = archiver('zip', {
-      zlib: { level: 9 }, // maximum compression
+  private setUpPageArchiver(): Archiver {
+    const pageArchiver = archiver('tar', {
+      gzip: true,
    });

    // good practice to catch warnings (ie stat failures and other non-blocking errors)
-    zipArchiver.on('warning', (err) => {
+    pageArchiver.on('warning', (err) => {
      if (err.code === 'ENOENT') logger.error(err);
      else throw err;
    });

-    return zipArchiver;
+    return pageArchiver;
  }

  private getMultipartUploadWritable(
@@ -185,6 +224,7 @@ class PageBulkExportService {
    pageBulkExportJob: PageBulkExportJobDocument,
    attachment: IAttachmentDocument,
    activityParameters: ActivityParameters,
+    tmpOutputDir: string,
  ): Writable {
    let partNumber = 1;
@@ -216,6 +256,7 @@ class PageBulkExportService {
        await pageBulkExportJob.save();

        await this.notifyExportResult(activityParameters, pageBulkExportJob, SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED);
+        await fs.promises.rm(tmpOutputDir, { recursive: true, force: true });
      }
      catch (err) {
        callback(err);