Procházet zdrojové kódy

Improved stream handling

Shun Miyazawa před 10 měsíci
rodič
revize
008aaa764a
1 změnil soubory, kde provedl 31 přidání a 41 odebrání
  1. 31 41
      apps/app/src/server/service/attachment.js

+ 31 - 41
apps/app/src/server/service/attachment.js

@@ -56,51 +56,41 @@ class AttachmentService {
       await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await attachment.save();
 
-      this.attachHandlers.forEach(async(handler) => {
-        //  Creates a new stream for each operation instead of reusing the original stream.
-        //  REASON: Node.js Readable streams cannot be reused after consumption.
-        //  When a stream is piped or consumed, its internal state changes and the data pointers
-        //  are advanced to the end, making it impossible to read the same data again.
-        const readStreamForHandler = createReadStream(file.path);
-        const chunks = [];
-
-        const attachedHandlerStream = new Transform({
-          objectMode: true,
-          transform(chunk, encoding, callback) {
-            chunks.push(chunk);
-            callback(null, chunk);
-          },
-
-          async flush(callback) {
-            // At this point we have the complete file as a Buffer
-            // This approach assumes handler needs the complete file data
-            const completeData = Buffer.concat(chunks);
-            handler(pageId, attachment, file, completeData)
-              .then(() => {
-                callback();
-              })
-              .catch((err) => {
-                logger.error('Error in attach handler:', err);
-                callback(err);
-              });
-          },
-        });
-
-        try {
-          await pipeline(
-            readStreamForHandler,
-            attachedHandlerStream,
-          );
-        }
-        catch (err) {
-          logger.error('Error in stream processing:', err);
-        }
+      const readStreamForAttacheHandler = createReadStream(file.path);
+      const chunks = [];
+      const attachHandlers = this.attachHandlers;
+
+      const attachedHandlerStream = new Transform({
+        objectMode: true,
+        transform(chunk, encoding, callback) {
+          chunks.push(chunk);
+          callback(null, chunk);
+        },
+
+        async flush(callback) {
+          // At this point we have the complete file as a Buffer
+          // This approach assumes handler needs the complete file data
+          const completeData = Buffer.concat(chunks);
+
+          const promises = attachHandlers.map((handler) => {
+            return handler(pageId, attachment, file, completeData);
+          });
+
+          await Promise.all(promises)
+            .then(() => {
+              callback();
+            })
+            .catch((err) => {
+              logger.error('Error while executing attach handler', err);
+              callback(err);
+            });
+        },
       });
 
       // Do not await, run in background
-      Promise.all(attachedHandlerPromises)
+      pipeline(readStreamForAttacheHandler, attachedHandlerStream)
         .catch((err) => {
-          logger.error('Error while executing attach handler', err);
+          logger.error('Error in stream processing', err);
         })
         .finally(() => {
           disposeTmpFileCallback?.(file);