Просмотр исходного кода

Merge pull request #9949 from weseek/fix/165372-memory-leak-prevention

fix: Memory leak prevention
Shun Miyazawa 10 месяцев назад
Родитель
Коммит
024e339560

+ 6 - 5
apps/app/src/features/openai/server/services/openai.ts

@@ -82,7 +82,8 @@ export interface IOpenaiService {
   createVectorStoreFileOnPageCreate(pages: PageDocument[]): Promise<void>;
   updateVectorStoreFileOnPageUpdate(page: HydratedDocument<PageDocument>): Promise<void>;
   createVectorStoreFileOnUploadAttachment(
-    pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, readable: Readable): Promise<void>;
+    pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, buffer: Buffer
+  ): Promise<void>;
   deleteVectorStoreFile(vectorStoreRelationId: Types.ObjectId, pageId: Types.ObjectId): Promise<void>;
   deleteVectorStoreFilesByPageIds(pageIds: Types.ObjectId[]): Promise<void>;
   deleteObsoleteVectorStoreFile(limit: number, apiCallInterval: number): Promise<void>; // for CronJob
@@ -323,8 +324,8 @@ class OpenaiService implements IOpenaiService {
     return uploadedFile;
   }
 
-  private async uploadFileForAttachment(readable: Readable, fileName: string): Promise<OpenAI.Files.FileObject> {
-    const uploadableFile = await toFile(Readable.from(readable), fileName);
+  private async uploadFileForAttachment(buffer: Buffer, fileName: string): Promise<OpenAI.Files.FileObject> {
+    const uploadableFile = await toFile(Readable.from([buffer]), fileName);
     const uploadedFile = await this.client.uploadFile(uploadableFile);
     return uploadedFile;
   }
@@ -629,7 +630,7 @@ class OpenaiService implements IOpenaiService {
   }
 
   async createVectorStoreFileOnUploadAttachment(
-      pageId: string, attachment:HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, readable: Readable,
+      pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, buffer: Buffer,
   ): Promise<void> {
     if (!isVectorStoreCompatible(file)) {
       return;
@@ -646,7 +647,7 @@ class OpenaiService implements IOpenaiService {
       return;
     }
 
-    const uploadedFile = await this.uploadFileForAttachment(readable, file.originalname);
+    const uploadedFile = await this.uploadFileForAttachment(buffer, file.originalname);
     logger.debug('Uploaded file', uploadedFile);
 
     for await (const aiAssistant of aiAssistants) {

+ 56 - 16
apps/app/src/server/service/attachment.js

@@ -1,3 +1,6 @@
+import { Transform } from 'stream';
+import { pipeline } from 'stream/promises';
+
 import loggerFactory from '~/utils/logger';
 
 import { AttachmentType } from '../interfaces/attachment';
@@ -21,7 +24,7 @@ const createReadStream = (filePath) => {
  */
 class AttachmentService {
 
-  /** @type {Array<(pageId: string, attachment: Attachment, file: Express.Multer.File, readable: Readable) => Promise<void>>} */
+  /** @type {Array<(pageId: string, attachment: Attachment, file: Express.Multer.File, buffer: Buffer) => Promise<void>>} */
   attachHandlers = [];
 
   /** @type {Array<(attachmentId: string) => Promise<void>>} */
@@ -44,40 +47,77 @@ class AttachmentService {
       throw new Error(res.errorMessage);
     }
 
+    // Always call only once
+    let isDisposedTmpFile = false;
+    const safeDisposeTmpFile = () => {
+      if (!isDisposedTmpFile && disposeTmpFileCallback) {
+        isDisposedTmpFile = true;
+        disposeTmpFileCallback?.(file);
+      }
+    };
+
     // create an Attachment document and upload file
     let attachment;
+    let readStreamForCreateAttachmentDocument;
     try {
+      readStreamForCreateAttachmentDocument = createReadStream(file.path);
       attachment = Attachment.createWithoutSave(pageId, user, file.originalname, file.mimetype, file.size, attachmentType);
-      await fileUploadService.uploadAttachment(createReadStream(file.path), attachment);
+      await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await attachment.save();
 
-      //  Creates a new stream for each operation instead of reusing the original stream.
-      //  REASON: Node.js Readable streams cannot be reused after consumption.
-      //  When a stream is piped or consumed, its internal state changes and the data pointers
-      //  are advanced to the end, making it impossible to read the same data again.
-      let fileStreamForAttachedHandler;
-      if (this.attachHandlers.length !== 0) {
-        fileStreamForAttachedHandler = createReadStream(file.path);
+      if (this.attachHandlers.length === 0) {
+        safeDisposeTmpFile();
+        return attachment;
       }
 
-      const attachedHandlerPromises = this.attachHandlers.map((handler) => {
-        return handler(pageId, attachment, file, fileStreamForAttachedHandler);
+      const readStreamForAttachHandler = createReadStream(file.path);
+      const chunks = [];
+      const attachHandlers = this.attachHandlers;
+
+      const attachedHandlerStream = new Transform({
+        objectMode: true,
+        transform(chunk, encoding, callback) {
+          chunks.push(chunk);
+          callback(null, chunk);
+        },
+
+        async flush(callback) {
+          // At this point we have the complete file as a Buffer
+          // This approach assumes handler needs the complete file data
+          const completeData = Buffer.concat(chunks);
+
+          const promises = attachHandlers.map((handler) => {
+            return handler(pageId, attachment, file, completeData);
+          });
+
+          await Promise.all(promises)
+            .then(() => {
+              callback();
+            })
+            .catch((err) => {
+              logger.error('Error while executing attach handler', err);
+              callback(err);
+            });
+        },
       });
 
       // Do not await, run in background
-      Promise.all(attachedHandlerPromises)
+      pipeline(readStreamForAttachHandler, attachedHandlerStream)
         .catch((err) => {
-          logger.error('Error while executing attach handler', err);
+          logger.error('Error in stream processing', err);
         })
         .finally(() => {
-          disposeTmpFileCallback?.(file);
+          safeDisposeTmpFile();
         });
     }
     catch (err) {
       logger.error('Error while creating attachment', err);
-      disposeTmpFileCallback?.(file);
+      safeDisposeTmpFile();
       throw err;
     }
+    finally {
+      readStreamForCreateAttachmentDocument.destroy();
+    }
 
     return attachment;
   }
@@ -130,7 +170,7 @@ class AttachmentService {
 
   /**
    * Register a handler that will be called after attachment creation
-   * @param {(pageId: string, attachment: Attachment, file: Express.Multer.File, readable: Readable) => Promise<void>} handler
+   * @param {(pageId: string, attachment: Attachment, file: Express.Multer.File, buffer: Buffer) => Promise<void>} handler
    */
   addAttachHandler(handler) {
     this.attachHandlers.push(handler);