Просмотр исходного кода

enable page bulk export for all file upload types

Futa Arai 1 год назад
Родитель
Commit
d2509d73c1

+ 0 - 2
apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts

@@ -30,8 +30,6 @@ export interface IPageBulkExportJob {
   user: Ref<IUser>, // user that started export job
   user: Ref<IUser>, // user that started export job
   page: Ref<IPage>, // the root page of page tree to export
   page: Ref<IPage>, // the root page of page tree to export
   lastExportedPagePath?: string, // the path of page that was exported to the fs last
   lastExportedPagePath?: string, // the path of page that was exported to the fs last
-  uploadId?: string, // upload ID of multipart upload of S3/GCS
-  uploadKey?: string, // upload key of multipart upload of S3/GCS
   format: PageBulkExportFormat,
   format: PageBulkExportFormat,
   completedAt?: Date, // the date at which job was completed
   completedAt?: Date, // the date at which job was completed
   attachment?: Ref<IAttachment>,
   attachment?: Ref<IAttachment>,

+ 0 - 2
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts

@@ -13,8 +13,6 @@ const pageBulkExportJobSchema = new Schema<PageBulkExportJobDocument>({
   user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
   user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
   page: { type: Schema.Types.ObjectId, ref: 'Page', required: true },
   page: { type: Schema.Types.ObjectId, ref: 'Page', required: true },
   lastExportedPagePath: { type: String },
   lastExportedPagePath: { type: String },
-  uploadId: { type: String, unique: true, sparse: true },
-  uploadKey: { type: String, unique: true, sparse: true },
   format: { type: String, enum: Object.values(PageBulkExportFormat), required: true },
   format: { type: String, enum: Object.values(PageBulkExportFormat), required: true },
   completedAt: { type: Date },
   completedAt: { type: Date },
   attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },
   attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },

+ 2 - 9
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -13,7 +13,6 @@ import type { ActivityDocument } from '~/server/models/activity';
 import type { PageModel } from '~/server/models/page';
 import type { PageModel } from '~/server/models/page';
 import { configManager } from '~/server/service/config-manager';
 import { configManager } from '~/server/service/config-manager';
 import CronService from '~/server/service/cron';
 import CronService from '~/server/service/cron';
-import type { FileUploader } from '~/server/service/file-uploader';
 import { preNotifyService } from '~/server/service/pre-notify';
 import { preNotifyService } from '~/server/service/pre-notify';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
 
 
@@ -38,7 +37,7 @@ export interface IPageBulkExportJobCronService {
   maxPartSize: number;
   maxPartSize: number;
   compressExtension: string;
   compressExtension: string;
   setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
   setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
-  handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void;
+  handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void;
   notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise<void>;
   notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise<void>;
   getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
   getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
 }
 }
@@ -151,7 +150,7 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
     }
     }
   }
   }
 
 
