Просмотр исходного кода

Merge pull request #9908 from weseek/feat/165326-upload-file-to-vectorstore-when-attachment-uploaded

feat: Upload file to VectorStore when attachment uploaded
Yuki Takei 10 месяцев назад
Родитель
Коммит
528b0cec27

+ 4 - 0
apps/app/src/features/openai/server/services/client-delegator/azure-openai-client-delegator.ts

@@ -78,6 +78,10 @@ export class AzureOpenaiClientDelegator implements IOpenaiClientDelegator {
     return this.client.files.create({ file, purpose: 'assistants' });
   }
 
+  /**
+   * Adds the file identified by `fileId` to the vector store identified by `vectorStoreId`.
+   * @param vectorStoreId id of the target vector store
+   * @param fileId id of the file to add
+   * @returns the vector store file object returned by the client
+   */
+  async createVectorStoreFile(vectorStoreId: string, fileId: string): Promise<OpenAI.VectorStores.Files.VectorStoreFile> {
+    const { files } = this.client.vectorStores;
+    return files.create(vectorStoreId, { file_id: fileId });
+  }
+
   /**
    * Creates a file batch on the vector store identified by `vectorStoreId`.
    * @param vectorStoreId id of the target vector store
    * @param fileIds ids of the files to include in the batch
    * @returns the file batch object returned by the client
    */
   async createVectorStoreFileBatch(vectorStoreId: string, fileIds: string[]): Promise<OpenAI.VectorStores.FileBatches.VectorStoreFileBatch> {
     return this.client.vectorStores.fileBatches.create(vectorStoreId, { file_ids: fileIds });
   }

+ 1 - 0
apps/app/src/features/openai/server/services/client-delegator/interfaces.ts

