Преглед изворни кода

refactor AiAssistantChatSidebar so that verified that it is working properly

Yuki Takei пре 1 година
родитељ
комит
2ba5defabc
1 измењених фајлова са 204 додато и 41 уклоњено
  1. 204 41
      apps/app/src/features/openai/server/routes/edit.ts

+ 204 - 41
apps/app/src/features/openai/server/routes/edit.ts

@@ -6,6 +6,7 @@ import type { Request, RequestHandler, Response } from 'express';
 import type { ValidationChain } from 'express-validator';
 import type { ValidationChain } from 'express-validator';
 import { body } from 'express-validator';
 import { body } from 'express-validator';
 import { zodResponseFormat } from 'openai/helpers/zod';
 import { zodResponseFormat } from 'openai/helpers/zod';
+import type { AssistantStream } from 'openai/lib/AssistantStream';
 import type { MessageDelta } from 'openai/resources/beta/threads/messages.mjs';
 import type { MessageDelta } from 'openai/resources/beta/threads/messages.mjs';
 import { parser } from 'stream-json';
 import { parser } from 'stream-json';
 import { streamValues } from 'stream-json/streamers/StreamValues';
 import { streamValues } from 'stream-json/streamers/StreamValues';
@@ -120,6 +121,163 @@ const writeSSEError = (res: Response, message: string, code?: StreamErrorCode) =
   res.write(`error: ${JSON.stringify({ code, message })}\n\n`);
   res.write(`error: ${JSON.stringify({ code, message })}\n\n`);
 };
 };
 
 
+/**
+ * 単一の JSON パーサーインスタンスを作成して継続的に使用する
+ * これにより、メモリリークやイベントリスナーの過剰な登録を防止する
+ */
+class JsonStreamProcessor {
+
+  // パーサーコンポーネント
+  private jsonParser = parser({ jsonStreaming: true });
+
+  private jsonValueStream = streamValues();
+
+  // 内部状態
+  private messages: string[] = [];
+
+  private replacements: EditorAssistantDiff[] = [];
+
+  private dataCallback: ((messages: string[], replacements: EditorAssistantDiff[]) => void) | null = null;
+
+  private isSetup = false;
+
+  constructor() {
+    // リスナー上限を増やしてワーニングを防止
+    this.jsonParser.setMaxListeners(20);
+    this.jsonValueStream.setMaxListeners(20);
+
+    // エラーハンドリング設定
+    this.jsonParser.on('error', (err) => {
+      logger.debug('JSON parser error (expected for partial data):', err.message);
+    });
+
+    this.jsonValueStream.on('error', (err) => {
+      logger.debug('Stream values error (expected for partial data):', err.message);
+    });
+
+    // データハンドラ
+    this.jsonValueStream.on('data', ({ value }) => {
+      if (!value) return;
+
+      try {
+        // コンテンツアレイから抽出
+        if (value.contents && Array.isArray(value.contents)) {
+          // 全コンテンツを抽出
+          const extracted = extractContentItems(value.contents);
+
+          // 新しいメッセージを追加
+          extracted.messages.forEach((message) => {
+            if (!this.messages.includes(message)) {
+              this.messages.push(message);
+            }
+          });
+
+          // 新しい差分を追加/更新
+          extracted.replacements.forEach((diff) => {
+            const existingIndex = this.replacements.findIndex(r => r.start === diff.start && r.end === diff.end);
+            if (existingIndex >= 0) {
+              this.replacements[existingIndex] = diff;
+            }
+            else {
+              this.replacements.push(diff);
+            }
+          });
+
+          // データが見つかったらコールバックを実行
+          if ((extracted.messages.length > 0 || extracted.replacements.length > 0) && this.dataCallback) {
+            this.dataCallback(this.messages, this.replacements);
+          }
+        }
+        // 個別のメッセージアイテム
+        else if (isMessageItem(value)) {
+          if (!this.messages.includes(value.message)) {
+            this.messages.push(value.message);
+            if (this.dataCallback) this.dataCallback(this.messages, this.replacements);
+          }
+        }
+        // 個別の差分アイテム
+        else if (isDiffItem(value)) {
+          const validDiff = EditorAssistantDiffSchema.safeParse(value);
+          if (validDiff.success) {
+            const diff = validDiff.data;
+            const existingIndex = this.replacements.findIndex(r => r.start === diff.start && r.end === diff.end);
+            if (existingIndex >= 0) {
+              this.replacements[existingIndex] = diff;
+            }
+            else {
+              this.replacements.push(diff);
+            }
+            if (this.dataCallback) this.dataCallback(this.messages, this.replacements);
+          }
+        }
+      }
+      catch (e) {
+        logger.debug('Error processing JSON data:', e);
+      }
+    });
+
+    // パイプライン接続
+    this.jsonParser.pipe(this.jsonValueStream);
+    this.isSetup = true;
+  }
+
+  /**
+   * データ処理のコールバックを設定
+   */
+  onData(callback: (messages: string[], replacements: EditorAssistantDiff[]) => void): void {
+    this.dataCallback = callback;
+  }
+
+  /**
+   * JSON文字列を処理
+   */
+  process(jsonString: string): void {
+    try {
+      // パーサーにデータを書き込む
+      // 注: pipe()は使わず、直接write()することでリスナーの増加を防ぐ
+      this.jsonParser.write(jsonString);
+    }
+    catch (e) {
+      logger.debug('Error processing JSON data:', e);
+    }
+  }
+
+  /**
+   * リソースを解放
+   */
+  destroy(): void {
+    try {
+      if (this.isSetup) {
+        this.jsonParser.unpipe(this.jsonValueStream);
+        this.jsonParser.removeAllListeners();
+        this.jsonValueStream.removeAllListeners();
+        this.jsonParser.end();
+        this.jsonValueStream.end();
+        this.dataCallback = null;
+        this.isSetup = false;
+      }
+    }
+    catch (e) {
+      logger.debug('Error destroying JSON processor:', e);
+    }
+  }
+
+  /**
+   * 現在のメッセージリスト
+   */
+  get messagesList(): string[] {
+    return this.messages;
+  }
+
+  /**
+   * 現在の差分リスト
+   */
+  get replacementsList(): EditorAssistantDiff[] {
+    return this.replacements;
+  }
+
+}
+
 export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (crowi) => {
 export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (crowi) => {
   const loginRequiredStrictly = require('~/server/middlewares/login-required')(crowi);
   const loginRequiredStrictly = require('~/server/middlewares/login-required')(crowi);
 
 
@@ -154,24 +312,30 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
         return res.apiv3Err(new ErrorV3('GROWI AI is not enabled'), 501);
         return res.apiv3Err(new ErrorV3('GROWI AI is not enabled'), 501);
       }
       }
 
 
