Просмотр исходного кода

improve stream handling with pipeline API

Shun Miyazawa 10 месяцев назад
Родитель
Сommit
6dc3e51d5c
1 измененных файлов с 23 добавлено и 11 удалено
  1. 23 11
      apps/app/src/server/service/attachment.js

+ 23 - 11
apps/app/src/server/service/attachment.js

@@ -1,3 +1,5 @@
+import { pipeline } from 'stream/promises';
+
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
 
 
 import { AttachmentType } from '../interfaces/attachment';
 import { AttachmentType } from '../interfaces/attachment';
@@ -53,15 +55,27 @@ class AttachmentService {
       await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await attachment.save();
       await attachment.save();
 
 
-      //  Creates a new stream for each operation instead of reusing the original stream.
-      //  REASON: Node.js Readable streams cannot be reused after consumption.
-      //  When a stream is piped or consumed, its internal state changes and the data pointers
-      //  are advanced to the end, making it impossible to read the same data again.
-      const readStreamForAttachedHandlers = [];
-      const attachedHandlerPromises = this.attachHandlers.map((handler) => {
-        const readStreamForAttachedHandler = createReadStream(file.path);
-        readStreamForAttachedHandlers.push(readStreamForAttachedHandler);
-        return handler(pageId, file, readStreamForAttachedHandler);
+      const attachedHandlerPromises = this.attachHandlers.map(async(handler) => {
+        //  Creates a new stream for each operation instead of reusing the original stream.
+        //  REASON: Node.js Readable streams cannot be reused after consumption.
+        //  When a stream is piped or consumed, its internal state changes and the data pointers
+        //  are advanced to the end, making it impossible to read the same data again.
+        const readStreamForHandler = createReadStream(file.path);
+
+        try {
+          await pipeline(
+            readStreamForHandler,
+            async(source) => {
+              await handler(pageId, attachment, file, source);
+            },
+          );
+        }
+        catch (err) {
+          logger.error('Error in stream processing:', err);
+        }
+        finally {
+          readStreamForHandler.destroy();
+        }
       });
       });
 
 
       // Do not await, run in background
       // Do not await, run in background
@@ -70,8 +84,6 @@ class AttachmentService {
           logger.error('Error while executing attach handler', err);
           logger.error('Error while executing attach handler', err);
         })
         })
         .finally(() => {
         .finally(() => {
-          // readStreamForAttachedHandler?.destroy();
-          readStreamForAttachedHandlers.forEach(readStream => readStream.destroy());
           disposeTmpFileCallback?.(file);
           disposeTmpFileCallback?.(file);
         });
         });
     }
     }