Przeglądaj źródła

Merge pull request #9467 from weseek/feat/158222-158279-bulk-export-to-normal-fs

Feat/158222 158279 bulk export for file upload types other than s3/gcs
Yuki Takei 1 rok temu
rodzic
commit
f8dfda879c
20 zmienionych plików z 114 dodań i 182 usunięć
  1. 0 1
      apps/app/public/static/locales/en_US/translation.json
  2. 0 1
      apps/app/public/static/locales/fr_FR/translation.json
  3. 0 1
      apps/app/public/static/locales/ja_JP/translation.json
  4. 0 1
      apps/app/public/static/locales/zh_CN/translation.json
  5. 12 21
      apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx
  6. 0 2
      apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts
  7. 0 2
      apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts
  8. 2 1
      apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts
  9. 7 13
      apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts
  10. 0 117
      apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts
  11. 70 0
      apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts
  12. 1 1
      apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts
  13. 1 1
      apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts
  14. 2 1
      apps/app/src/pages/[[...path]].page.tsx
  15. 3 3
      apps/app/src/server/service/file-uploader/aws/index.ts
  16. 3 3
      apps/app/src/server/service/file-uploader/azure.ts
  17. 3 3
      apps/app/src/server/service/file-uploader/file-uploader.ts
  18. 7 5
      apps/app/src/server/service/file-uploader/gcs/index.ts
  19. 2 3
      apps/app/src/server/service/file-uploader/gridfs.ts
  20. 1 2
      apps/app/src/server/service/file-uploader/local.ts

+ 0 - 1
apps/app/public/static/locales/en_US/translation.json

@@ -666,7 +666,6 @@
     "bulk_export_started": "Please wait a moment...",
     "bulk_export_download_expired": "Download period has expired",
     "bulk_export_job_expired": "Export process was canceled because it took too long",
-    "bulk_export_only_available_for": "Only available for AWS or GCP",
     "export_in_progress": "Export in progress",
     "export_in_progress_explanation": "Export with the same format is already in progress. Would you like to restart to export the latest page contents?",
     "export_cancel_warning": "The following export in progress will be canceled",

+ 0 - 1
apps/app/public/static/locales/fr_FR/translation.json

@@ -659,7 +659,6 @@
     "bulk_export_started": "Patientez s'il-vous-plait...",
     "bulk_export_download_expired": "La période de téléchargement a expiré",
     "bulk_export_job_expired": "Le traitement a été interrompu car le temps d'exportation était trop long",
-    "bulk_export_only_available_for": "Uniquement disponible pour AWS ou GCP",
     "export_in_progress": "Exportation en cours",
     "export_in_progress_explanation": "L'exportation avec le même format est déjà en cours. Souhaitez-vous redémarrer pour exporter le dernier contenu de la page ?",
     "export_cancel_warning": "Les exportations suivantes en cours seront annulées",

+ 0 - 1
apps/app/public/static/locales/ja_JP/translation.json

@@ -698,7 +698,6 @@
     "bulk_export_started": "ただいま準備中です...",
     "bulk_export_download_expired": "ダウンロード期限が切れました",
     "bulk_export_job_expired": "エクスポート時間が長すぎるため、処理が中断されました",
-    "bulk_export_only_available_for": "AWS と GCP のみ対応しています",
     "export_in_progress": "エクスポート進行中",
     "export_in_progress_explanation": "既に同じ形式でのエクスポートが進行中です。最新のページ内容でエクスポートを最初からやり直しますか?",
     "export_cancel_warning": "進行中の以下のエクスポートはキャンセルされます",

+ 0 - 1
apps/app/public/static/locales/zh_CN/translation.json

@@ -668,7 +668,6 @@
     "bulk_export_started": "目前我们正在准备...",
     "bulk_export_download_expired": "下载期限已过",
     "bulk_export_job_expired": "由于导出时间太长,处理被中断",
