Yuki Takei 1 rok temu
rodzic
commit
c53a90a219

+ 203 - 186
apps/app/src/features/openai/server/routes/edit.ts → apps/app/src/features/openai/server/routes/edit/index.ts

@@ -8,10 +8,9 @@ import { body } from 'express-validator';
 import { zodResponseFormat } from 'openai/helpers/zod';
 import type { AssistantStream } from 'openai/lib/AssistantStream';
 import type { MessageDelta } from 'openai/resources/beta/threads/messages.mjs';
-import { parser } from 'stream-json';
-import { streamValues } from 'stream-json/streamers/StreamValues';
 import { z } from 'zod';
 
+// 必要なインポート
 import { getOrCreateEditorAssistant } from '~/features/openai/server/services/assistant';
 import type Crowi from '~/server/crowi';
 import { accessTokenParser } from '~/server/middlewares/access-token-parser';
@@ -19,16 +18,21 @@ import { apiV3FormValidator } from '~/server/middlewares/apiv3-form-validator';
 import type { ApiV3Response } from '~/server/routes/apiv3/interfaces/apiv3-response';
 import loggerFactory from '~/utils/logger';
 
-import { MessageErrorCode, type StreamErrorCode } from '../../interfaces/message-error';
-import { openaiClient } from '../services/client';
-import { getStreamErrorCode } from '../services/getStreamErrorCode';
-import { getOpenaiService } from '../services/openai';
-import { replaceAnnotationWithPageLink } from '../services/replace-annotation-with-page-link';
-
-import { certifyAiService } from './middlewares/certify-ai-service';
+import { MessageErrorCode } from '../../../interfaces/message-error';
+import { openaiClient } from '../../services/client';
+import { getStreamErrorCode } from '../../services/getStreamErrorCode';
+import { getOpenaiService } from '../../services/openai';
+import { replaceAnnotationWithPageLink } from '../../services/replace-annotation-with-page-link';
+import { certifyAiService } from '../middlewares/certify-ai-service';
+import { JsonStreamProcessor } from '../utils/json-stream-processor';
+import { SseHelper } from '../utils/sse-helper';
 
 const logger = loggerFactory('growi:routes:apiv3:openai:message');
 
