Просмотр исходного кода

Refactor attachment handling to remove buffer parameter from attach handlers

Shun Miyazawa 10 месяцев назад
Родитель
Сommit
ed652c4491

+ 1 - 1
apps/app/src/features/openai/server/services/openai.ts

@@ -737,7 +737,7 @@ class OpenaiService implements IOpenaiService {
   }
   }
 
 
   private async createVectorStoreFileOnUploadAttachment(
   private async createVectorStoreFileOnUploadAttachment(
-      pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File, buffer: Buffer,
+      pageId: string, attachment: HydratedDocument<IAttachmentDocument>, file: Express.Multer.File,
   ): Promise<void> {
   ): Promise<void> {
     if (!isVectorStoreCompatible(file.originalname, file.mimetype)) {
     if (!isVectorStoreCompatible(file.originalname, file.mimetype)) {
       return;
       return;

+ 8 - 49
apps/app/src/server/service/attachment.js

@@ -24,7 +24,7 @@ const createReadStream = (filePath) => {
  */
  */
 class AttachmentService {
 class AttachmentService {
 
 
-  /** @type {Array<(pageId: string, attachment: Attachment, file: Express.Multer.File, buffer: Buffer) => Promise<void>>} */
+  /** @type {Array<(pageId: string, attachment: Attachment, file: Express.Multer.File) => Promise<void>>} */
   attachHandlers = [];
   attachHandlers = [];
 
 
   /** @type {Array<(attachmentId: string) => Promise<void>>} */
   /** @type {Array<(attachmentId: string) => Promise<void>>} */
@@ -47,15 +47,6 @@ class AttachmentService {
       throw new Error(res.errorMessage);
       throw new Error(res.errorMessage);
     }
     }
 
 
-    // Always call only once
-    let isDisposedTmpFile = false;
-    const safeDisposeTmpFile = () => {
-      if (!isDisposedTmpFile && disposeTmpFileCallback) {
-        isDisposedTmpFile = true;
-        disposeTmpFileCallback?.(file);
-      }
-    };
-
     // create an Attachment document and upload file
     // create an Attachment document and upload file
     let attachment;
     let attachment;
     let readStreamForCreateAttachmentDocument;
     let readStreamForCreateAttachmentDocument;
@@ -65,54 +56,22 @@ class AttachmentService {
       await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await fileUploadService.uploadAttachment(readStreamForCreateAttachmentDocument, attachment);
       await attachment.save();
       await attachment.save();
 
 
-      if (this.attachHandlers.length === 0) {
-        safeDisposeTmpFile();
-        return attachment;
-      }
-
-      const readStreamForAttachHandler = createReadStream(file.path);
-      const chunks = [];
-      const attachHandlers = this.attachHandlers;
-
-      const attachedHandlerStream = new Transform({
-        objectMode: true,
-        transform(chunk, encoding, callback) {
-          chunks.push(chunk);
-          callback(null, chunk);
-        },
-
-        async flush(callback) {
-          // At this point we have the complete file as a Buffer
-          // This approach assumes handler needs the complete file data
-          const completeData = Buffer.concat(chunks);
-
-          const promises = attachHandlers.map((handler) => {
-            return handler(pageId, attachment, file, completeData);
-          });
-
-          await Promise.all(promises)
-            .then(() => {
-              callback();
-            })
-            .catch((err) => {
-              logger.error('Error while executing attach handler', err);
-              callback(err);
-            });
-        },
+      const attachHandlerPromises = this.attachHandlers.map((handler) => {
+        return handler(pageId, attachment, file);
       });
       });
 
 
       // Do not await, run in background
       // Do not await, run in background
-      pipeline(readStreamForAttachHandler, attachedHandlerStream)
+      Promise.all(attachHandlerPromises)
         .catch((err) => {
         .catch((err) => {
-          logger.error('Error in stream processing', err);
+          logger.error('Error while executing attach handler', err);
         })
         })
         .finally(() => {
         .finally(() => {
-          safeDisposeTmpFile();
+          disposeTmpFileCallback?.(file);
         });
         });
     }
     }
     catch (err) {
     catch (err) {
       logger.error('Error while creating attachment', err);
       logger.error('Error while creating attachment', err);
-      safeDisposeTmpFile();
+      disposeTmpFileCallback?.(file);
       throw err;
       throw err;
     }
     }
     finally {
     finally {
@@ -170,7 +129,7 @@ class AttachmentService {
 
 
   /**
   /**
    * Register a handler that will be called after attachment creation
    * Register a handler that will be called after attachment creation
-   * @param {(pageId: string, attachment: Attachment, file: Express.Multer.File, buffer: Buffer) => Promise<void>} handler
+   * @param {(pageId: string, attachment: Attachment, file: Express.Multer.File) => Promise<void>} handler
    */
    */
   addAttachHandler(handler) {
   addAttachHandler(handler) {
     this.attachHandlers.push(handler);
     this.attachHandlers.push(handler);