-  async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
+  async handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
     if (err == null) return;
     if (err == null) return;
 
 
     if (err instanceof BulkExportJobExpiredError) {
     if (err instanceof BulkExportJobExpiredError) {
@@ -195,7 +194,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
    * Do the following in parallel:
    * Do the following in parallel:
    * - delete page snapshots
    * - delete page snapshots
    * - remove the temporal output directory
    * - remove the temporal output directory
-   * - abort multipart upload
    */
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
     const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
     const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
@@ -214,11 +212,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
     ];
     ];
 
 
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
-      promises.push(fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId));
-    }
-
     const results = await Promise.allSettled(promises);
     const results = await Promise.allSettled(promises);
     results.forEach((result) => {
     results.forEach((result) => {
       if (result.status === 'rejected') logger.error(result.reason);
       if (result.status === 'rejected') logger.error(result.reason);

+ 21 - 69
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts

@@ -1,17 +1,12 @@
-import { Writable, pipeline } from 'stream';
-
 import type { Archiver } from 'archiver';
 import type { Archiver } from 'archiver';
 import archiver from 'archiver';
 import archiver from 'archiver';
-import gc from 'expose-gc/function';
 
 
 import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
 import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
 import { SupportedAction } from '~/interfaces/activity';
 import { SupportedAction } from '~/interfaces/activity';
-import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
+import { AttachmentType } from '~/server/interfaces/attachment';
 import type { IAttachmentDocument } from '~/server/models/attachment';
 import type { IAttachmentDocument } from '~/server/models/attachment';
 import { Attachment } from '~/server/models/attachment';
 import { Attachment } from '~/server/models/attachment';
 import type { FileUploader } from '~/server/service/file-uploader';
 import type { FileUploader } from '~/server/service/file-uploader';
-import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
-import { getBufferToFixedSizeTransform } from '~/server/util/stream';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
 
 
 import type { IPageBulkExportJobCronService } from '..';
 import type { IPageBulkExportJobCronService } from '..';
@@ -33,85 +28,42 @@ function setUpPageArchiver(): Archiver {
   return pageArchiver;
   return pageArchiver;
 }
 }
 
 
-function getMultipartUploadWritable(
-    this: IPageBulkExportJobCronService,
-    multipartUploader: IMultipartUploader,
-    pageBulkExportJob: PageBulkExportJobDocument,
-    attachment: IAttachmentDocument,
-): Writable {
-  let partNumber = 1;
-
-  return new Writable({
-    write: async(part: Buffer, encoding, callback) => {
-      try {
-        await multipartUploader.uploadPart(part, partNumber);
-        partNumber += 1;
-        // First aid to prevent unexplained memory leaks
-        logger.info('global.gc() invoked.');
-        gc();
-      }
-      catch (err) {
-        await multipartUploader.abortUpload();
-        callback(err);
-        return;
-      }
-      callback();
-    },
-    final: async(callback) => {
-      try {
-        await multipartUploader.completeUpload();
+async function postProcess(
+    this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument, attachment: IAttachmentDocument, fileSize: number,
+): Promise<void> {
+  attachment.fileSize = fileSize;
+  await attachment.save();
 
 
-        const fileSize = await multipartUploader.getUploadedFileSize();
-        attachment.fileSize = fileSize;
-        await attachment.save();
-
-        pageBulkExportJob.completedAt = new Date();
-        pageBulkExportJob.attachment = attachment._id;
-        pageBulkExportJob.status = PageBulkExportJobStatus.completed;
-        await pageBulkExportJob.save();
+  pageBulkExportJob.completedAt = new Date();
+  pageBulkExportJob.attachment = attachment._id;
+  pageBulkExportJob.status = PageBulkExportJobStatus.completed;
+  await pageBulkExportJob.save();
 
 
-        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
-      }
-      catch (err) {
-        callback(err);
-        return;
-      }
-      callback();
-    },
-  });
+  await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
 }
 }
 
 
-
 /**
 /**
  * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
  * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
  */
  */
 export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
 export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
   const pageArchiver = setUpPageArchiver();
   const pageArchiver = setUpPageArchiver();
-  const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
 
 
   if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
   if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
   const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
   const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
   const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
   const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
-  const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
 
 
   const fileUploadService: FileUploader = this.crowi.fileUploadService;
   const fileUploadService: FileUploader = this.crowi.fileUploadService;
-  // if the process of uploading was interrupted, delete and start from the start
-  if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
-    await fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId);
-  }
 
 
-  // init multipart upload
-  const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
-  await multipartUploader.initUpload();
-  pageBulkExportJob.uploadKey = uploadKey;
-  pageBulkExportJob.uploadId = multipartUploader.uploadId;
-  await pageBulkExportJob.save();
-
-  const multipartUploadWritable = getMultipartUploadWritable.bind(this)(multipartUploader, pageBulkExportJob, attachment);
-
-  pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable, (err) => {
-    this.handlePipelineError(err, pageBulkExportJob);
-  });
   pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
   pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
   pageArchiver.finalize();
   pageArchiver.finalize();
+  this.setStreamInExecution(pageBulkExportJob._id, pageArchiver);
+
+  try {
+    await fileUploadService.uploadAttachment(pageArchiver, attachment);
+  }
+  catch (e) {
+    logger.error(e);
+    this.handleError(e, pageBulkExportJob);
+  }
+  await postProcess.bind(this)(pageBulkExportJob, attachment, pageArchiver.pointer());
 }
 }

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -85,6 +85,6 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi
   this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
   this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
 
 
   pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
   pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
-    this.handlePipelineError(err, pageBulkExportJob);
+    this.handleError(err, pageBulkExportJob);
   });
   });
 }
 }

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -68,6 +68,6 @@ export function exportPagesToFsAsync(this: IPageBulkExportJobCronService, pageBu
   this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
   this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
 
 
   pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
   pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
-    this.handlePipelineError(err, pageBulkExportJob);
+    this.handleError(err, pageBulkExportJob);
   });
   });
 }
 }

+ 3 - 3
apps/app/src/server/service/file-uploader/azure.ts

