Просмотр исходного кода

Merge pull request #9974 from weseek/fix/166394-memory-leak-prevention

fix: Memory leak prevention (2)
Yuki Takei 10 месяцев назад
Родитель
Сommit
a18db1999f

+ 21 - 10
apps/app/src/features/openai/server/services/openai.ts

@@ -1,3 +1,4 @@
+import fs from 'fs';
 import assert from 'node:assert';
 import { Readable, Transform, Writable } from 'stream';
 import { pipeline } from 'stream/promises';
@@ -322,11 +323,21 @@ class OpenaiService implements IOpenaiService {
     return uploadedFile;
   }
 
-  private async uploadFileForAttachment(file: Buffer | NodeJS.ReadableStream, fileName: string): Promise<OpenAI.Files.FileObject> {
+  private async uploadFileForAttachment(fileName: string, readStream?: NodeJS.ReadableStream, filePath?: string): Promise<OpenAI.Files.FileObject> {
+    const streamSource: NodeJS.ReadableStream = (() => {
+      if (readStream != null) {
+        return readStream;
+      }
+
+      if (filePath != null) {
+        return fs.createReadStream(filePath);
+      }
+
+      throw new Error('readStream and filePath are both null');
+    })();
+
     const uploadableFile = await toFile(
-      file instanceof Readable
-        ? file
-        : Readable.from([file]),
+      streamSource,
       fileName,
     );
 
@@ -367,8 +378,8 @@ class OpenaiService implements IOpenaiService {
             if (!isVectorStoreCompatible(attachment.originalName, attachment.fileFormat)) {
               continue;
             }
-            const fileStream = await this.crowi.fileUploadService.findDeliveryFile(attachment);
-            const uploadedFileForAttachment = await this.uploadFileForAttachment(fileStream, attachment.originalName);
+            const readStream = await this.crowi.fileUploadService.findDeliveryFile(attachment);
+            const uploadedFileForAttachment = await this.uploadFileForAttachment(attachment.originalName, readStream);
             prepareVectorStoreFileRelations(
               vectorStoreRelationId, pageId, uploadedFileForAttachment.id, vectorStoreFileRelationsMap, attachment._id,
             );
@@ -621,9 +632,9 @@ class OpenaiService implements IOpenaiService {
 
 
   private async filterPagesByAccessScope(aiAssistant: AiAssistantDocument, pages: HydratedDocument<PageDocument>[]) {
-    const isPublicPage = (page :HydratedDocument<PageDocument>) => page.grant === PageGrant.GRANT_PUBLIC;
+    const isPublicPage = (page: HydratedDocument<PageDocument>) => page.grant === PageGrant.GRANT_PUBLIC;
 
-    const isUserGroupAccessible = (page :HydratedDocument<PageDocument>, ownerUserGroupIds: string[]) => {
+    const isUserGroupAccessible = (page: HydratedDocument<PageDocument>, ownerUserGroupIds: string[]) => {
       if (page.grant !== PageGrant.GRANT_USER_GROUP) return false;
       return page.grantedGroups.some(group => ownerUserGroupIds.includes(getIdStringForRef(group.item)));
     };
@@ -726,7 +737,7 @@ class OpenaiService implements IOpenaiService {
   }
 
   private async createVectorStoreFileOnUploadAttachment(
-      pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, buffer: Buffer,
+      pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File,
   ): Promise<void> {
     if (!isVectorStoreCompatible(file.originalname, file.mimetype)) {
       return;
@@ -743,7 +754,7 @@ class OpenaiService implements IOpenaiService {
       return;
     }
 
-    const uploadedFile = await this.uploadFileForAttachment(buffer, file.originalname);
+    const uploadedFile = await this.uploadFileForAttachment(file.originalname, undefined, file.path);
     logger.debug('Uploaded file', uploadedFile);
 
     for await (const aiAssistant of aiAssistants) {

+ 8 - 52
apps/app/src/server/service/attachment.js

@@ -1,6 +1,3 @@
-import { Transform } from 'stream';
-import { pipeline } from 'stream/promises';
-
 import loggerFactory from '~/utils/logger';
 
 import { AttachmentType } from '../interfaces/attachment';
@@ -24,7 +21,7 @@ const createReadStream = (filePath) => {
  */
 class AttachmentService {
 
-  /** @type {Array<(pageId: string, attachment: Attachment, file: Express.Multer.File, buffer: Buffer) => Promise<void>>} */
+  /** @type {Array<(pageId: string, attachment: Attachment, file: Express.Multer.File) => Promise<void>>} */
   attachHandlers = [];
 
   /** @type {Array<(attachmentId: string) => Promise<void>>} */
@@ -47,15 +44,6 @@ class AttachmentService {
       throw new Error(res.errorMessage);
     }
 
-    // Always call only once
-    let isDisposedTmpFile = false;
-    const safeDisposeTmpFile = () => {
-      if (!isDisposedTmpFile && disposeTmpFileCallback) {
-        isDisposedTmpFile = true;
-        disposeTmpFileCallback?.(file);
-      }
-    };
-
     // create an Attachment document and upload file
     let attachment;
     let readStreamForCreateAttachmentDocument;
@@ -65,54 +53,22 @@ class AttachmentService {
       await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await attachment.save();
 
-      if (this.attachHandlers.length === 0) {
-        safeDisposeTmpFile();
-        return attachment;
-      }
-
-      const readStreamForAttachHandler = createReadStream(file.path);
-      const chunks = [];
-      const attachHandlers = this.attachHandlers;
-
-      const attachedHandlerStream = new Transform({
-        objectMode: true,
-        transform(chunk, encoding, callback) {
-          chunks.push(chunk);
-          callback(null, chunk);
-        },
-
-        async flush(callback) {
-          // At this point we have the complete file as a Buffer
-          // This approach assumes handler needs the complete file data
-          const completeData = Buffer.concat(chunks);
-
-          const promises = attachHandlers.map((handler) => {
-            return handler(pageId, attachment, file, completeData);
-          });
-
-          await Promise.all(promises)
-            .then(() => {
-              callback();
-            })
-            .catch((err) => {
-              logger.error('Error while executing attach handler', err);
-              callback(err);
-            });
-        },
+      const attachHandlerPromises = this.attachHandlers.map((handler) => {
+        return handler(pageId, attachment, file);
       });
 
       // Do not await, run in background
-      pipeline(readStreamForAttachHandler, attachedHandlerStream)
+      Promise.all(attachHandlerPromises)
         .catch((err) => {
-          logger.error('Error in stream processing', err);
+          logger.error('Error while executing attach handler', err);
         })
         .finally(() => {
-          safeDisposeTmpFile();
+          disposeTmpFileCallback?.(file);
         });
     }
     catch (err) {
       logger.error('Error while creating attachment', err);
-      safeDisposeTmpFile();
+      disposeTmpFileCallback?.(file);
       throw err;
     }
     finally {
@@ -170,7 +126,7 @@ class AttachmentService {
 
   /**
    * Register a handler that will be called after attachment creation
-   * @param {(pageId: string, attachment: Attachment, file: Express.Multer.File, buffer: Buffer) => Promise<void>} handler
+   * @param {(pageId: string, attachment: Attachment, file: Express.Multer.File) => Promise<void>} handler
    */
   addAttachHandler(handler) {
     this.attachHandlers.push(handler);