浏览代码

Use stream

Shun Miyazawa 1 年之前
父节点
当前提交
16fb5cfce0
共有 1 个文件被更改，包括 21 次插入和 4 次删除
  1. 21 4
      apps/app/src/server/service/openai/openai.ts

+ 21 - 4
apps/app/src/server/service/openai/openai.ts

@@ -1,4 +1,4 @@
-import { Readable } from 'stream';
+import { Readable, Transform } from 'stream';
 
 
 import { PageGrant, isPopulated } from '@growi/core';
 import { PageGrant, isPopulated } from '@growi/core';
 import type { HydratedDocument, Types } from 'mongoose';
 import type { HydratedDocument, Types } from 'mongoose';
@@ -9,10 +9,14 @@ import type { FileLike } from 'openai/uploads.mjs';
 import { OpenaiServiceTypes } from '~/interfaces/ai';
 import { OpenaiServiceTypes } from '~/interfaces/ai';
 import type { PageDocument, PageModel } from '~/server/models/page';
 import type { PageDocument, PageModel } from '~/server/models/page';
 import { configManager } from '~/server/service/config-manager';
 import { configManager } from '~/server/service/config-manager';
+import { createBatchStream } from '~/server/util/batch-stream';
 import loggerFactory from '~/utils/logger';
 import loggerFactory from '~/utils/logger';
 
 
+
 import { getClient } from './client-delegator';
 import { getClient } from './client-delegator';
 
 
+const BATCH_SIZE = 100;
+
 const logger = loggerFactory('growi:service:openai');
 const logger = loggerFactory('growi:service:openai');
 
 
 const createFileForVectorStore = async(pageId: Types.ObjectId, body: string): Promise<FileLike> => {
 const createFileForVectorStore = async(pageId: Types.ObjectId, body: string): Promise<FileLike> => {
@@ -60,10 +64,23 @@ class OpenaiService implements IOpenaiService {
     // TODO: https://redmine.weseek.co.jp/issues/154364
     // TODO: https://redmine.weseek.co.jp/issues/154364
 
 
     // Create all public pages VectorStoreFile
     // Create all public pages VectorStoreFile
-    const page = mongoose.model<HydratedDocument<PageDocument>, PageModel>('Page');
-    const allPublicPages = await page.find({ grant: PageGrant.GRANT_PUBLIC }).populate('revision') as PageDocument[];
+    const Page = mongoose.model<HydratedDocument<PageDocument>, PageModel>('Page');
+    const pagesStream = Page.find({ grant: PageGrant.GRANT_PUBLIC }).populate('revision').cursor({ batch_size: BATCH_SIZE });
+    const batchStrem = createBatchStream(BATCH_SIZE);
+
+    const createVectorStoreFile = this.createVectorStoreFile.bind(this);
+    const createVectorStoreFileStream = new Transform({
+      objectMode: true,
+      async transform(chunk: PageDocument[], encoding, callback) {
+        await createVectorStoreFile(chunk);
+        this.push(chunk);
+        callback();
+      },
+    });
 
 
-    await this.createVectorStoreFile(allPublicPages);
+    pagesStream
+      .pipe(batchStrem)
+      .pipe(createVectorStoreFileStream);
   }
   }
 
 
   async rebuildVectorStore(page: PageDocument) {
   async rebuildVectorStore(page: PageDocument) {