-      // レスポンスデータ格納用
-      const messages: string[] = [];
-      const replacements: EditorAssistantDiff[] = [];
+      // レスポンスヘッダー設定
+      res.writeHead(200, {
+        'Content-Type': 'text/event-stream;charset=utf-8',
+        'Cache-Control': 'no-cache, no-transform',
+      });
+
+      // JSONプロセッサ作成 - 単一インスタンスを管理
+      const jsonProcessor = new JsonStreamProcessor();
+
+      // データが見つかるたびにクライアントに送信
+      jsonProcessor.onData((messages, replacements) => {
+        writeSSEData(res, createEditorResponse(messages, replacements));
+      });
+
+      let stream: AssistantStream;
       let rawBuffer = '';
       let rawBuffer = '';
 
 
       try {
       try {
-        // レスポンスヘッダー設定
-        res.writeHead(200, {
-          'Content-Type': 'text/event-stream;charset=utf-8',
-          'Cache-Control': 'no-cache, no-transform',
-        });
-
         // アシスタント取得とスレッド処理
         // アシスタント取得とスレッド処理
         const assistant = await getOrCreateEditorAssistant();
         const assistant = await getOrCreateEditorAssistant();
         const thread = await openaiClient.beta.threads.retrieve(threadId);
         const thread = await openaiClient.beta.threads.retrieve(threadId);
 
 
         // ストリーム作成
         // ストリーム作成
-        const stream = openaiClient.beta.threads.runs.stream(thread.id, {
+        stream = openaiClient.beta.threads.runs.stream(thread.id, {
           assistant_id: assistant.id,
           assistant_id: assistant.id,
           additional_messages: [
           additional_messages: [
             {
             {
@@ -220,28 +384,8 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
             const chunk = content.text.value;
             const chunk = content.text.value;
             rawBuffer += chunk;
             rawBuffer += chunk;
 
 
-            // JSONパース処理
-            try {
-              // ストリームから解析
-              const bufferStream = Readable.from([rawBuffer]);
-              const jsonParser = bufferStream.pipe(parser()).pipe(streamValues());
-
-              jsonParser.on('data', ({ value }) => {
-                // contentsアレイの処理
-                if (value?.contents && Array.isArray(value.contents)) {
-                  // メッセージと差分情報の抽出
-                  const extracted = extractContentItems(value.contents);
-                  messages.push(...extracted.messages);
-                  replacements.push(...extracted.replacements);
-
-                  // データ送信
-                  writeSSEData(res, createEditorResponse(messages, replacements));
-                }
-              });
-            }
-            catch (e) {
-              // JSON解析中のエラーは無視(おそらく不完全なJSONデータ)
-            }
+            // JSONプロセッサでデータを処理
+            jsonProcessor.process(rawBuffer);
 
 
             // 元のデルタも送信
             // 元のデルタも送信
             writeSSEData(res, delta);
             writeSSEData(res, delta);
@@ -277,42 +421,61 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
 
 
               // 最終データ送信
               // 最終データ送信
               writeSSEData(res, createEditorResponse(
               writeSSEData(res, createEditorResponse(
-                extracted.messages.length > 0 ? extracted.messages : messages,
-                extracted.replacements.length > 0 ? extracted.replacements : replacements,
+                extracted.messages.length > 0 ? extracted.messages : jsonProcessor.messagesList,
+                extracted.replacements.length > 0 ? extracted.replacements : jsonProcessor.replacementsList,
                 true,
                 true,
               ));
               ));
             }
             }
-            else if (messages.length > 0 || replacements.length > 0) {
+            else if (jsonProcessor.messagesList.length > 0 || jsonProcessor.replacementsList.length > 0) {
               // パース結果が期待形式でなくても、部分的なデータがあれば送信
               // パース結果が期待形式でなくても、部分的なデータがあれば送信
-              writeSSEData(res, createEditorResponse(messages, replacements, true));
+              writeSSEData(res, createEditorResponse(jsonProcessor.messagesList, jsonProcessor.replacementsList, true));
             }
             }
           }
           }
           catch (e) {
           catch (e) {
-            logger.error('Failed to parse final JSON response', e);
+            logger.debug('Failed to parse final JSON response', e);
 
 
-            if (messages.length > 0 || replacements.length > 0) {
+            if (jsonProcessor.messagesList.length > 0 || jsonProcessor.replacementsList.length > 0) {
               // パース失敗でも、既存データがあれば送信
               // パース失敗でも、既存データがあれば送信
-              writeSSEData(res, createEditorResponse(messages, replacements, true));
+              writeSSEData(res, createEditorResponse(jsonProcessor.messagesList, jsonProcessor.replacementsList, true));
             }
             }
             else {
             else {
               writeSSEError(res, 'Failed to parse assistant response as JSON', 'INVALID_RESPONSE_FORMAT');
               writeSSEError(res, 'Failed to parse assistant response as JSON', 'INVALID_RESPONSE_FORMAT');
             }
             }
           }
           }
 
 
+          // JSONプロセッサとストリームのクリーンアップ
+          jsonProcessor.destroy();
           stream.off('messageDelta', messageDeltaHandler);
           stream.off('messageDelta', messageDeltaHandler);
           res.end();
           res.end();
         });
         });
 
 
         // エラーハンドラ
         // エラーハンドラ
         stream.once('error', (err) => {
         stream.once('error', (err) => {
-          logger.error(err);
+          logger.error('Stream error:', err);
+
+          // JSONプロセッサとストリームのクリーンアップ
+          jsonProcessor.destroy();
           stream.off('messageDelta', messageDeltaHandler);
           stream.off('messageDelta', messageDeltaHandler);
           writeSSEError(res, 'An error occurred while processing your request');
           writeSSEError(res, 'An error occurred while processing your request');
           res.end();
           res.end();
         });
         });
+
+        // クリーンアップ関数を設定
+        req.on('close', () => {
+          jsonProcessor.destroy();
+
+          if (stream) {
+            stream.off('messageDelta', () => {});
+            stream.off('event', () => {});
+          }
+
+          logger.debug('Connection closed by client');
+        });
       }
       }
       catch (err) {
       catch (err) {
-        logger.error(err);
+        // エラー発生時のクリーンアップ
+        jsonProcessor.destroy();
+        logger.error('Unexpected error:', err);
         return res.status(500).send(err.message);
         return res.status(500).send(err.message);
       }
       }
     },
     },