Răsfoiți Sursa

fix: use temp file for bulk export upload to resolve S3 streaming issues

Ryotaro Nagahara 4 săptămâni în urmă
părinte
comite
37f23964dd

+ 0 - 1
apps/app/package.json

@@ -62,7 +62,6 @@
   "dependencies": {
     "@akebifiky/remark-simple-plantuml": "^1.0.2",
     "@aws-sdk/client-s3": "3.454.0",
-    "@aws-sdk/lib-storage": "3.454.0",
     "@aws-sdk/s3-request-presigner": "3.454.0",
     "@azure/identity": "^4.4.1",
     "@azure/openai": "^2.0.0",

+ 46 - 19
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -1,4 +1,6 @@
-import { PassThrough } from 'node:stream';
+import { createReadStream, createWriteStream } from 'node:fs';
+import fs from 'node:fs/promises';
+import path from 'node:path';
 import type { Archiver } from 'archiver';
 import archiver from 'archiver';
 
@@ -53,7 +55,15 @@ async function postProcess(
 }
 
 /**
- * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage
+ * Compress page files into a tar.gz archive and upload to cloud storage.
+ *
+ * Uses a temporary file instead of streaming directly to avoid two issues with AWS S3:
+ * 1. archiver emits a readable-stream (npm package) stream, which fails the AWS SDK's `instanceof Readable` check against the Node.js built-in stream class
+ * 2. PutObjectCommand sends `Transfer-Encoding: chunked` for streams without a known Content-Length, which S3 rejects with a 501 NotImplemented error
+ *
+ * Writing to a temp file and using createReadStream resolves both:
+ * - createReadStream returns a native ReadStream (passes instanceof check)
+ * - AWS SDK auto-detects file size from ReadStream.path via lstatSync, setting Content-Length
  */
 export async function compressAndUpload(
   this: IPageBulkExportJobCronService,
@@ -75,42 +85,59 @@ export async function compressAndUpload(
   );
 
   const fileUploadService: FileUploader = this.crowi.fileUploadService;
+  // Place the temp file in the parent directory so that archiver does not pick it up
+  // (archiver.directory() scans getTmpOutputDir asynchronously via glob)
+  const tmpFilePath = path.join(
+    this.getTmpOutputDir(pageBulkExportJob),
+    '..',
+    `${originalName}.tmp`,
+  );
 
   logger.info('starting');
 
-  // Wrap with Node.js native PassThrough so that AWS SDK recognizes the stream as a native Readable
-  const uploadStream = new PassThrough();
-
-  // Establish pipe before finalize to ensure data flows correctly
-  pageArchiver.pipe(uploadStream);
   pageArchiver.on('error', (err) => {
     logger.error('pageArchiver error', err);
-    uploadStream.destroy(err);
-    pageArchiver.destroy();
+    // Do not call pageArchiver.destroy() here: it corrupts internal state
+    // while the async queue is still processing, causing uncaught exceptions.
+    // The error is propagated via the Promise rejection below.
   });
 
   pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
   pageArchiver.finalize();
   logger.info('finalize called');
 
-  this.setStreamsInExecution(pageBulkExportJob._id, pageArchiver, uploadStream);
+  this.setStreamsInExecution(pageBulkExportJob._id, pageArchiver);
 
   try {
+    // Write the compressed archive to the temp file using .pipe() (not pipeline(), which auto-destroys streams)
+    await new Promise<void>((resolve, reject) => {
+      const writeStream = createWriteStream(tmpFilePath);
+      pageArchiver.pipe(writeStream);
+      writeStream.on('close', resolve);
+      writeStream.on('error', reject);
+      pageArchiver.on('error', reject);
+    });
+    logger.info('archive written to temp file');
+
+    // Get file size for Content-Length
+    const stat = await fs.stat(tmpFilePath);
+    attachment.fileSize = stat.size;
+    logger.info(`temp file size: ${stat.size}`);
+
+    // Upload using createReadStream (native ReadStream with .path property)
     logger.info('starting upload');
-    await fileUploadService.uploadAttachment(uploadStream, attachment);
+    const readStream = createReadStream(tmpFilePath);
+    await fileUploadService.uploadAttachment(readStream, attachment);
     logger.info('upload completed, running postProcess');
-    await postProcess.bind(this)(
-      pageBulkExportJob,
-      attachment,
-      pageArchiver.pointer(),
-    );
+
+    await postProcess.bind(this)(pageBulkExportJob, attachment, stat.size);
     logger.info('postProcess completed');
   } catch (e) {
     logger.error('error caught', e);
     this.handleError(e, pageBulkExportJob);
   } finally {
-    logger.info('finally block, destroying streams');
-    pageArchiver.destroy();
-    uploadStream.destroy();
+    logger.info('finally block, cleaning up');
+    // Clean up the temp file; errors are ignored since it may never have been created
+    await fs.unlink(tmpFilePath).catch(() => {});
   }
 }

+ 17 - 26
apps/app/src/server/service/file-uploader/aws/index.ts

@@ -13,7 +13,6 @@ import {
   PutObjectCommand,
   S3Client,
 } from '@aws-sdk/client-s3';
-import { Upload } from '@aws-sdk/lib-storage';
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
 import type { NonBlankString } from '@growi/core/dist/interfaces';
 import { toNonBlankStringOrUndefined } from '@growi/core/dist/interfaces';
@@ -252,32 +251,26 @@ class AwsFileUploader extends AbstractFileUploader {
 
     const filePath = getFilePathOnStorage(attachment);
     const contentHeaders = createContentHeaders(attachment);
-    const uploadTimeout = configManager.getConfig('app:fileUploadTimeout');
 
-    // Use @aws-sdk/lib-storage Upload for streaming support
-    // PutObjectCommand sends Transfer-Encoding: chunked for streams, which S3 rejects with 501 NotImplemented
-    const upload = new Upload({
-      client: s3,
-      params: {
-        Bucket: getS3Bucket(),
-        Key: filePath,
-        Body: readable,
-        ACL: getS3PutObjectCannedAcl(),
-        ContentType: getContentHeaderValue(contentHeaders, 'Content-Type'),
-        ContentDisposition: getContentHeaderValue(
-          contentHeaders,
-          'Content-Disposition',
-        ),
-      },
-    });
+    try {
+      const uploadTimeout = configManager.getConfig('app:fileUploadTimeout');
 
-    const timeoutId = setTimeout(() => {
-      logger.warn(`Upload timeout: fileName=${attachment.fileName}`);
-      upload.abort();
-    }, uploadTimeout);
+      await s3.send(
+        new PutObjectCommand({
+          Bucket: getS3Bucket(),
+          Key: filePath,
+          Body: readable,
+          ACL: getS3PutObjectCannedAcl(),
+          // Set the content type and file name as reference information for the uploaded object
+          ContentType: getContentHeaderValue(contentHeaders, 'Content-Type'),
+          ContentDisposition: getContentHeaderValue(
+            contentHeaders,
+            'Content-Disposition',
+          ),
+        }),
+        { abortSignal: AbortSignal.timeout(uploadTimeout) },
+      );
 
-    try {
-      await upload.done();
       logger.debug(
         `File upload completed successfully: fileName=${attachment.fileName}`,
       );
@@ -294,8 +287,6 @@ class AwsFileUploader extends AbstractFileUploader {
       // Re-throw the error to be handled by the caller.
       // The pipeline automatically handles stream cleanup on error.
       throw error;
-    } finally {
-      clearTimeout(timeoutId);
     }
   }
 

+ 6 - 43
pnpm-lock.yaml

@@ -169,9 +169,6 @@ importers:
       '@aws-sdk/client-s3':
         specifier: 3.454.0
         version: 3.454.0
-      '@aws-sdk/lib-storage':
-        specifier: 3.454.0
-        version: 3.454.0(@aws-sdk/client-s3@3.454.0)
       '@aws-sdk/s3-request-presigner':
         specifier: 3.454.0
         version: 3.454.0
@@ -2048,12 +2045,6 @@ packages:
     resolution: {integrity: sha512-cC9uqmX0rgx1efiJGqeR+i0EXr8RQ5SAzH7M45WNBZpYiLEe6reWgIYJY9hmOxuaoMdWSi8kekuN3IjTIORRjw==}
     engines: {node: '>=16.0.0'}
 
-  '@aws-sdk/lib-storage@3.454.0':
-    resolution: {integrity: sha512-UygsmdtIwty9GJqBoCqTQeX/dwE2Oo/m3P5UzuUr2veC6AEuYQyMIvmSgLVEO/ek3hfK86kmRBff7VTGWUuN8Q==}
-    engines: {node: '>=14.0.0'}
-    peerDependencies:
-      '@aws-sdk/client-s3': ^3.0.0
-
   '@aws-sdk/middleware-bucket-endpoint@3.451.0':
     resolution: {integrity: sha512-KWyZ1JGnYz2QbHuJtYTP1BVnMOfVopR8rP8dTinVb/JR5HfAYz4imICJlJUbOYRjN7wpA3PrRI8dNRjrSBjWJg==}
     engines: {node: '>=14.0.0'}
@@ -6390,9 +6381,6 @@ packages:
   buffer@4.9.2:
     resolution: {integrity: sha512-xq+q3SRMOxGivLhBNaUdC64hDTQwejJ+H0T/NB1XMtTVEwNTrfFF3gAxiyW0Bu/xWEGhjVKgUcMhCrUy2+uCWg==}
 
-  buffer@5.6.0:
-    resolution: {integrity: sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==}
-
   buffer@5.7.1:
     resolution: {integrity: sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==}
 
@@ -8694,29 +8682,28 @@ packages:
 
   glob@10.4.5:
     resolution: {integrity: sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==}
-    deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
     hasBin: true
 
   glob@6.0.4:
     resolution: {integrity: sha512-MKZeRNyYZAVVVG1oZeLaWie1uweH40m9AZwIwxyPbTSX4hHrVYSzLg0Ro5Z5R7XKkIX+Cc6oD1rqeDJnwsB8/A==}
-    deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
+    deprecated: Glob versions prior to v9 are no longer supported
 
   glob@7.1.6:
     resolution: {integrity: sha512-LwaxwyZ72Lk7vZINtNNrywX0ZuLyStrdDtabefZKAY5ZGJhVtgdznluResxNmPitE0SAO+O26sWTHeKSI2wMBA==}
-    deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
+    deprecated: Glob versions prior to v9 are no longer supported
 
   glob@7.2.0:
     resolution: {integrity: sha512-lmLf6gtyrPq8tTjSmrO94wBeQbFR3HbLHbuyD69wuyQkImp2hWqMGB47OX65FBkPffO641IP9jWa1z4ivqG26Q==}
-    deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
+    deprecated: Glob versions prior to v9 are no longer supported
 
   glob@7.2.3:
     resolution: {integrity: sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==}
-    deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
+    deprecated: Glob versions prior to v9 are no longer supported
 
   glob@8.1.0:
     resolution: {integrity: sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==}
     engines: {node: '>=12'}
-    deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
+    deprecated: Glob versions prior to v9 are no longer supported
 
   global-directory@4.0.1:
     resolution: {integrity: sha512-wHTUcDUoZ1H5/0iVqEudYW4/kAlN5cZ3j/bXn0Dpbizl9iaUVeWSHqiOjsgk6OW2bkLclbBjzewBz6weQ1zA2Q==}
@@ -12893,9 +12880,6 @@ packages:
     resolution: {integrity: sha512-KXDYZ9dszj6bzvnEMRYvxgeTHU74QBFL54XKtP3nyMuJ81CFYtABZ3bAzL2EdFUaEwJOBOgENyFj3R7oTzDyyw==}
     engines: {node: '>=4', npm: '>=6'}
 
-  stream-browserify@3.0.0:
-    resolution: {integrity: sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==}
-
   stream-buffers@0.2.6:
     resolution: {integrity: sha512-ZRpmWyuCdg0TtNKk8bEqvm13oQvXMmzXDsfD4cBgcx5LouborvU5pm3JMkdTP3HcszyUI08AM1dHMXA5r2g6Sg==}
     engines: {node: '>= 0.3.0'}
@@ -13247,7 +13231,7 @@ packages:
   tar@6.2.1:
     resolution: {integrity: sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==}
     engines: {node: '>=10'}
-    deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
+    deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me
 
   teeny-request@7.2.0:
     resolution: {integrity: sha512-SyY0pek1zWsi0LRVAALem+avzMLc33MKW/JLLakdP4s9+D7+jHcy5x6P+h94g2QNZsAqQNfX5lsbd3WSeJXrrw==}
@@ -15305,17 +15289,6 @@ snapshots:
       - aws-crt
     optional: true
 
-  '@aws-sdk/lib-storage@3.454.0(@aws-sdk/client-s3@3.454.0)':
-    dependencies:
-      '@aws-sdk/client-s3': 3.454.0
-      '@smithy/abort-controller': 2.2.0
-      '@smithy/middleware-endpoint': 2.5.1
-      '@smithy/smithy-client': 2.5.1
-      buffer: 5.6.0
-      events: 3.3.0
-      stream-browserify: 3.0.0
-      tslib: 2.8.1
-
   '@aws-sdk/middleware-bucket-endpoint@3.451.0':
     dependencies:
       '@aws-sdk/types': 3.451.0
@@ -21273,11 +21246,6 @@ snapshots:
       ieee754: 1.2.1
       isarray: 1.0.0
 
-  buffer@5.6.0:
-    dependencies:
-      base64-js: 1.5.1
-      ieee754: 1.2.1
-
   buffer@5.7.1:
     dependencies:
       base64-js: 1.5.1
@@ -28680,11 +28648,6 @@ snapshots:
 
   stoppable@1.1.0: {}
 
-  stream-browserify@3.0.0:
-    dependencies:
-      inherits: 2.0.4
-      readable-stream: 3.6.0
-
   stream-buffers@0.2.6: {}
 
   stream-events@1.0.5: