|
|
@@ -20,7 +20,7 @@ import type { PageModel } from '../../models/page';
|
|
|
import { createBatchStream } from '../../util/batch-stream';
|
|
|
import { configManager } from '../config-manager';
|
|
|
import type { UpdateOrInsertPagesOpts } from '../interfaces/search';
|
|
|
-
|
|
|
+import { openaiService } from '../openai';
|
|
|
|
|
|
import { aggregatePipelineToIndex } from './aggregate-to-index';
|
|
|
import type { AggregatedPage, BulkWriteBody, BulkWriteCommand } from './bulk-write';
|
|
|
@@ -380,6 +380,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
const document: BulkWriteBody = {
|
|
|
path: page.path,
|
|
|
body: page.revision.body,
|
|
|
+ body_embedded: page.revisionBodyEmbedded,
|
|
|
username: page.creator?.username,
|
|
|
comments: page.commentsCount > 0 ? page.comments : undefined,
|
|
|
comment_count: page.commentsCount,
|
|
|
@@ -479,6 +480,19 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
},
|
|
|
});
|
|
|
|
|
|
// Transform stage that attaches an embedding of the revision body to every
// page in an incoming batch, then forwards the same batch downstream.
const appendEmbeddingStream = new Transform({
  objectMode: true,
  /**
   * @param chunk one batch of aggregated pages; each page is mutated in place
   * @param encoding unused (the stream runs in object mode)
   * @param callback invoked exactly once: with the batch on success, or with the error on failure
   */
  async transform(chunk: AggregatedPage[], encoding, callback) {
    try {
      // `chunk` is a plain array, so a regular for...of is sufficient; the
      // embedding requests are still issued one at a time, in batch order.
      for (const doc of chunk) {
        // NOTE(review): `doc.creator` is dereferenced unconditionally here, while the
        // document builder in this file reads `page.creator?.username` — confirm whether
        // a page without a creator can reach this stage.
        const embeddings = await openaiService.embed(doc.creator.username, doc.revision.body);
        doc.revisionBodyEmbedded = embeddings[0].embedding;
      }
    }
    catch (err) {
      // A rejection inside an async transform() would otherwise never reach the
      // stream: the callback would stay uncalled and the stage would stall.
      // Forward the failure so it is reported as a stream error instead.
      callback(err instanceof Error ? err : new Error(String(err)));
      return;
    }

    // Passing the batch as the second argument is equivalent to this.push(chunk) followed by callback().
    callback(null, chunk);
  },
});
|
|
|
+
|
|
|
let count = 0;
|
|
|
const writeStream = new Writable({
|
|
|
objectMode: true,
|
|
|
@@ -532,6 +546,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
readStream
|
|
|
.pipe(batchStream)
|
|
|
.pipe(appendTagNamesStream)
|
|
|
+ .pipe(appendEmbeddingStream)
|
|
|
.pipe(writeStream);
|
|
|
|
|
|
return streamToPromise(writeStream);
|