@@ -13,6 +13,7 @@ export interface IOpenaiClientDelegator {
   createVectorStore(name: string): Promise<OpenAI.VectorStores.VectorStore>
   deleteVectorStore(vectorStoreId: string): Promise<OpenAI.VectorStores.VectorStoreDeleted>
   uploadFile(file: Uploadable): Promise<OpenAI.Files.FileObject>
+  createVectorStoreFile(vectorStoreId: string, fileId: string): Promise<OpenAI.VectorStores.Files.VectorStoreFile>
   createVectorStoreFileBatch(vectorStoreId: string, fileIds: string[]): Promise<OpenAI.VectorStores.FileBatches.VectorStoreFileBatch>
   deleteFile(fileId: string): Promise<OpenAI.Files.FileDeleted>;
   chatCompletion(body: OpenAI.Chat.Completions.ChatCompletionCreateParamsNonStreaming): Promise<OpenAI.Chat.Completions.ChatCompletion>

+ 4 - 0
apps/app/src/features/openai/server/services/client-delegator/openai-client-delegator.ts

@@ -79,6 +79,10 @@ export class OpenaiClientDelegator implements IOpenaiClientDelegator {
     return this.client.files.create({ file, purpose: 'assistants' });
   }
 
+  /**
+   * Adds the file identified by `fileId` to the vector store identified by `vectorStoreId`.
+   * @param vectorStoreId id of the target vector store
+   * @param fileId id of the file to add
+   * @returns the vector store file object returned by the client
+   */
+  async createVectorStoreFile(vectorStoreId: string, fileId: string): Promise<OpenAI.VectorStores.Files.VectorStoreFile> {
+    const { files } = this.client.vectorStores;
+    return files.create(vectorStoreId, { file_id: fileId });
+  }
+
   /**
    * Creates a file batch on the vector store identified by `vectorStoreId`.
    * @param vectorStoreId id of the target vector store
    * @param fileIds ids of the files to include in the batch
    * @returns the file batch object returned by the client
    */
   async createVectorStoreFileBatch(vectorStoreId: string, fileIds: string[]): Promise<OpenAI.VectorStores.FileBatches.VectorStoreFileBatch> {
     return this.client.vectorStores.fileBatches.create(vectorStoreId, { file_ids: fileIds });
   }

+ 60 - 6
apps/app/src/features/openai/server/services/openai.ts

@@ -22,6 +22,7 @@ import VectorStoreFileRelationModel, {
   type VectorStoreFileRelation,
   prepareVectorStoreFileRelations,
 } from '~/features/openai/server/models/vector-store-file-relation';
+import type Crowi from '~/server/crowi';
 import type { PageDocument, PageModel } from '~/server/models/page';
 import UserGroupRelation from '~/server/models/user-group-relation';
 import { configManager } from '~/server/service/config-manager';
@@ -40,6 +41,7 @@ import { removeGlobPath } from '../../utils/remove-glob-path';
 import AiAssistantModel, { type AiAssistantDocument } from '../models/ai-assistant';
 import { convertMarkdownToHtml } from '../utils/convert-markdown-to-html';
 import { generateGlobPatterns } from '../utils/generate-glob-patterns';
+import { isVectorStoreCompatible } from '../utils/is-vector-store-compatible';
 
 import { getClient } from './client-delegator';
 import { openaiApiErrorHandler } from './openai-api-error-handler';
@@ -78,6 +80,7 @@ export interface IOpenaiService {
   createVectorStoreFile(vectorStoreRelation: VectorStoreDocument, pages: PageDocument[]): Promise<void>;
   createVectorStoreFileOnPageCreate(pages: PageDocument[]): Promise<void>;
   updateVectorStoreFileOnPageUpdate(page: HydratedDocument<PageDocument>): Promise<void>;
+  createVectorStoreFileOnUploadAttachment(pageId: string, file: Express.Multer.File, readable: Readable): Promise<void>;
   deleteVectorStoreFile(vectorStoreRelationId: Types.ObjectId, pageId: Types.ObjectId): Promise<void>;
   deleteVectorStoreFilesByPageIds(pageIds: Types.ObjectId[]): Promise<void>;
   deleteObsoleteVectorStoreFile(limit: number, apiCallInterval: number): Promise<void>; // for CronJob
@@ -89,6 +92,11 @@ export interface IOpenaiService {
 }
 class OpenaiService implements IOpenaiService {
 
+  /**
+   * Registers this service's attachment-upload handler on the Crowi attachment service.
+   * @param crowi Crowi instance whose `attachmentService` receives the handler
+   */
+  constructor(crowi: Crowi) {
+    // Bind first so the handler keeps `this` when it is later invoked as a plain function
+    const onAttachmentUploaded = this.createVectorStoreFileOnUploadAttachment.bind(this);
+    this.createVectorStoreFileOnUploadAttachment = onAttachmentUploaded;
+    crowi.attachmentService.addAttachHandler(onAttachmentUploaded);
+  }
+
   private get client() {
     const openaiServiceType = configManager.getConfig('openai:serviceType');
     return getClient({ openaiServiceType });
@@ -309,6 +317,12 @@ class OpenaiService implements IOpenaiService {
     return uploadedFile;
   }
 
+  /**
+   * Uploads the content of an attachment stream through the client delegator.
+   * @param readable stream providing the attachment content
+   * @param fileName name given to the uploaded file
+   * @returns the file object returned by the client
+   */
+  private async uploadFileForAttachment(readable: Readable, fileName: string): Promise<OpenAI.Files.FileObject> {
+    const attachmentStream = Readable.from(readable);
+    const uploadable = await toFile(attachmentStream, fileName);
+    return this.client.uploadFile(uploadable);
+  }
+
   async deleteVectorStore(vectorStoreRelationId: string): Promise<void> {
     const vectorStoreDocument: VectorStoreDocument | null = await VectorStoreModel.findOne({ _id: vectorStoreRelationId, isDeleted: false });
     if (vectorStoreDocument == null) {
@@ -581,6 +595,45 @@ class OpenaiService implements IOpenaiService {
     }
   }
 
+  /**
+   * Attach handler: adds a newly uploaded attachment to the vector store of every
+   * AI assistant that covers the page the file was attached to.
+   *
+   * Returns without doing anything when the file is not vector-store compatible,
+   * the page cannot be found, or no AI assistant matches the page path.
+   *
+   * @param pageId id of the page the attachment belongs to
+   * @param file multer file descriptor; its `originalname` is used as the uploaded file name
+   * @param readable stream of the attachment content; read at most once, and only when an upload is needed
+   */
+  async createVectorStoreFileOnUploadAttachment(pageId: string, file: Express.Multer.File, readable: Readable): Promise<void> {
+    if (!isVectorStoreCompatible(file)) {
+      return;
+    }
+
+    const Page = mongoose.model<HydratedDocument<PageDocument>, PageModel>('Page');
+    const page = await Page.findById(pageId);
+    if (page == null) {
+      return;
+    }
+
+    const aiAssistants = await this.findAiAssistantByPagePath([page.path], { shouldPopulateVectorStore: true });
+    if (aiAssistants.length === 0) {
+      return;
+    }
+
+    // Upload lazily: the file is sent only once an assistant actually needs it.
+    // Uploading up front would leave an orphaned file (no vector store, no relation record)
+    // whenever every assistant below is skipped.
+    // NOTE(review): when nothing is uploaded, `readable` is left unconsumed — confirm the caller disposes of it.
+    let uploadedFile: OpenAI.Files.FileObject | undefined;
+
+    for await (const aiAssistant of aiAssistants) {
+      const pagesToVectorize = await this.filterPagesByAccessScope(aiAssistant, [page]);
+      if (pagesToVectorize.length === 0) {
+        continue;
+      }
+
+      const vectorStoreRelation = aiAssistant.vectorStore;
+      if (vectorStoreRelation == null || !isPopulated(vectorStoreRelation)) {
+        continue;
+      }
+
+      // The same uploaded file is shared by all target vector stores
+      if (uploadedFile == null) {
+        uploadedFile = await this.uploadFileForAttachment(readable, file.originalname);
+        logger.debug('Uploaded file', uploadedFile);
+      }
+
+      // Record the (vector store, page, file) relation before attaching the file to the vector store
+      const vectorStoreFileRelationsMap: VectorStoreFileRelationsMap = new Map();
+      prepareVectorStoreFileRelations(vectorStoreRelation._id as Types.ObjectId, page._id, uploadedFile.id, vectorStoreFileRelationsMap);
+      const vectorStoreFileRelations = Array.from(vectorStoreFileRelationsMap.values());
+      await VectorStoreFileRelationModel.upsertVectorStoreFileRelations(vectorStoreFileRelations);
+
+      await this.client.createVectorStoreFile(vectorStoreRelation.vectorStoreId, uploadedFile.id);
+    }
+  }
+
   private async createVectorStoreFileWithStream(vectorStoreRelation: VectorStoreDocument, conditions: mongoose.FilterQuery<PageDocument>): Promise<void> {
     const Page = mongoose.model<HydratedDocument<PageDocument>, PageModel>('Page');
 
@@ -935,15 +988,16 @@ class OpenaiService implements IOpenaiService {
 }
 
 let instance: OpenaiService;
-export const getOpenaiService = (): IOpenaiService | undefined => {
-  if (instance != null) {
-    return instance;
-  }
-
+/**
+ * Creates the module-level OpenaiService instance when the `app:aiEnabled` config is truthy
+ * and `openai:serviceType` is one of OpenaiServiceTypes; otherwise the instance is left unset.
+ * @param crowi Crowi instance passed to the OpenaiService constructor
+ */
+export const initializeOpenaiService = (crowi: Crowi): void => {
   const aiEnabled = configManager.getConfig('app:aiEnabled');
   const openaiServiceType = configManager.getConfig('openai:serviceType');
   if (aiEnabled && openaiServiceType != null && OpenaiServiceTypes.includes(openaiServiceType)) {
-    instance = new OpenaiService();
+    instance = new OpenaiService(crowi);
+  }
+};
+
+export const getOpenaiService = (): IOpenaiService | undefined => {
+  if (instance != null) {
     return instance;
   }
 

+ 10 - 1
apps/app/src/server/crowi/index.js

@@ -11,6 +11,7 @@ import next from 'next';
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
 import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
 import { startCronIfEnabled as startOpenaiCronIfEnabled } from '~/features/openai/server/services/cron';
+import { initializeOpenaiService } from '~/features/openai/server/services/openai';
 import { checkPageBulkExportJobInProgressCronService } from '~/features/page-bulk-export/server/service/check-page-bulk-export-job-in-progress-cron';
 import instanciatePageBulkExportJobCleanUpCronService, {
   pageBulkExportJobCleanUpCronService,
@@ -177,7 +178,6 @@ Crowi.prototype.init = async function() {
   this.models = await setupModelsDependentOnCrowi(this);
   await this.setupConfigManager();
   await this.setupSessionConfig();
-  this.setupCron();
 
   // setup messaging services
   await this.setupS2sMessagingService();
@@ -223,8 +223,13 @@ Crowi.prototype.init = async function() {
     // depends on passport service
     this.setupExternalAccountService(),
     this.setupExternalUserGroupSyncService(),
+
+    // depends on AttachmentService
+    this.setupOpenaiService(),
   ]);
 
+  this.setupCron();
+
   await normalizeData();
 };
 
@@ -809,4 +814,8 @@ Crowi.prototype.setupExternalUserGroupSyncService = function() {
   this.keycloakUserGroupSyncService = new KeycloakUserGroupSyncService(this.s2sMessagingService, this.socketIoService);
 };
 
+/**
+ * Initializes the OpenAI service by passing this Crowi instance to initializeOpenaiService.
+ */
+Crowi.prototype.setupOpenaiService = function() {
+  initializeOpenaiService(this);
+};
+
 export default Crowi;

+ 3 - 8
apps/app/src/server/routes/apiv3/attachment.js

@@ -4,7 +4,6 @@ import express from 'express';
 import multer from 'multer';
 import autoReap from 'multer-autoreap';
 
-import { isVectorStoreCompatible } from '~/features/openai/server/utils/is-vector-store-compatible';
 import { SupportedAction } from '~/interfaces/activity';
 import { AttachmentType } from '~/server/interfaces/attachment';
 import { accessTokenParser } from '~/server/middlewares/access-token-parser';
@@ -340,8 +339,9 @@ module.exports = (crowi) => {
    *          500:
    *            $ref: '#/components/responses/500'
    */
-  router.post('/', accessTokenParser, loginRequiredStrictly, excludeReadOnlyUser, uploads.single('file'), autoReap,
+  router.post('/', accessTokenParser, loginRequiredStrictly, excludeReadOnlyUser, uploads.single('file'),
     validator.retrieveAddAttachment, apiV3FormValidator, addActivity,
+    // Removed autoReap middleware to use file data in asynchronous processes. Instead, implemented file deletion after asynchronous processes complete
     async(req, res) => {
 
       const pageId = req.body.page_id;
@@ -361,7 +361,7 @@ module.exports = (crowi) => {
           return res.apiv3Err(`Forbidden to access to the page '${page.id}'`);
         }
 
-        const attachment = await attachmentService.createAttachment(file, req.user, pageId, AttachmentType.WIKI_PAGE);
+        const attachment = await attachmentService.createAttachment(file, req.user, pageId, AttachmentType.WIKI_PAGE, () => autoReap(req, res, () => {}));
 
         const result = {
           page: serializePageSecurely(page),
@@ -369,11 +369,6 @@ module.exports = (crowi) => {
           attachment: attachment.toObject({ virtuals: true }),
         };
 
-        if (isVectorStoreCompatible(file)) {
-          // TODO: https://redmine.weseek.co.jp/issues/165326
-          // Process for uploading to VectorStore
-        }
-
         activityEvent.emit('update', res.locals.activity._id, { action: SupportedAction.ACTION_ATTACHMENT_ADD });
 
         res.apiv3(result);

+ 53 - 8
apps/app/src/server/service/attachment.js

@@ -10,11 +10,22 @@ const mongoose = require('mongoose');
 // eslint-disable-next-line @typescript-eslint/no-unused-vars
 const logger = loggerFactory('growi:service:AttachmentService');
 
+const createReadStream = (filePath) => {
+  return fs.createReadStream(filePath, {
+    flags: 'r', encoding: null, fd: null, mode: '0666', autoClose: true,
+  });
+};
+
 /**
  * the service class for Attachment and file-uploader
  */
 class AttachmentService {
 
+  /** @type {Array<(pageId: string, file: Express.Multer.File, readable: Readable) => Promise<void>>} */
+  attachHandlers = [];
+
+  detachHandlers = [];
+
   /** @type {import('~/server/crowi').default} Crowi instance */
   crowi;
 
@@ -23,7 +34,7 @@ class AttachmentService {
     this.crowi = crowi;
   }
 
-  async createAttachment(file, user, pageId = null, attachmentType) {
+  async createAttachment(file, user, pageId = null, attachmentType, disposeTmpFileCallback) {
     const { fileUploadService } = this.crowi;
 
     // check limit
@@ -32,20 +43,38 @@ class AttachmentService {
       throw new Error(res.errorMessage);
     }
 
-    const fileStream = fs.createReadStream(file.path, {
-      flags: 'r', encoding: null, fd: null, mode: '0666', autoClose: true,
-    });
-
     // create an Attachment document and upload file
     let attachment;
     try {
       attachment = Attachment.createWithoutSave(pageId, user, file.originalname, file.mimetype, file.size, attachmentType);
-      await fileUploadService.uploadAttachment(fileStream, attachment);
+      await fileUploadService.uploadAttachment(createReadStream(file.path), attachment);
       await attachment.save();
+
+      //  Creates a new stream for each operation instead of reusing the original stream.
+      //  REASON: Node.js Readable streams cannot be reused after consumption.
+      //  When a stream is piped or consumed, its internal state changes and the data pointers
+      //  are advanced to the end, making it impossible to read the same data again.
+      let fileStreamForAttachedHandler;
+      if (this.attachHandlers.length !== 0) {
+        fileStreamForAttachedHandler = createReadStream(file.path);
+      }
+
+      const attachedHandlerPromises = this.attachHandlers.map((handler) => {
+        return handler(pageId, file, fileStreamForAttachedHandler);
+      });
+
+      // Do not await, run in background
+      Promise.all(attachedHandlerPromises)
+        .catch((err) => {
+          logger.error('Error while executing attach handler', err);
+        })
+        .finally(() => {
+          disposeTmpFileCallback?.(file);
+        });
     }
     catch (err) {
-      // delete temporary file
-      fs.unlink(file.path, (err) => { if (err) { logger.error('Error while deleting tmp file.') } });
+      logger.error('Error while creating attachment', err);
+      disposeTmpFileCallback?.(file);
       throw err;
     }
 
@@ -88,6 +117,22 @@ class AttachmentService {
     return count >= 1;
   }
 
+  /**
+   * Register a handler that will be called after attachment creation
+   * @param {(pageId: string, file: Express.Multer.File, readable: Readable) => Promise<void>} handler
+   */
+  addAttachHandler(handler) {
+    this.attachHandlers.push(handler);
+  }
+
+  /**
+   * Register a handler that will be called before attachment deletion
+   * @param {(attachment: Attachment) => Promise<void>} handler
+   */
+  addDetachHandler(handler) {
+    this.detachHandlers.push(handler);
+  }
+
 }
 
 module.exports = AttachmentService;