Yuki Takei 1 год назад
Родитель
Сommit
e1d8691220

+ 39 - 51
apps/app/src/features/openai/server/services/editor-assistant/llm-response-stream-processor.ts

@@ -80,82 +80,69 @@ export class LlmResponseStreamProcessor {
         // Index of the last element in current content
         const currentContentIndex = contents.length - 1;
 
-        // Process messages - only process elements that might have changed
-        // Start from the last processed length to avoid re-processing
-        const startProcessingIndex = Math.min(this.lastProcessedContentLength, contents.length);
+        // Calculate processing start index - to avoid reprocessing known elements
+        const startProcessingIndex = Math.max(0, Math.min(this.lastProcessedContentLength, contents.length) - 1);
 
-        // Process newly added elements and last element from previous batch (which might have changed)
-        const processStartIndex = Math.max(0, startProcessingIndex - 1);
+        // Process both messages and diffs in a single loop
+        let diffUpdated = false;
+        let processedDiffIndex = -1;
 
-        for (let i = processStartIndex; i < contents.length; i++) {
+        // Unified loop for processing both messages and diffs
+        for (let i = startProcessingIndex; i < contents.length; i++) {
           const item = contents[i];
+
+          // Process message items
           if (isMessageItem(item)) {
             const currentMessage = item.message;
             const previousMessage = this.processedMessages.get(i);
 
-            // Only process if the message is new or changed
             if (previousMessage !== currentMessage) {
               let appendedContent: string;
 
               if (previousMessage == null) {
-                // First occurrence of this message element - send the entire message
                 appendedContent = currentMessage;
               }
               else {
-                // Calculate the appended content since last update
                 appendedContent = this.getAppendedContent(previousMessage, currentMessage);
               }
 
-              // Update the stored message for this index
               this.processedMessages.set(i, currentMessage);
-
-              // Update final message (for backward compatibility)
               this.message = currentMessage;
 
-              // Only send if there's something to append
               if (appendedContent) {
                 this.options?.messageCallback?.(appendedContent);
               }
             }
           }
-        }
-
-        // Update last processed content length for next iteration
-        this.lastProcessedContentLength = contents.length;
-
-        // Process diffs
-        let diffUpdated = false;
-        let processedDiffIndex = -1;
-
-        // Check if diffs are included
-        for (let i = 0; i < contents.length; i++) {
-          const item = contents[i];
-          if (!isDiffItem(item)) continue;
-
-          const validDiff = LlmEditorAssistantDiffSchema.safeParse(item);
-          if (!validDiff.success) continue;
-
-          const diff = validDiff.data;
-          const key = this.getDiffKey(diff, i);
-
-          // Check if this diff has already been sent
-          if (this.sentDiffKeys.has(key)) continue;
-
-          // If the last element has changed, or if this is not the last element
-          // → Consider the diff as finalized
-          if (i < currentContentIndex || currentContentIndex > this.lastContentIndex) {
-            this.replacements.push(diff);
-            this.sentDiffKeys.add(key);
-            diffUpdated = true;
-            processedDiffIndex = Math.max(processedDiffIndex, i);
+          // Process diff items
+          else if (isDiffItem(item)) {
+            const validDiff = LlmEditorAssistantDiffSchema.safeParse(item);
+            if (!validDiff.success) continue;
+
+            const diff = validDiff.data;
+            const key = this.getDiffKey(diff, i);
+
+            // Skip if already sent
+            if (this.sentDiffKeys.has(key)) continue;
+
+            // Consider the diff as finalized if:
+            // 1. This is not the last element OR
+            // 2. The last element has changed from previous parsing
+            if (i < currentContentIndex || currentContentIndex > this.lastContentIndex) {
+              this.replacements.push(diff);
+              this.sentDiffKeys.add(key);
+              diffUpdated = true;
+              processedDiffIndex = Math.max(processedDiffIndex, i);
+            }
           }
         }
 
-        // Update last index
+        // Update tracking variables for next iteration
         this.lastContentIndex = currentContentIndex;
+        this.lastProcessedContentLength = contents.length;
 
+        // Send diff notification if new diffs were detected
         if (diffUpdated && processedDiffIndex > this.lastSentDiffIndex) {
-          // For diffs, only notify if a new index diff is confirmed
           this.lastSentDiffIndex = processedDiffIndex;
           this.options?.diffDetectedCallback?.(this.replacements[this.replacements.length - 1]);
         }
@@ -205,16 +192,15 @@ export class LlmResponseStreamProcessor {
       if (parsedJson?.contents && Array.isArray(parsedJson.contents)) {
         const contents = parsedJson.contents;
 
-        // Add any unsent diffs
-        for (let i = 0; i < contents.length; i++) {
-          const item = contents[i];
+        // Add any unsent diffs in a single loop
+        for (const item of contents) {
           if (!isDiffItem(item)) continue;
 
           const validDiff = LlmEditorAssistantDiffSchema.safeParse(item);
           if (!validDiff.success) continue;
 
           const diff = validDiff.data;
-          const key = this.getDiffKey(diff, i);
+          const key = this.getDiffKey(diff, contents.indexOf(item));
 
           // Add any diffs that haven't been sent yet
           if (!this.sentDiffKeys.has(key)) {
@@ -225,13 +211,15 @@ export class LlmResponseStreamProcessor {
       }
 
       // Final notification
-      this.options?.dataFinalizedCallback?.(this.message, this.replacements);
+      const fullMessage = Array.from(this.processedMessages.values()).join('\n\n');
+      this.options?.dataFinalizedCallback?.(fullMessage, this.replacements);
     }
     catch (e) {
       logger.debug('Failed to parse final JSON response:', e);
 
       // Send final notification even on error
-      this.options?.dataFinalizedCallback?.(this.message, this.replacements);
+      const fullMessage = Array.from(this.processedMessages.values()).join('\n\n');
+      this.options?.dataFinalizedCallback?.(fullMessage, this.replacements);
     }
   }