Просмотр исходного кода

refactor: update attachment handling to use Buffer instead of Readable stream

Shun Miyazawa 10 месяцев назад
Родитель
Сommit
e138b08749

+ 6 - 5
apps/app/src/features/openai/server/services/openai.ts

@@ -82,7 +82,8 @@ export interface IOpenaiService {
   createVectorStoreFileOnPageCreate(pages: PageDocument[]): Promise<void>;
   createVectorStoreFileOnPageCreate(pages: PageDocument[]): Promise<void>;
   updateVectorStoreFileOnPageUpdate(page: HydratedDocument<PageDocument>): Promise<void>;
   updateVectorStoreFileOnPageUpdate(page: HydratedDocument<PageDocument>): Promise<void>;
   createVectorStoreFileOnUploadAttachment(
   createVectorStoreFileOnUploadAttachment(
-    pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, readable: Readable): Promise<void>;
+    pageId: string, attachment:HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, buffer: Buffer
+  ): Promise<void>;
   deleteVectorStoreFile(vectorStoreRelationId: Types.ObjectId, pageId: Types.ObjectId): Promise<void>;
   deleteVectorStoreFile(vectorStoreRelationId: Types.ObjectId, pageId: Types.ObjectId): Promise<void>;
   deleteVectorStoreFilesByPageIds(pageIds: Types.ObjectId[]): Promise<void>;
   deleteVectorStoreFilesByPageIds(pageIds: Types.ObjectId[]): Promise<void>;
   deleteObsoleteVectorStoreFile(limit: number, apiCallInterval: number): Promise<void>; // for CronJob
   deleteObsoleteVectorStoreFile(limit: number, apiCallInterval: number): Promise<void>; // for CronJob
@@ -323,8 +324,8 @@ class OpenaiService implements IOpenaiService {
     return uploadedFile;
     return uploadedFile;
   }
   }
 
 
-  private async uploadFileForAttachment(readable: Readable, fileName: string): Promise<OpenAI.Files.FileObject> {
-    const uploadableFile = await toFile(Readable.from(readable), fileName);
+  private async uploadFileForAttachment(buffer: Buffer, fileName: string): Promise<OpenAI.Files.FileObject> {
+    const uploadableFile = await toFile(Readable.from(buffer), fileName);
     const uploadedFile = await this.client.uploadFile(uploadableFile);
     const uploadedFile = await this.client.uploadFile(uploadableFile);
     return uploadedFile;
     return uploadedFile;
   }
   }
@@ -629,7 +630,7 @@ class OpenaiService implements IOpenaiService {
   }
   }
 
 
   async createVectorStoreFileOnUploadAttachment(
   async createVectorStoreFileOnUploadAttachment(
-      pageId: string, attachment:HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, readable: Readable,
+      pageId: string, attachment:HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, buffer: Buffer,
   ): Promise<void> {
   ): Promise<void> {
     if (!isVectorStoreCompatible(file)) {
     if (!isVectorStoreCompatible(file)) {
       return;
       return;
@@ -646,7 +647,7 @@ class OpenaiService implements IOpenaiService {
       return;
       return;
     }
     }
 
 
-    const uploadedFile = await this.uploadFileForAttachment(readable, file.originalname);
+    const uploadedFile = await this.uploadFileForAttachment(buffer, file.originalname);
     logger.debug('Uploaded file', uploadedFile);
     logger.debug('Uploaded file', uploadedFile);
 
 
     for await (const aiAssistant of aiAssistants) {
     for await (const aiAssistant of aiAssistants) {

+ 24 - 5
apps/app/src/server/service/attachment.js

@@ -1,3 +1,4 @@
+import { Transform } from 'stream';
 import { pipeline } from 'stream/promises';
 import { pipeline } from 'stream/promises';
 
 
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
@@ -23,7 +24,7 @@ const createReadStream = (filePath) => {
  */
  */
 class AttachmentService {
 class AttachmentService {
 
 
-  /** @type {Array<(pageId: string, file: Express.Multer.File, readable: Readable) => Promise<void>>} */
+  /** @type {Array<(pageId: string, attachment: Attachment file: Express.Multer.File, buffer: Buffer) => Promise<void>>} */
   attachHandlers = [];
   attachHandlers = [];
 
 
   /** @type {Array<(attachmentId: string) => Promise<void>>} */
   /** @type {Array<(attachmentId: string) => Promise<void>>} */
@@ -61,13 +62,31 @@ class AttachmentService {
         //  When a stream is piped or consumed, its internal state changes and the data pointers
         //  When a stream is piped or consumed, its internal state changes and the data pointers
         //  are advanced to the end, making it impossible to read the same data again.
         //  are advanced to the end, making it impossible to read the same data again.
         const readStreamForHandler = createReadStream(file.path);
         const readStreamForHandler = createReadStream(file.path);
+        const chunks = [];
+
+        const attachedHandlerStream = new Transform({
+          objectMode: true,
+          transform(chunk, encoding, callback) {
+            chunks.push(chunk);
+            callback(null, chunk);
+          },
+          async flush(callback) {
+            try {
+              const completeData = Buffer.concat(chunks);
+              await handler(pageId, attachment, file, completeData);
+              callback();
+            }
+            catch (err) {
+              logger.error('Error in attach handler:', err);
+              callback(err);
+            }
+          },
+        });
 
 
         try {
         try {
           await pipeline(
           await pipeline(
             readStreamForHandler,
             readStreamForHandler,
-            async(source) => {
-              await handler(pageId, attachment, file, source);
-            },
+            attachedHandlerStream,
           );
           );
         }
         }
         catch (err) {
         catch (err) {
@@ -147,7 +166,7 @@ class AttachmentService {
 
 
   /**
   /**
    * Register a handler that will be called after attachment creation
    * Register a handler that will be called after attachment creation
-   * @param {(pageId: string, file: Express.Multer.File, readable: Readable) => Promise<void>} handler
+   * @param {(pageId: string, attachment: Attachment file: Express.Multer.File, buffer: Buffer) => Promise<void>} handler
    */
    */
   addAttachHandler(handler) {
   addAttachHandler(handler) {
     this.attachHandlers.push(handler);
     this.attachHandlers.push(handler);