openai.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. import assert from 'node:assert';
  2. import { Readable, Transform } from 'stream';
  3. import { PageGrant, isPopulated } from '@growi/core';
  4. import type { HydratedDocument, Types } from 'mongoose';
  5. import mongoose from 'mongoose';
  6. import type OpenAI from 'openai';
  7. import { toFile } from 'openai';
  8. import ThreadRelationModel from '~/features/openai/server/models/thread-relation';
  9. import VectorStoreModel, { VectorStoreScopeType, type VectorStoreDocument } from '~/features/openai/server/models/vector-store';
  10. import VectorStoreFileRelationModel, {
  11. type VectorStoreFileRelation,
  12. prepareVectorStoreFileRelations,
  13. } from '~/features/openai/server/models/vector-store-file-relation';
  14. import type { PageDocument, PageModel } from '~/server/models/page';
  15. import { configManager } from '~/server/service/config-manager';
  16. import { createBatchStream } from '~/server/util/batch-stream';
  17. import loggerFactory from '~/utils/logger';
  18. import { OpenaiServiceTypes } from '../../interfaces/ai';
  19. import { getClient } from './client-delegator';
  20. import { oepnaiApiErrorHandler } from './openai-api-error-handler';
  const logger = loggerFactory('growi:service:openai');

  /**
   * Page batch size used when rebuilding the vector store (cursor batch size and batch-stream size).
   * It is also the maximum number of pages `createVectorStoreFile` accepts in a single call.
   */
  const BATCH_SIZE = 100;

  /**
   * Process-lifetime cache flag. It is set once the public-scope vector store entity has been
   * confirmed (or created) through the API, so later lookups can skip the existence check.
   */
  let isVectorStoreForPublicScopeExist = false;
  /**
   * Operations exposed by the OpenAI integration service.
   */
  export interface IOpenaiService {
    // ---- Threads ----------------------------------------------------------

    /** Creates a thread bound to `vectorStoreId`, or retrieves the existing thread `threadId`. */
    getOrCreateThread(userId: string, vectorStoreId?: string, threadId?: string): Promise<OpenAI.Beta.Threads.Thread | undefined>;

    /** Deletes expired threads. `limit` bounds the batch; `apiCallInterval` (ms) throttles API calls. */
    deleteExpiredThreads(limit: number, apiCallInterval: number): Promise<void>;

    // ---- Vector store -----------------------------------------------------

    /** Returns the public-scope vector store document, creating the store if needed. */
    getOrCreateVectorStoreForPublicScope(): Promise<VectorStoreDocument>;

    /** Uploads the given pages and registers them as files of the public vector store. */
    createVectorStoreFile(pages: Array<PageDocument>): Promise<void>;

    /** Deletes the vector store files recorded for `pageId`. */
    deleteVectorStoreFile(pageId: Types.ObjectId): Promise<void>;

    /** Rebuilds vector store files for a single page. */
    rebuildVectorStore(page: HydratedDocument<PageDocument>): Promise<void>;

    /** Rebuilds vector store files for all public pages. */
    rebuildVectorStoreAll(): Promise<void>;
  }
  /**
   * OpenAI integration service.
   *
   * Manages OpenAI threads, which are tracked by ThreadRelation documents. It also manages the
   * public-scope vector store and the files uploaded to it, which are tracked per page by
   * VectorStoreFileRelation documents.
   */
  class OpenaiService implements IOpenaiService {

    /**
     * Client for the configured service type.
     * `openai:serviceType` is re-read from the config on every access and passed to `getClient`.
     */
    private get client() {
      const openaiServiceType = configManager.getConfig('crowi', 'openai:serviceType');
      return getClient({ openaiServiceType });
    }

    /**
     * Creates a new thread or retrieves an existing one.
     *
     * - `vectorStoreId` given and `threadId` omitted: creates a thread for the vector store and
     *   saves a ThreadRelation document linking it to `userId`.
     * - `threadId` given: looks up its ThreadRelation, retrieves the thread entity from the API
     *   and extends the relation's expiration. If the API reports the thread as not found, the
     *   relation document is removed before the error is rethrown.
     *
     * @param userId - user recorded on a newly created ThreadRelation
     * @param vectorStoreId - vector store to attach when creating a thread
     * @param threadId - existing thread to retrieve
     * @returns the created or retrieved thread
     * @throws Error when neither `vectorStoreId` nor `threadId` is given, when no ThreadRelation
     *   exists for `threadId`, or when an API / database call fails
     */
    public async getOrCreateThread(userId: string, vectorStoreId?: string, threadId?: string): Promise<OpenAI.Beta.Threads.Thread> {
      if (vectorStoreId != null && threadId == null) {
        try {
          const thread = await this.client.createThread(vectorStoreId);
          await ThreadRelationModel.create({ userId, threadId: thread.id });
          return thread;
        }
        catch (err) {
          throw new Error(err);
        }
      }

      // Without this guard the lookup below would run as `findOne({ threadId: undefined })`.
      // Depending on how undefined filter values are serialized, that could match an unrelated relation.
      if (threadId == null) {
        throw new Error('threadId is required when vectorStoreId is not specified');
      }

      // NOTE(review): this lookup is not scoped to `userId`; confirm that callers check ownership.
      const threadRelation = await ThreadRelationModel.findOne({ threadId });
      if (threadRelation == null) {
        throw new Error('ThreadRelation document is not exists');
      }

      // Check that the thread entity still exists.
      // If the API reports it as not found, the stale thread-relation document is deleted.
      try {
        const thread = await this.client.retrieveThread(threadRelation.threadId);
        // The thread is alive, so push back its expiration date
        await threadRelation.updateThreadExpiration();
        return thread;
      }
      catch (err) {
        await oepnaiApiErrorHandler(err, { notFoundError: async() => { await threadRelation.remove() } });
        throw new Error(err);
      }
    }

    /**
     * Deletes the OpenAI threads of expired ThreadRelation documents.
     * It then removes the relations whose thread was deleted successfully.
     * Failed deletions are logged, and their relations are kept.
     *
     * @param limit - passed to `getExpiredThreadRelations` (presumably the max relations fetched)
     * @param apiCallInterval - pause in milliseconds after every thread-deletion API call
     */
    public async deleteExpiredThreads(limit: number, apiCallInterval: number): Promise<void> {
      const expiredThreadRelations = await ThreadRelationModel.getExpiredThreadRelations(limit);
      if (expiredThreadRelations == null) {
        return;
      }

      const deletedThreadIds: string[] = [];
      for await (const expiredThreadRelation of expiredThreadRelations) {
        try {
          const deleteThreadResponse = await this.client.deleteThread(expiredThreadRelation.threadId);
          logger.debug('Delete thread', deleteThreadResponse);
          deletedThreadIds.push(expiredThreadRelation.threadId);
        }
        catch (err) {
          logger.error(err);
        }
        finally {
          // Throttle after every API call, failed ones included, so a run of errors still
          // respects the configured interval.
          await new Promise(resolve => setTimeout(resolve, apiCallInterval));
        }
      }

      if (deletedThreadIds.length === 0) {
        return;
      }
      await ThreadRelationModel.deleteMany({ threadId: { $in: deletedThreadIds } });
    }

    /**
     * Returns the VectorStore document for the public scope.
     * If no document exists, it creates the vector store entity and its document.
     *
     * An existing document is verified against the API once per process; the result is cached
     * in `isVectorStoreForPublicScopeExist`. If the API reports the entity as not found, the
     * stale document is removed and the error is rethrown. A subsequent call then creates a
     * new store.
     *
     * @throws Error when verifying the existing vector store fails
     */
    public async getOrCreateVectorStoreForPublicScope(): Promise<VectorStoreDocument> {
      // `scorpeType` (sic) is the key used both for this query and for the created document.
      // It presumably mirrors the VectorStoreModel schema, so keep it in sync with the model
      // rather than fixing the spelling here alone.
      const vectorStoreDocument = await VectorStoreModel.findOne({ scorpeType: VectorStoreScopeType.PUBLIC });

      if (vectorStoreDocument != null && isVectorStoreForPublicScopeExist) {
        return vectorStoreDocument;
      }

      if (vectorStoreDocument != null && !isVectorStoreForPublicScopeExist) {
        try {
          // Check that the vector store entity exists.
          // If it does not, the vector store document is deleted.
          await this.client.retrieveVectorStore(vectorStoreDocument.vectorStoreId);
          isVectorStoreForPublicScopeExist = true;
          return vectorStoreDocument;
        }
        catch (err) {
          await oepnaiApiErrorHandler(err, { notFoundError: async() => { await vectorStoreDocument.remove() } });
          throw new Error(err);
        }
      }

      const newVectorStore = await this.client.createVectorStore(VectorStoreScopeType.PUBLIC);
      const newVectorStoreDocument = await VectorStoreModel.create({
        vectorStoreId: newVectorStore.id,
        scorpeType: VectorStoreScopeType.PUBLIC,
      });
      isVectorStoreForPublicScopeExist = true;
      return newVectorStoreDocument;
    }

    /** Uploads `body` through the API as a file named `<pageId>.md`. */
    private async uploadFile(pageId: Types.ObjectId, body: string): Promise<OpenAI.Files.FileObject> {
      const file = await toFile(Readable.from(body), `${pageId}.md`);
      const uploadedFile = await this.client.uploadFile(file);
      return uploadedFile;
    }

    /**
     * Uploads the revision bodies of the given public pages concurrently.
     * It records the uploaded file ids in VectorStoreFileRelation documents and then adds the
     * files to the public-scope vector store in a single batch.
     *
     * Pages are skipped if they are not public, have no revision, or have an empty body.
     * Individual upload failures are logged. If saving the relations or creating the vector
     * store file batch fails, the vector store files of the pages uploaded in this call are
     * deleted.
     *
     * @param pages - at most BATCH_SIZE pages; all of them are uploaded in parallel
     */
    async createVectorStoreFile(pages: Array<HydratedDocument<PageDocument>>): Promise<void> {
      // Every page is uploaded concurrently, so check the fan-out before any upload starts.
      assert(pages.length <= BATCH_SIZE, 'workers.length must be less than or equal to BATCH_SIZE');

      const vectorStoreFileRelationsMap: Map<string, VectorStoreFileRelation> = new Map();
      // Pages whose upload succeeded in this call; the failure rollback is limited to these.
      const uploadedPageIds: Types.ObjectId[] = [];

      const processUploadFile = async(page: PageDocument) => {
        if (page._id == null || page.grant !== PageGrant.GRANT_PUBLIC || page.revision == null) {
          return;
        }

        // Use the revision as-is when it is already populated; only query when it is not.
        let body: string | undefined;
        if (isPopulated(page.revision)) {
          body = page.revision.body;
        }
        else {
          const pagePopulatedToShowRevision = await page.populateDataToShowRevision();
          body = pagePopulatedToShowRevision.revision?.body;
        }
        if (body == null || body.length === 0) {
          return;
        }

        const uploadedFile = await this.uploadFile(page._id, body);
        prepareVectorStoreFileRelations(page._id, uploadedFile.id, vectorStoreFileRelationsMap);
        uploadedPageIds.push(page._id);
      };

      const fileUploadResults = await Promise.allSettled(pages.map(processUploadFile));
      for (const result of fileUploadResults) {
        if (result.status === 'rejected') {
          logger.error(result.reason);
        }
      }

      const vectorStoreFileRelations = Array.from(vectorStoreFileRelationsMap.values());
      const uploadedFileIds = vectorStoreFileRelations.flatMap(data => data.fileIds);
      if (uploadedFileIds.length === 0) {
        return;
      }

      try {
        // Save vector store file relation
        await VectorStoreFileRelationModel.upsertVectorStoreFileRelations(vectorStoreFileRelations);
        // Create vector store file
        const vectorStore = await this.getOrCreateVectorStoreForPublicScope();
        const createVectorStoreFileBatchResponse = await this.client.createVectorStoreFileBatch(vectorStore.vectorStoreId, uploadedFileIds);
        logger.debug('Create vector store file', createVectorStoreFileBatchResponse);
      }
      catch (err) {
        logger.error(err);
        // Roll back only the pages uploaded in this call.
        // Pages that were skipped or whose upload failed keep their existing files.
        // NOTE(review): deleteVectorStoreFile removes every file recorded for the page, not just
        // this call's uploads. If the upsert above failed, the new files are not recorded yet
        // and stay orphaned.
        for (const pageId of uploadedPageIds) {
          await this.deleteVectorStoreFile(pageId);
        }
      }
    }

    /**
     * Deletes, through the API, every file recorded in the page's VectorStoreFileRelation.
     * The relation document is removed when all deletions succeed. Otherwise it is saved with
     * only the file ids that could not be deleted, so they stay tracked.
     */
    async deleteVectorStoreFile(pageId: Types.ObjectId): Promise<void> {
      const vectorStoreFileRelation = await VectorStoreFileRelationModel.findOne({ pageId });
      if (vectorStoreFileRelation == null) {
        return;
      }

      const deletedFileIds = new Set<string>();
      for (const fileId of vectorStoreFileRelation.fileIds) {
        try {
          const deleteFileResponse = await this.client.deleteFile(fileId);
          logger.debug('Delete vector store file', deleteFileResponse);
          deletedFileIds.add(fileId);
        }
        catch (err) {
          logger.error(err);
        }
      }

      const undeletedFileIds = vectorStoreFileRelation.fileIds.filter(fileId => !deletedFileIds.has(fileId));
      if (undeletedFileIds.length === 0) {
        await vectorStoreFileRelation.remove();
        return;
      }

      vectorStoreFileRelation.fileIds = undeletedFileIds;
      await vectorStoreFileRelation.save();
    }

    /**
     * Streams all public pages, with populated revisions, in batches of BATCH_SIZE into
     * `createVectorStoreFile`. The returned promise resolves once the pipeline is set up;
     * it does not wait for the rebuild to finish.
     */
    async rebuildVectorStoreAll(): Promise<void> {
      // TODO: https://redmine.weseek.co.jp/issues/154364
      // Create all public pages VectorStoreFile
      const Page = mongoose.model<HydratedDocument<PageDocument>, PageModel>('Page');
      const pagesStream = Page.find({ grant: PageGrant.GRANT_PUBLIC }).populate('revision').cursor({ batch_size: BATCH_SIZE });
      const batchStream = createBatchStream(BATCH_SIZE);

      const createVectorStoreFile = this.createVectorStoreFile.bind(this);
      const createVectorStoreFileStream = new Transform({
        objectMode: true,
        async transform(chunk: HydratedDocument<PageDocument>[], _encoding, callback) {
          try {
            await createVectorStoreFile(chunk);
          }
          catch (err) {
            // Log and continue so that one failing batch does not halt the whole rebuild.
            logger.error(err);
          }
          // This stream is the sink: nothing reads its readable side. Pushing chunks would fill
          // the readable buffer up to highWaterMark, after which backpressure stops `transform`
          // from being called, so nothing is pushed.
          callback();
        },
      });

      // `pipe()` does not forward errors between streams, so every stage gets its own listener.
      const handleStreamError = (err: Error) => {
        logger.error(err);
      };
      pagesStream.on('error', handleStreamError);
      batchStream.on('error', handleStreamError);
      createVectorStoreFileStream.on('error', handleStreamError);

      pagesStream
        .pipe(batchStream)
        .pipe(createVectorStoreFileStream);
    }

    /** Replaces the page's vector store files: deletes the existing ones, then uploads the page again. */
    async rebuildVectorStore(page: HydratedDocument<PageDocument>): Promise<void> {
      await this.deleteVectorStoreFile(page._id);
      await this.createVectorStoreFile([page]);
    }

  }
  let instance: OpenaiService;

  /**
   * Returns the process-wide OpenaiService singleton and creates it on first use.
   *
   * The service is only created when `app:aiEnabled` is truthy and `openai:serviceType` is one
   * of `OpenaiServiceTypes`. Otherwise `undefined` is returned and nothing is cached, so the
   * config is checked again on the next call.
   */
  export const getOpenaiService = (): IOpenaiService | undefined => {
    if (instance == null) {
      const aiEnabled = configManager.getConfig('crowi', 'app:aiEnabled');
      const openaiServiceType = configManager.getConfig('crowi', 'openai:serviceType');

      const isUnavailable = !aiEnabled
        || openaiServiceType == null
        || !OpenaiServiceTypes.includes(openaiServiceType);
      if (isUnavailable) {
        return undefined;
      }

      instance = new OpenaiService();
    }

    return instance;
  };