-    "bulk_export_only_available_for": "仅适用于 AWS 或 GCP",
     "export_in_progress": "导出正在进行中",
     "export_in_progress_explanation": "已在进行相同格式的导出。您要重新启动以导出最新的页面内容吗?",
     "export_cancel_warning": "以下正在进行的导出将被取消",

+ 12 - 21
apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx

@@ -85,8 +85,6 @@ const PageOperationMenuItems = (props: PageOperationMenuItemsProps): JSX.Element
 
   const { data: codeMirrorEditor } = useCodeMirrorEditorIsolated(GlobalCodeMirrorEditorKey.MAIN);
 
-  const [isBulkExportTooltipOpen, setIsBulkExportTooltipOpen] = useState(false);
-
   const syncLatestRevisionBodyHandler = useCallback(async() => {
     // eslint-disable-next-line no-alert
     const answer = window.confirm(t('sync-latest-revision-body.confirm'));
@@ -144,25 +142,18 @@ const PageOperationMenuItems = (props: PageOperationMenuItemsProps): JSX.Element
       </DropdownItem>
 
       {/* Bulk export */}
-      <span id="bulkExportDropdownItem">
-        <DropdownItem
-          disabled={!isPageBulkExportEnabled}
-          onClick={openPageBulkExportSelectModal}
-          className="grw-page-control-dropdown-item"
-        >
-          <span className="material-symbols-outlined me-1 grw-page-control-dropdown-icon">cloud_download</span>
-          {t('page_export.bulk_export')}
-        </DropdownItem>
-      </span>
-      <Tooltip
-        placement="left"
-        isOpen={!isPageBulkExportEnabled && isBulkExportTooltipOpen}
-        // Tooltip cannot be activated when target is disabled so set the target to wrapper span
-        target="bulkExportDropdownItem"
-        toggle={() => setIsBulkExportTooltipOpen(!isBulkExportTooltipOpen)}
-      >
-        {t('page_export.bulk_export_only_available_for')}
-      </Tooltip>
+      {isPageBulkExportEnabled && (
+        <span id="bulkExportDropdownItem">
+          <DropdownItem
+            disabled={!isPageBulkExportEnabled}
+            onClick={openPageBulkExportSelectModal}
+            className="grw-page-control-dropdown-item"
+          >
+            <span className="material-symbols-outlined me-1 grw-page-control-dropdown-icon">cloud_download</span>
+            {t('page_export.bulk_export')}
+          </DropdownItem>
+        </span>
+      )}
 
       <DropdownItem divider />
 

+ 0 - 2
apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts

@@ -30,8 +30,6 @@ export interface IPageBulkExportJob {
   user: Ref<IUser>, // user that started export job
   page: Ref<IPage>, // the root page of page tree to export
   lastExportedPagePath?: string, // the path of page that was exported to the fs last
-  uploadId?: string, // upload ID of multipart upload of S3/GCS
-  uploadKey?: string, // upload key of multipart upload of S3/GCS
   format: PageBulkExportFormat,
   completedAt?: Date, // the date at which job was completed
   attachment?: Ref<IAttachment>,

+ 0 - 2
apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts

@@ -13,8 +13,6 @@ const pageBulkExportJobSchema = new Schema<PageBulkExportJobDocument>({
   user: { type: Schema.Types.ObjectId, ref: 'User', required: true },
   page: { type: Schema.Types.ObjectId, ref: 'Page', required: true },
   lastExportedPagePath: { type: String },
-  uploadId: { type: String, unique: true, sparse: true },
-  uploadKey: { type: String, unique: true, sparse: true },
   format: { type: String, enum: Object.values(PageBulkExportFormat), required: true },
   completedAt: { type: Date },
   attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },

+ 2 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts

@@ -29,7 +29,8 @@ class PageBulkExportJobCleanUpCronService extends CronService {
   }
 
   override async executeJob(): Promise<void> {
-    const isPageBulkExportEnabled = PageBulkExportEnabledFileUploadTypes.includes(configManager.getConfig('crowi', 'app:fileUploadType'));
+    // TODO: allow enabling/disabling bulk export in https://redmine.weseek.co.jp/issues/158221
+    const isPageBulkExportEnabled = true;
     if (!isPageBulkExportEnabled) return;
 
     await this.deleteExpiredExportJobs();

+ 7 - 13
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -13,7 +13,6 @@ import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
 import type { ActivityDocument } from '~/server/models/activity';
 import { configManager } from '~/server/service/config-manager';
 import CronService from '~/server/service/cron';
-import type { FileUploader } from '~/server/service/file-uploader';
 import { preNotifyService } from '~/server/service/pre-notify';
 import loggerFactory from '~/utils/logger';
 
@@ -24,7 +23,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps
 
 
 import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
-import { compressAndUploadAsync } from './steps/compress-and-upload-async';
+import { compressAndUpload } from './steps/compress-and-upload';
 import { createPageSnapshotsAsync } from './steps/create-page-snapshots-async';
 import { exportPagesToFsAsync } from './steps/export-pages-to-fs-async';
 
@@ -37,7 +36,8 @@ export interface IPageBulkExportJobCronService {
   maxPartSize: number;
   compressExtension: string;
   setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
-  handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void;
+  removeStreamInExecution(jobId: ObjectIdLike): void;
+  handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void;
   notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise<void>;
   getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string;
 }
@@ -157,7 +157,7 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
         exportPagesToFsAsync.bind(this)(pageBulkExportJob);
       }
       else if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) {
-        await compressAndUploadAsync.bind(this)(user, pageBulkExportJob);
+        compressAndUpload.bind(this)(user, pageBulkExportJob);
       }
     }
     catch (err) {
@@ -167,11 +167,11 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
   }
 
   /**
-   * Handle errors that occurred inside a stream pipeline
+   * Handle errors that occurred during page bulk export
    * @param err error
    * @param pageBulkExportJob PageBulkExportJob executed in the pipeline
    */
-  async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
+  async handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) {
     if (err == null) return;
 
     if (err instanceof BulkExportJobExpiredError) {
@@ -215,7 +215,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
    * Do the following in parallel:
    * - delete page snapshots
    * - remove the temporal output directory
-   * - abort multipart upload
    */
   async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) {
     const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
@@ -226,19 +225,14 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor
       else {
         streamInExecution.destroy(new BulkExportJobExpiredError());
       }
+      this.removeStreamInExecution(pageBulkExportJob._id);
     }
-    this.removeStreamInExecution(pageBulkExportJob._id);
 
     const promises = [
       PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }),
       fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }),
     ];
 