+// -----------------------------------------------------------------------------
+// 型定義
+// -----------------------------------------------------------------------------
+
 // スキーマ定義
 const EditorAssistantMessageSchema = z.object({
   message: z.string().describe('A friendly message explaining what changes were made or suggested'),
@@ -54,10 +58,14 @@ type ReqBody = {
   aiAssistantId?: string,
   threadId?: string,
 }
+
 type Req = Request<undefined, Response, ReqBody> & {
   user: IUserHasId,
 }
-type PostMessageHandlersFactory = (crowi: Crowi) => RequestHandler[];
+
+// -----------------------------------------------------------------------------
+// ユーティリティ関数 (外形のみ)
+// -----------------------------------------------------------------------------
 
 /**
  * 型ガード: メッセージ型かどうかを判定する
@@ -107,158 +115,191 @@ const createEditorResponse = (messages: string[], replacements: EditorAssistantD
   ...(isDone ? { isDone: true } : {}),
 });
 
-/**
- * SSEフォーマットでデータを送信する
- */
-const writeSSEData = (res: Response, data: unknown) => {
-  res.write(`data: ${JSON.stringify(data)}\n\n`);
-};
+// -----------------------------------------------------------------------------
+// エディターデータプロセッサ実装 (外形のみ)
+// -----------------------------------------------------------------------------
 
 /**
- * SSEフォーマットでエラーを送信する
+ * EditorAssistant用のストリームデータプロセッサ
+ * JSONストリームから編集用のメッセージと差分を抽出する
  */
-const writeSSEError = (res: Response, message: string, code?: StreamErrorCode) => {
-  res.write(`error: ${JSON.stringify({ code, message })}\n\n`);
-};
-
-/**
- * 単一の JSON パーサーインスタンスを作成して継続的に使用する
- * これにより、メモリリークやイベントリスナーの過剰な登録を防止する
- */
-class JsonStreamProcessor {
+class EditorStreamProcessor {
 
-  // パーサーコンポーネント
-  private jsonParser = parser({ jsonStreaming: true });
+  // JSONプロセッサ
+  private jsonProcessor: JsonStreamProcessor;
 
-  private jsonValueStream = streamValues();
-
-  // 内部状態
+  // 確認済みのデータ
   private messages: string[] = [];
 
   private replacements: EditorAssistantDiff[] = [];
 
-  private dataCallback: ((messages: string[], replacements: EditorAssistantDiff[]) => void) | null = null;
+  // ハンドラーID
+  private handlerIds: string[] = [];
+
+  constructor(private sseHelper: SseHelper) {
+    // JsonStreamProcessorの初期化
+    this.jsonProcessor = new JsonStreamProcessor();
+    this.setupHandlers();
+  }
+
+  /**
+   * 処理ハンドラーを設定
+   */
+  private setupHandlers(): void {
+    // コンテンツ配列ハンドラー
+    this.handlerIds.push(
+      this.jsonProcessor.registerHandler(
+        (value): value is { contents: unknown[] } => {
+          return typeof value === 'object' && value !== null && 'contents' in value && Array.isArray(value.contents);
+        },
+        data => this.handleContents(data.contents),
+      ),
+    );
+
+    // 単一メッセージハンドラー
+    this.handlerIds.push(
+      this.jsonProcessor.registerHandler(
+        isMessageItem,
+        message => this.handleMessage(message.message),
+      ),
+    );
+
+    // 単一差分ハンドラー
+    this.handlerIds.push(
+      this.jsonProcessor.registerHandler(
+        isDiffItem,
+        diff => this.handleDiff(diff),
+      ),
+    );
+  }
 
-  private isSetup = false;
+  /**
+   * JSON文字列を処理
+   */
+  process(jsonString: string): void {
+    this.jsonProcessor.process(jsonString);
+  }
 
-  constructor() {
-    // リスナー上限を増やしてワーニングを防止
-    this.jsonParser.setMaxListeners(20);
-    this.jsonValueStream.setMaxListeners(20);
+  /**
+   * コンテンツ配列を処理
+   */
+  private handleContents(contents: unknown[]): void {
+    const extracted = extractContentItems(contents);
 
-    // エラーハンドリング設定
-    this.jsonParser.on('error', (err) => {
-      logger.debug('JSON parser error (expected for partial data):', err.message);
-    });
+    let hasUpdates = false;
 
-    this.jsonValueStream.on('error', (err) => {
-      logger.debug('Stream values error (expected for partial data):', err.message);
+    // メッセージ処理
+    extracted.messages.forEach((msg) => {
+      if (!this.messages.includes(msg)) {
+        this.messages.push(msg);
+        hasUpdates = true;
+      }
     });
 
-    // データハンドラ
-    this.jsonValueStream.on('data', ({ value }) => {
-      if (!value) return;
-
-      try {
-        // コンテンツアレイから抽出
-        if (value.contents && Array.isArray(value.contents)) {
-          // 全コンテンツを抽出
-          const extracted = extractContentItems(value.contents);
-
-          // 新しいメッセージを追加
-          extracted.messages.forEach((message) => {
-            if (!this.messages.includes(message)) {
-              this.messages.push(message);
-            }
-          });
-
-          // 新しい差分を追加/更新
-          extracted.replacements.forEach((diff) => {
-            const existingIndex = this.replacements.findIndex(r => r.start === diff.start && r.end === diff.end);
-            if (existingIndex >= 0) {
-              this.replacements[existingIndex] = diff;
-            }
-            else {
-              this.replacements.push(diff);
-            }
-          });
-
-          // データが見つかったらコールバックを実行
-          if ((extracted.messages.length > 0 || extracted.replacements.length > 0) && this.dataCallback) {
-            this.dataCallback(this.messages, this.replacements);
-          }
-        }
-        // 個別のメッセージアイテム
-        else if (isMessageItem(value)) {
-          if (!this.messages.includes(value.message)) {
-            this.messages.push(value.message);
-            if (this.dataCallback) this.dataCallback(this.messages, this.replacements);
-          }
-        }
-        // 個別の差分アイテム
-        else if (isDiffItem(value)) {
-          const validDiff = EditorAssistantDiffSchema.safeParse(value);
-          if (validDiff.success) {
-            const diff = validDiff.data;
-            const existingIndex = this.replacements.findIndex(r => r.start === diff.start && r.end === diff.end);
-            if (existingIndex >= 0) {
-              this.replacements[existingIndex] = diff;
-            }
-            else {
-              this.replacements.push(diff);
-            }
-            if (this.dataCallback) this.dataCallback(this.messages, this.replacements);
-          }
-        }
+    // 差分処理
+    extracted.replacements.forEach((diff) => {
+      const existingIndex = this.replacements.findIndex(r => r.start === diff.start && r.end === diff.end);
+      if (existingIndex >= 0) {
+        this.replacements[existingIndex] = diff;
       }
-      catch (e) {
-        logger.debug('Error processing JSON data:', e);
+      else {
+        this.replacements.push(diff);
       }
+      hasUpdates = true;
     });
 
-    // パイプライン接続
-    this.jsonParser.pipe(this.jsonValueStream);
-    this.isSetup = true;
+    // 更新があればクライアントに通知
+    if (hasUpdates) {
+      this.notifyClient();
+    }
   }
 
   /**
-   * データ処理のコールバックを設定
+   * 単一メッセージを処理
    */
-  onData(callback: (messages: string[], replacements: EditorAssistantDiff[]) => void): void {
-    this.dataCallback = callback;
+  private handleMessage(message: string): void {
+    if (!this.messages.includes(message)) {
+      this.messages.push(message);
+      this.notifyClient();
+    }
   }
 
   /**
-   * JSON文字列を処理
+   * 単一差分を処理
    */
-  process(jsonString: string): void {
-    try {
-      // パーサーにデータを書き込む
-      // 注: pipe()は使わず、直接write()することでリスナーの増加を防ぐ
-      this.jsonParser.write(jsonString);
+  private handleDiff(diff: EditorAssistantDiff): void {
+    const validDiff = EditorAssistantDiffSchema.safeParse(diff);
+    if (!validDiff.success) return;
+
+    const existingIndex = this.replacements.findIndex(r => r.start === diff.start && r.end === diff.end);
+    if (existingIndex >= 0) {
+      this.replacements[existingIndex] = validDiff.data;
     }
-    catch (e) {
-      logger.debug('Error processing JSON data:', e);
+    else {
+      this.replacements.push(validDiff.data);
     }
+
+    this.notifyClient();
+  }
+
+  /**
+   * クライアントに通知
+   */
+  private notifyClient(): void {
+    this.sseHelper.writeData(createEditorResponse(this.messages, this.replacements));
   }
 
   /**
    * リソースを解放
    */
   destroy(): void {
+    // ハンドラー登録解除
+    this.handlerIds.forEach((id) => {
+      this.jsonProcessor.unregisterHandler(id);
+    });
+
+    // プロセッサ解放
+    this.jsonProcessor.destroy();
+  }
+
+  /**
+   * 最終結果を送信
+   */
+  sendFinalResult(rawBuffer: string): void {
     try {
-      if (this.isSetup) {
-        this.jsonParser.unpipe(this.jsonValueStream);
-        this.jsonParser.removeAllListeners();
-        this.jsonValueStream.removeAllListeners();
-        this.jsonParser.end();
-        this.jsonValueStream.end();
-        this.dataCallback = null;
-        this.isSetup = false;
+      // 完全なJSONをパース
+      const parsedJson = JSON.parse(rawBuffer);
+
+      if (parsedJson?.contents && Array.isArray(parsedJson.contents)) {
+        // 最終的なコンテンツを抽出
+        const extracted = extractContentItems(parsedJson.contents);
+
+        // 最終データを送信(完全なデータがあればそちらを優先)
+        this.sseHelper.writeData(createEditorResponse(
+          extracted.messages.length > 0 ? extracted.messages : this.messages,
+          extracted.replacements.length > 0 ? extracted.replacements : this.replacements,
+          true, // 完了フラグ
+        ));
+      }
+      else if (this.messages.length > 0 || this.replacements.length > 0) {
+        // パース結果が期待形式でなくても蓄積データがあれば送信
+        this.sseHelper.writeData(createEditorResponse(this.messages, this.replacements, true));
+      }
+      else {
+        // データがない場合はエラー
+        this.sseHelper.writeError('Failed to parse assistant response as JSON', 'INVALID_RESPONSE_FORMAT');
       }
     }
     catch (e) {
-      logger.debug('Error destroying JSON processor:', e);
+      logger.debug('Failed to parse final JSON response:', e);
+
+      // パースエラー時も蓄積データがあれば送信
+      if (this.messages.length > 0 || this.replacements.length > 0) {
+        this.sseHelper.writeData(createEditorResponse(this.messages, this.replacements, true));
+      }
+      else {
+        this.sseHelper.writeError('Failed to parse assistant response as JSON', 'INVALID_RESPONSE_FORMAT');
+      }
     }
   }
 
@@ -278,9 +319,19 @@ class JsonStreamProcessor {
 
 }
 
+// -----------------------------------------------------------------------------
+// エンドポイントハンドラーファクトリ
+// -----------------------------------------------------------------------------
+
+type PostMessageHandlersFactory = (crowi: Crowi) => RequestHandler[];
+
+/**
+ * エディタアシスタントのエンドポイントハンドラを作成する
+ */
 export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (crowi) => {
   const loginRequiredStrictly = require('~/server/middlewares/login-required')(crowi);
 
+  // バリデータ設定
   const validator: ValidationChain[] = [
     body('userMessage')
       .isString()
@@ -312,30 +363,25 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
         return res.apiv3Err(new ErrorV3('GROWI AI is not enabled'), 501);
       }
 
-      // レスポンスヘッダー設定
-      res.writeHead(200, {
-        'Content-Type': 'text/event-stream;charset=utf-8',
-        'Cache-Control': 'no-cache, no-transform',
-      });
+      // SSEヘルパーとストリームプロセッサの初期化
+      const sseHelper = new SseHelper(res);
+      const streamProcessor = new EditorStreamProcessor(sseHelper);
 
-      // JSONプロセッサ作成 - 単一インスタンスを管理
-      const jsonProcessor = new JsonStreamProcessor();
-
-      // データが見つかるたびにクライアントに送信
-      jsonProcessor.onData((messages, replacements) => {
-        writeSSEData(res, createEditorResponse(messages, replacements));
-      });
+      try {
+        // レスポンスヘッダー設定
+        res.writeHead(200, {
+          'Content-Type': 'text/event-stream;charset=utf-8',
+          'Cache-Control': 'no-cache, no-transform',
+        });
 
-      let stream: AssistantStream;
-      let rawBuffer = '';
+        let rawBuffer = '';
 
-      try {
         // アシスタント取得とスレッド処理
         const assistant = await getOrCreateEditorAssistant();
         const thread = await openaiClient.beta.threads.retrieve(threadId);
 
         // ストリーム作成
-        stream = openaiClient.beta.threads.runs.stream(thread.id, {
+        const stream = openaiClient.beta.threads.runs.stream(thread.id, {
           assistant_id: assistant.id,
           additional_messages: [
             {
@@ -385,13 +431,13 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
             rawBuffer += chunk;
 
             // JSONプロセッサでデータを処理
-            jsonProcessor.process(rawBuffer);
+            streamProcessor.process(rawBuffer);
 
             // 元のデルタも送信
-            writeSSEData(res, delta);
+            sseHelper.writeData(delta);
           }
           else {
-            writeSSEData(res, delta);
+            sseHelper.writeData(delta);
           }
         };
 
@@ -405,64 +451,35 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
             if (errorMessage == null) return;
 
             logger.error(errorMessage);
-            writeSSEError(res, errorMessage, getStreamErrorCode(errorMessage));
+            sseHelper.writeError(errorMessage, getStreamErrorCode(errorMessage));
           }
         });
 
         // 完了ハンドラ
         stream.once('messageDone', () => {
-          // 最終確認として完全なJSONをパース
-          try {
-            const parsedJson = JSON.parse(rawBuffer);
-
-            if (parsedJson?.contents && Array.isArray(parsedJson.contents)) {
-              // 最終的なメッセージと差分を収集
-              const extracted = extractContentItems(parsedJson.contents);
-
-              // 最終データ送信
-              writeSSEData(res, createEditorResponse(
-                extracted.messages.length > 0 ? extracted.messages : jsonProcessor.messagesList,
-                extracted.replacements.length > 0 ? extracted.replacements : jsonProcessor.replacementsList,
-                true,
-              ));
-            }
-            else if (jsonProcessor.messagesList.length > 0 || jsonProcessor.replacementsList.length > 0) {
-              // パース結果が期待形式でなくても、部分的なデータがあれば送信
-              writeSSEData(res, createEditorResponse(jsonProcessor.messagesList, jsonProcessor.replacementsList, true));
-            }
-          }
-          catch (e) {
-            logger.debug('Failed to parse final JSON response', e);
-
-            if (jsonProcessor.messagesList.length > 0 || jsonProcessor.replacementsList.length > 0) {
-              // パース失敗でも、既存データがあれば送信
-              writeSSEData(res, createEditorResponse(jsonProcessor.messagesList, jsonProcessor.replacementsList, true));
-            }
-            else {
-              writeSSEError(res, 'Failed to parse assistant response as JSON', 'INVALID_RESPONSE_FORMAT');
-            }
-          }
+          // 最終結果を処理して送信
+          streamProcessor.sendFinalResult(rawBuffer);
 
-          // JSONプロセッサとストリームのクリーンアップ
-          jsonProcessor.destroy();
+          // ストリームのクリーンアップ
+          streamProcessor.destroy();
           stream.off('messageDelta', messageDeltaHandler);
-          res.end();
+          sseHelper.end();
         });
 
         // エラーハンドラ
         stream.once('error', (err) => {
           logger.error('Stream error:', err);
 
-          // JSONプロセッサとストリームのクリーンアップ
-          jsonProcessor.destroy();
+          // クリーンアップ
+          streamProcessor.destroy();
           stream.off('messageDelta', messageDeltaHandler);
-          writeSSEError(res, 'An error occurred while processing your request');
-          res.end();
+          sseHelper.writeError('An error occurred while processing your request');
+          sseHelper.end();
         });
 
-        // クリーンアップ関数を設定
+        // クライアント切断時のクリーンアップ
         req.on('close', () => {
-          jsonProcessor.destroy();
+          streamProcessor.destroy();
 
           if (stream) {
             stream.off('messageDelta', () => {});
@@ -473,9 +490,9 @@ export const postMessageToEditHandlersFactory: PostMessageHandlersFactory = (cro
         });
       }
       catch (err) {
-        // エラー発生時のクリーンアップ
-        jsonProcessor.destroy();
-        logger.error('Unexpected error:', err);
+        // エラー発生時のクリーンアップと応答
+        logger.error('Error in edit handler:', err);
+        streamProcessor.destroy();
         return res.status(500).send(err.message);
       }
     },

+ 174 - 0
apps/app/src/features/openai/server/routes/utils/json-stream-processor.ts

@@ -0,0 +1,174 @@
+import { parser } from 'stream-json';
+import { streamValues } from 'stream-json/streamers/StreamValues';
+import { v4 as uuidv4 } from 'uuid';
+
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory('growi:routes:apiv3:openai:json-stream-processor');
+
+/**
+ * ハンドラー登録情報
+ */
+interface HandlerRegistration<T> {
+  /** ハンドラーID */
+  id: string;
+  /** 型フィルター */
+  filter: (value: unknown) => value is T;
+  /** データハンドラー */
+  handler: (data: T) => void;
+}
+
+/**
+ * 汎用的なJSONストリーム処理のためのインターフェース
+ */
+export interface IJsonStreamProcessor {
+  /**
+   * ハンドラーを登録する
+   * @param filter 型フィルター関数
+   * @param handler データハンドラー関数
+   * @returns ハンドラーID(登録解除時に使用)
+   */
+  registerHandler<T>(filter: (value: unknown) => value is T, handler: (data: T) => void): string;
+
+  /**
+   * 登録済みハンドラーを解除する
+   * @param handlerId ハンドラーID
+   * @returns 解除に成功したか
+   */
+  unregisterHandler(handlerId: string): boolean;
+
+  /**
+   * JSON文字列を処理する
+   * @param jsonString 処理するJSON文字列
+   */
+  process(jsonString: string): void;
+
+  /**
+   * リソースを解放する
+   */
+  destroy(): void;
+}
+
+/**
+ * 汎用JSONストリームプロセッサ
+ * 不完全なJSONをストリーミング処理し、完全なオブジェクトが見つかったらハンドラーに通知する
+ */
+export class JsonStreamProcessor implements IJsonStreamProcessor {
+
+  // パーサーコンポーネント
+  private jsonParser = parser({ jsonStreaming: true });
+
+  private jsonValueStream = streamValues();
+
+  // ハンドラー登録情報
+  private handlers: HandlerRegistration<unknown>[] = [];
+
+  private isSetup = false;
+
+  constructor() {
+    // パーサー設定と初期化
+    this.setup();
+  }
+
+  /**
+   * パーサーの初期設定を行う
+   */
+  private setup(): void {
+    if (this.isSetup) return;
+
+    // リスナー上限を増やしてワーニングを防止
+    this.jsonParser.setMaxListeners(20);
+    this.jsonValueStream.setMaxListeners(20);
+
+    // エラーハンドリング
+    this.jsonParser.on('error', (err) => {
+      logger.debug('JSON parser error (expected for partial data):', err.message);
+    });
+
+    this.jsonValueStream.on('error', (err) => {
+      logger.debug('Stream values error (expected for partial data):', err.message);
+    });
+
+    // データハンドラ
+    this.jsonValueStream.on('data', ({ value }) => {
+      if (!value) return;
+      this.notifyHandlers(value);
+    });
+
+    // パイプライン接続
+    this.jsonParser.pipe(this.jsonValueStream);
+    this.isSetup = true;
+  }
+
+  /**
+   * ハンドラーを登録する
+   */
+  registerHandler<T>(filter: (value: unknown) => value is T, handler: (data: T) => void): string {
+    const id = uuidv4();
+    this.handlers.push({
+      id,
+      filter: filter as (value: unknown) => value is unknown,
+      handler: handler as (data: unknown) => void,
+    });
+    return id;
+  }
+
+  /**
+   * 登録済みハンドラーを解除する
+   */
+  unregisterHandler(handlerId: string): boolean {
+    const initialLength = this.handlers.length;
+    this.handlers = this.handlers.filter(h => h.id !== handlerId);
+    return this.handlers.length < initialLength;
+  }
+
+  /**
+   * JSON文字列を処理する
+   */
+  process(jsonString: string): void {
+    try {
+      // パーサーに直接データを書き込む
+      this.jsonParser.write(jsonString);
+    }
+    catch (e) {
+      logger.debug('Error processing JSON data:', e);
+    }
+  }
+
+  /**
+   * リソースを解放する
+   */
+  destroy(): void {
+    try {
+      if (this.isSetup) {
+        this.jsonParser.unpipe(this.jsonValueStream);
+        this.jsonParser.removeAllListeners();
+        this.jsonValueStream.removeAllListeners();
+        this.handlers = [];
+        this.jsonParser.end();
+        this.jsonValueStream.end();
+        this.isSetup = false;
+      }
+    }
+    catch (e) {
+      logger.debug('Error destroying JSON processor:', e);
+    }
+  }
+
+  /**
+   * 登録済みハンドラーに通知する
+   */
+  private notifyHandlers(value: unknown): void {
+    for (const { filter, handler } of this.handlers) {
+      try {
+        if (filter(value)) {
+          handler(value);
+        }
+      }
+      catch (e) {
+        logger.debug('Error in handler:', e);
+      }
+    }
+  }
+
+}

+ 54 - 0
apps/app/src/features/openai/server/routes/utils/sse-helper.ts

@@ -0,0 +1,54 @@
+import type { Response } from 'express';
+
+import type { StreamErrorCode } from '../../../interfaces/message-error';
+
+/**
+ * SSE通信を簡略化するためのインターフェース
+ */
+export interface ISseHelper {
+  /**
+   * SSEフォーマットでデータを送信する
+   */
+  writeData(data: unknown): void;
+
+  /**
+   * SSEフォーマットでエラーを送信する
+   */
+  writeError(message: string, code?: StreamErrorCode): void;
+
+  /**
+   * レスポンスを終了する
+   */
+  end(): void;
+}
+
+/**
+ * SSEヘルパークラス
+ * レスポンスオブジェクトにSSEフォーマットでデータを書き込む機能を提供
+ */
+export class SseHelper implements ISseHelper {
+
+  constructor(private res: Response) {}
+
+  /**
+   * SSEフォーマットでデータを送信する
+   */
+  writeData(data: unknown): void {
+    this.res.write(`data: ${JSON.stringify(data)}\n\n`);
+  }
+
+  /**
+   * SSEフォーマットでエラーを送信する
+   */
+  writeError(message: string, code?: StreamErrorCode): void {
+    this.res.write(`error: ${JSON.stringify({ code, message })}\n\n`);
+  }
+
+  /**
+   * レスポンスを終了する
+   */
+  end(): void {
+    this.res.end();
+  }
+
+}