@@ -1,4 +1,4 @@
-import type { ReadStream } from 'fs';
+import type { Readable } from 'stream';
 
 
 import type { TokenCredential } from '@azure/identity';
 import type { TokenCredential } from '@azure/identity';
 import { ClientSecretCredential } from '@azure/identity';
 import { ClientSecretCredential } from '@azure/identity';
@@ -102,7 +102,7 @@ class AzureFileUploader extends AbstractFileUploader {
   /**
   /**
    * @inheritdoc
    * @inheritdoc
    */
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     if (!this.getIsUploadable()) {
     if (!this.getIsUploadable()) {
       throw new Error('Azure is not configured.');
       throw new Error('Azure is not configured.');
     }
     }
@@ -113,7 +113,7 @@ class AzureFileUploader extends AbstractFileUploader {
     const blockBlobClient: BlockBlobClient = containerClient.getBlockBlobClient(filePath);
     const blockBlobClient: BlockBlobClient = containerClient.getBlockBlobClient(filePath);
     const contentHeaders = new ContentHeaders(attachment);
     const contentHeaders = new ContentHeaders(attachment);
 
 
-    await blockBlobClient.uploadStream(readStream, undefined, undefined, {
+    await blockBlobClient.uploadStream(readable, undefined, undefined, {
       blobHTTPHeaders: {
       blobHTTPHeaders: {
         // put type and the file name for reference information when uploading
         // put type and the file name for reference information when uploading
         blobContentType: contentHeaders.contentType?.value.toString(),
         blobContentType: contentHeaders.contentType?.value.toString(),

+ 3 - 3
apps/app/src/server/service/file-uploader/file-uploader.ts

@@ -1,4 +1,4 @@
-import type { ReadStream } from 'fs';
+import type { Readable } from 'stream';
 
 
 import type { Response } from 'express';
 import type { Response } from 'express';
 import { v4 as uuidv4 } from 'uuid';
 import { v4 as uuidv4 } from 'uuid';
@@ -39,7 +39,7 @@ export interface FileUploader {
   getTotalFileSize(): Promise<number>,
   getTotalFileSize(): Promise<number>,
   doCheckLimit(uploadFileSize: number, maxFileSize: number, totalLimit: number): Promise<ICheckLimitResult>,
   doCheckLimit(uploadFileSize: number, maxFileSize: number, totalLimit: number): Promise<ICheckLimitResult>,
   determineResponseMode(): ResponseMode,
   determineResponseMode(): ResponseMode,
-  uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>,
+  uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void>,
   respond(res: Response, attachment: IAttachmentDocument, opts?: RespondOptions): void,
   respond(res: Response, attachment: IAttachmentDocument, opts?: RespondOptions): void,
   findDeliveryFile(attachment: IAttachmentDocument): Promise<NodeJS.ReadableStream>,
   findDeliveryFile(attachment: IAttachmentDocument): Promise<NodeJS.ReadableStream>,
   generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise<TemporaryUrl>,
   generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise<TemporaryUrl>,
@@ -164,7 +164,7 @@ export abstract class AbstractFileUploader implements FileUploader {
     throw new Error('Multipart upload not available for file upload type');
     throw new Error('Multipart upload not available for file upload type');
   }
   }
 
 
-  abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>;
+  abstract uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void>;
 
 
   /**
   /**
    * Abort an existing multipart upload without creating a MultipartUploader instance
    * Abort an existing multipart upload without creating a MultipartUploader instance

+ 2 - 2
apps/app/src/server/service/file-uploader/gridfs.ts

@@ -62,7 +62,7 @@ class GridfsFileUploader extends AbstractFileUploader {
   /**
   /**
    * @inheritdoc
    * @inheritdoc
    */
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     logger.debug(`File uploading: fileName=${attachment.fileName}`);
     logger.debug(`File uploading: fileName=${attachment.fileName}`);
 
 
     const contentHeaders = new ContentHeaders(attachment);
     const contentHeaders = new ContentHeaders(attachment);
@@ -73,7 +73,7 @@ class GridfsFileUploader extends AbstractFileUploader {
         filename: attachment.fileName,
         filename: attachment.fileName,
         contentType: contentHeaders.contentType?.value.toString(),
         contentType: contentHeaders.contentType?.value.toString(),
       },
       },
-      readStream,
+      readable,
     );
     );
   }
   }
 
 

+ 1 - 1
apps/app/src/server/service/file-uploader/local.ts

@@ -76,7 +76,7 @@ class LocalFileUploader extends AbstractFileUploader {
   /**
   /**
    * @inheritdoc
    * @inheritdoc
    */
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     throw new Error('Method not implemented.');
     throw new Error('Method not implemented.');
   }
   }