-    const fileUploadService: FileUploader = this.crowi.fileUploadService;
-    if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
-      promises.push(fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId));
-    }
-
     const results = await Promise.allSettled(promises);
     results.forEach((result) => {
       if (result.status === 'rejected') logger.error(result.reason);

+ 0 - 117
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts

@@ -1,117 +0,0 @@
-import { Writable, pipeline } from 'stream';
-
-import type { Archiver } from 'archiver';
-import archiver from 'archiver';
-import gc from 'expose-gc/function';
-
-import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
-import { SupportedAction } from '~/interfaces/activity';
-import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
-import type { IAttachmentDocument } from '~/server/models/attachment';
-import { Attachment } from '~/server/models/attachment';
-import type { FileUploader } from '~/server/service/file-uploader';
-import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
-import { getBufferToFixedSizeTransform } from '~/server/util/stream';
-import loggerFactory from '~/utils/logger';
-
-import type { IPageBulkExportJobCronService } from '..';
-import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
-
-const logger = loggerFactory('growi:service:page-bulk-export-job-cron:compress-and-upload-async');
-
-function setUpPageArchiver(): Archiver {
-  const pageArchiver = archiver('tar', {
-    gzip: true,
-  });
-
-  // good practice to catch warnings (ie stat failures and other non-blocking errors)
-  pageArchiver.on('warning', (err) => {
-    if (err.code === 'ENOENT') logger.error(err);
-    else throw err;
-  });
-
-  return pageArchiver;
-}
-
-function getMultipartUploadWritable(
-    this: IPageBulkExportJobCronService,
-    multipartUploader: IMultipartUploader,
-    pageBulkExportJob: PageBulkExportJobDocument,
-    attachment: IAttachmentDocument,
-): Writable {
-  let partNumber = 1;
-
-  return new Writable({
-    write: async(part: Buffer, encoding, callback) => {
-      try {
-        await multipartUploader.uploadPart(part, partNumber);
-        partNumber += 1;
-        // First aid to prevent unexplained memory leaks
-        logger.info('global.gc() invoked.');
-        gc();
-      }
-      catch (err) {
-        await multipartUploader.abortUpload();
-        callback(err);
-        return;
-      }
-      callback();
-    },
-    final: async(callback) => {
-      try {
-        await multipartUploader.completeUpload();
-
-        const fileSize = await multipartUploader.getUploadedFileSize();
-        attachment.fileSize = fileSize;
-        await attachment.save();
-
-        pageBulkExportJob.completedAt = new Date();
-        pageBulkExportJob.attachment = attachment._id;
-        pageBulkExportJob.status = PageBulkExportJobStatus.completed;
-        await pageBulkExportJob.save();
-
-        await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
-      }
-      catch (err) {
-        callback(err);
-        return;
-      }
-      callback();
-    },
-  });
-}
-
-
-/**
- * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
- */
-export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
-  const pageArchiver = setUpPageArchiver();
-  const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize);
-
-  if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
-  const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
-  const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
-  const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`;
-
-  const fileUploadService: FileUploader = this.crowi.fileUploadService;
-  // if the process of uploading was interrupted, delete and start from the start
-  if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) {
-    await fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId);
-  }
-
-  // init multipart upload
-  const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize);
-  await multipartUploader.initUpload();
-  pageBulkExportJob.uploadKey = uploadKey;
-  pageBulkExportJob.uploadId = multipartUploader.uploadId;
-  await pageBulkExportJob.save();
-
-  const multipartUploadWritable = getMultipartUploadWritable.bind(this)(multipartUploader, pageBulkExportJob, attachment);
-
-  pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable, (err) => {
-    this.handlePipelineError(err, pageBulkExportJob);
-  });
-  pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
-  pageArchiver.finalize();
-}

+ 70 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -0,0 +1,70 @@
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
+
+import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
+import { SupportedAction } from '~/interfaces/activity';
+import { AttachmentType } from '~/server/interfaces/attachment';
+import type { IAttachmentDocument } from '~/server/models/attachment';
+import { Attachment } from '~/server/models/attachment';
+import type { FileUploader } from '~/server/service/file-uploader';
+import loggerFactory from '~/utils/logger';
+
+import type { IPageBulkExportJobCronService } from '..';
+import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job';
+
+const logger = loggerFactory('growi:service:page-bulk-export-job-cron:compress-and-upload-async');
+
+function setUpPageArchiver(): Archiver {
+  const pageArchiver = archiver('tar', {
+    gzip: true,
+  });
+
+  // good practice to catch warnings (ie stat failures and other non-blocking errors)
+  pageArchiver.on('warning', (err) => {
+    if (err.code === 'ENOENT') logger.error(err);
+    else throw err;
+  });
+
+  return pageArchiver;
+}
+
+async function postProcess(
+    this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument, attachment: IAttachmentDocument, fileSize: number,
+): Promise<void> {
+  attachment.fileSize = fileSize;
+  await attachment.save();
+
+  pageBulkExportJob.completedAt = new Date();
+  pageBulkExportJob.attachment = attachment._id;
+  pageBulkExportJob.status = PageBulkExportJobStatus.completed;
+  await pageBulkExportJob.save();
+
+  this.removeStreamInExecution(pageBulkExportJob._id);
+  await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob);
+}
+
+/**
+ * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
+ */
+export async function compressAndUpload(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise<void> {
+  const pageArchiver = setUpPageArchiver();
+
+  if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set');
+  const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`;
+  const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT);
+
+  const fileUploadService: FileUploader = this.crowi.fileUploadService;
+
+  pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
+  pageArchiver.finalize();
+  this.setStreamInExecution(pageBulkExportJob._id, pageArchiver);
+
+  try {
+    await fileUploadService.uploadAttachment(pageArchiver, attachment);
+  }
+  catch (e) {
+    logger.error(e);
+    this.handleError(e, pageBulkExportJob);
+  }
+  await postProcess.bind(this)(pageBulkExportJob, attachment, pageArchiver.pointer());
+}

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts

@@ -98,6 +98,6 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi
   this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
 
   pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
-    this.handlePipelineError(err, pageBulkExportJob);
+    this.handleError(err, pageBulkExportJob);
   });
 }

+ 1 - 1
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts

@@ -74,6 +74,6 @@ export function exportPagesToFsAsync(this: IPageBulkExportJobCronService, pageBu
   this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
 
   pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
-    this.handlePipelineError(err, pageBulkExportJob);
+    this.handleError(err, pageBulkExportJob);
   });
 }

+ 2 - 1
apps/app/src/pages/[[...path]].page.tsx

@@ -585,7 +585,8 @@ function injectServerConfigurations(context: GetServerSidePropsContext, props: P
   props.disableLinkSharing = configManager.getConfig('crowi', 'security:disableLinkSharing');
   props.isUploadAllFileAllowed = crowi.fileUploadService.getFileUploadEnabled();
   props.isUploadEnabled = crowi.fileUploadService.getIsUploadable();
-  props.isPageBulkExportEnabled = PageBulkExportEnabledFileUploadTypes.includes(configManager.getConfig('crowi', 'app:fileUploadType'));
+  // TODO: allow enabling/disabling bulk export in https://redmine.weseek.co.jp/issues/158221
+  props.isPageBulkExportEnabled = true;
 
   props.isLocalAccountRegistrationEnabled = crowi.passportService.isLocalStrategySetup
   && configManager.getConfig('crowi', 'security:registrationMode') !== RegistrationMode.CLOSED;

+ 3 - 3
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -1,4 +1,4 @@
-import type { ReadStream } from 'fs';
+import type { Readable } from 'stream';
 
 import type { GetObjectCommandInput, HeadObjectCommandInput } from '@aws-sdk/client-s3';
 import {
@@ -157,7 +157,7 @@ class AwsFileUploader extends AbstractFileUploader {
   /**
    * @inheritdoc
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     if (!this.getIsUploadable()) {
       throw new Error('AWS is not configured.');
     }
@@ -172,7 +172,7 @@ class AwsFileUploader extends AbstractFileUploader {
     await s3.send(new PutObjectCommand({
       Bucket: getS3Bucket(),
       Key: filePath,
-      Body: readStream,
+      Body: readable,
       ACL: getS3PutObjectCannedAcl(),
       // put type and the file name for reference information when uploading
       ContentType: contentHeaders.contentType?.value.toString(),

+ 3 - 3
apps/app/src/server/service/file-uploader/azure.ts

@@ -1,4 +1,4 @@
-import type { ReadStream } from 'fs';
+import type { Readable } from 'stream';
 
 import type { TokenCredential } from '@azure/identity';
 import { ClientSecretCredential } from '@azure/identity';
@@ -102,7 +102,7 @@ class AzureFileUploader extends AbstractFileUploader {
   /**
    * @inheritdoc
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     if (!this.getIsUploadable()) {
       throw new Error('Azure is not configured.');
     }
@@ -113,7 +113,7 @@ class AzureFileUploader extends AbstractFileUploader {
     const blockBlobClient: BlockBlobClient = containerClient.getBlockBlobClient(filePath);
     const contentHeaders = new ContentHeaders(attachment);
 
-    await blockBlobClient.uploadStream(readStream, undefined, undefined, {
+    await blockBlobClient.uploadStream(readable, undefined, undefined, {
       blobHTTPHeaders: {
         // put type and the file name for reference information when uploading
         blobContentType: contentHeaders.contentType?.value.toString(),

+ 3 - 3
apps/app/src/server/service/file-uploader/file-uploader.ts

@@ -1,4 +1,4 @@
-import type { ReadStream } from 'fs';
+import type { Readable } from 'stream';
 
 import type { Response } from 'express';
 import { v4 as uuidv4 } from 'uuid';
@@ -39,7 +39,7 @@ export interface FileUploader {
   getTotalFileSize(): Promise<number>,
   doCheckLimit(uploadFileSize: number, maxFileSize: number, totalLimit: number): Promise<ICheckLimitResult>,
   determineResponseMode(): ResponseMode,
-  uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>,
+  uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void>,
   respond(res: Response, attachment: IAttachmentDocument, opts?: RespondOptions): void,
   findDeliveryFile(attachment: IAttachmentDocument): Promise<NodeJS.ReadableStream>,
   generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise<TemporaryUrl>,
@@ -164,7 +164,7 @@ export abstract class AbstractFileUploader implements FileUploader {
     throw new Error('Multipart upload not available for file upload type');
   }
 
-  abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void>;
+  abstract uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void>;
 
   /**
    * Abort an existing multipart upload without creating a MultipartUploader instance

+ 7 - 5
apps/app/src/server/service/file-uploader/gcs/index.ts

@@ -1,4 +1,5 @@
-import type { ReadStream } from 'fs';
+import type { Readable } from 'stream';
+import { pipeline } from 'stream/promises';
 
 import { Storage } from '@google-cloud/storage';
 import axios from 'axios';
@@ -109,7 +110,7 @@ class GcsFileUploader extends AbstractFileUploader {
   /**
    * @inheritdoc
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     if (!this.getIsUploadable()) {
       throw new Error('GCS is not configured.');
     }
@@ -121,11 +122,12 @@ class GcsFileUploader extends AbstractFileUploader {
     const filePath = getFilePathOnStorage(attachment);
     const contentHeaders = new ContentHeaders(attachment);
 
-    await myBucket.upload(readStream.path.toString(), {
-      destination: filePath,
+    const file = myBucket.file(filePath);
+
+    await pipeline(readable, file.createWriteStream({
       // put type and the file name for reference information when uploading
       contentType: contentHeaders.contentType?.value.toString(),
-    });
+    }));
   }
 
   /**

+ 2 - 3
apps/app/src/server/service/file-uploader/gridfs.ts

@@ -1,4 +1,3 @@
-import type { ReadStream } from 'fs';
 import { Readable } from 'stream';
 import util from 'util';
 
@@ -62,7 +61,7 @@ class GridfsFileUploader extends AbstractFileUploader {
   /**
    * @inheritdoc
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     logger.debug(`File uploading: fileName=${attachment.fileName}`);
 
     const contentHeaders = new ContentHeaders(attachment);
@@ -73,7 +72,7 @@ class GridfsFileUploader extends AbstractFileUploader {
         filename: attachment.fileName,
         contentType: contentHeaders.contentType?.value.toString(),
       },
-      readStream,
+      readable,
     );
   }
 

+ 1 - 2
apps/app/src/server/service/file-uploader/local.ts

@@ -1,4 +1,3 @@
-import type { ReadStream } from 'fs';
 import type { Writable } from 'stream';
 import { Readable } from 'stream';
 import { pipeline } from 'stream/promises';
@@ -76,7 +75,7 @@ class LocalFileUploader extends AbstractFileUploader {
   /**
    * @inheritdoc
    */
-  override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise<void> {
+  override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise<void> {
     throw new Error('Method not implemented.');
   }