yjs.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. import type { IncomingMessage } from 'http';
  2. import type { IPage, IUserHasId } from '@growi/core';
  3. import { YDocStatus } from '@growi/core/dist/consts';
  4. import mongoose from 'mongoose';
  5. import type { Server } from 'socket.io';
  6. import { MongodbPersistence } from 'y-mongodb-provider';
  7. import type { Document, Persistence } from 'y-socket.io/dist/server';
  8. import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
  9. import * as Y from 'yjs';
  10. import loggerFactory from '~/utils/logger';
  11. import type { PageModel } from '../models/page';
  12. import { Revision } from '../models/revision';
  /** Collection name handed to MongodbPersistence as `collectionName`; also the collection indexed by createIndexes(). */
  const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';

  /** Value handed to MongodbPersistence as `flushSize`. */
  const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;

  /** Logger shared by everything in this module. */
  const logger = loggerFactory('growi:service:yjs');
  /**
   * A single operation of a text delta: exactly the shape accepted in the array
   * passed to `ytext.applyDelta()` in this file.
   */
  type DeltaOperation = {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    insert?: Array<any> | string,
    delete?: number,
    retain?: number,
  };

  /** Ordered list of delta operations. */
  type Delta = Array<DeltaOperation>;

  /** An incoming HTTP request that additionally carries a `user` property. */
  type RequestWithUser = IncomingMessage & { user: IUserHasId };
  /**
   * Public contract of the Yjs service.
   */
  export interface IYjsService {

    /**
     * Resolves the status of the Yjs document that belongs to a page.
     * @param pageId ID of the page
     */
    getYDocStatus(pageId: string): Promise<YDocStatus>;

    // handleYDocSync(pageId: string, initialValue: string): Promise<void>;

    /**
     * Pushes a new text value for a page to its Yjs document.
     * @param pageId ID of the page
     * @param newValue the text to apply
     */
    handleYDocUpdate(pageId: string, newValue: string): Promise<void>;

    /**
     * Looks up the Yjs document of a page.
     * @param pageId ID of the page
     * @returns the document, or undefined when there is none
     */
    getCurrentYdoc(pageId: string): Ydoc | undefined;

  }
  /**
   * Server-side service that serves collaborative Yjs documents through YSocketIO
   * and persists them to MongoDB through MongodbPersistence.
   *
   * The name of each Yjs document is used as the page ID.
   */
  class YjsService implements IYjsService {

    private ysocketio: YSocketIO;

    private mdb: MongodbPersistence;

    /**
     * Wires up the persistence layer, starts YSocketIO on the given socket.io server,
     * kicks off index creation and registers the 'document-loaded' handler.
     * @param io socket.io server that YSocketIO is attached to
     */
    constructor(io: Server) {
      const mdb = new MongodbPersistence(
        // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore
        {
          // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          client: mongoose.connection.getClient() as any,
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          db: mongoose.connection.db as any,
        },
        {
          collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
          flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
        },
      );
      this.mdb = mdb;

      const ysocketio = new YSocketIO(io);
      this.injectPersistence(ysocketio, mdb);
      ysocketio.initialize();
      this.ysocketio = ysocketio;

      // Index creation runs in the background. createIndexes() logs the failure before
      // rethrowing, so the rejection is consumed here to avoid an unhandled rejection.
      this.createIndexes().catch(() => {
        // already logged in createIndexes()
      });

      ysocketio.on('document-loaded', async(doc: Document) => {
        // Nothing awaits this handler, so failures must be handled here.
        try {
          await this.syncLoadedDocument(doc);
        }
        catch (err) {
          logger.error(`Failed to initialize the ydoc ('${doc.name}') on 'document-loaded'`, err);
        }
      });

      // io.on('connection', (socket) => {

      //   ysocketio.on('awareness-update', async(doc: Document) => {
      //     const pageId = extractPageIdFromYdocId(doc.name);

      //     if (pageId == null) return;

      //     const awarenessStateSize = doc.awareness.states.size;

      //     // Triggered when awareness changes
      //     io
      //       .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
      //       .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);

      //     // Triggered when the last user leaves the editor
      //     if (awarenessStateSize === 0) {
      //       const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
      //       io
      //         .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
      //         .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
      //     }
      //   });

      //   socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
      //     try {
      //       await this.handleYDocSync(pageId, initialValue);
      //     }
      //     catch (error) {
      //       logger.warn(error.message);
      //       socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
      //     }
      //   });

      // });
    }

    /**
     * Brings a freshly loaded Yjs document in line with the latest revision of its page.
     *
     * When the status is NEW or OUTDATED, the 'codemirror' text is filled with the body of
     * the latest revision (an OUTDATED, non-empty text is replaced entirely), and the
     * 'updatedAt' meta is set to the revision's createdAt (or the current time when no
     * revision was found).
     * @param doc the document that has just been loaded
     */
    private async syncLoadedDocument(doc: Document): Promise<void> {
      const pageId = doc.name;

      if (pageId == null) {
        return;
      }

      const ydocStatus = await this.getYDocStatus(pageId);
      const shouldSync = ydocStatus === YDocStatus.NEW || ydocStatus === YDocStatus.OUTDATED;

      if (!shouldSync) {
        return;
      }

      logger.debug(`Initialize the page ('${pageId}') with the latest revision body`);

      const revision = await Revision
        .findOne({ pageId })
        .sort({ createdAt: -1 })
        .lean();

      if (revision?.body != null) {
        const ytext = doc.getText('codemirror');
        const delta: Delta = (ydocStatus === YDocStatus.OUTDATED && ytext.length > 0)
          ? [
            { delete: ytext.length },
            { insert: revision.body },
          ]
          : [
            { insert: revision.body },
          ];

        ytext.applyDelta(delta, { sanitize: false });
      }

      await this.mdb.setMeta(doc.name, 'updatedAt', revision?.createdAt.getTime() ?? Date.now());
    }

    /**
     * Builds the persistence adapter (bindState / writeState) backed by the given
     * MongodbPersistence and installs it on the YSocketIO instance.
     * @param ysocketio the YSocketIO instance to patch
     * @param mdb the MongoDB persistence provider
     */
    private injectPersistence(ysocketio: YSocketIO, mdb: MongodbPersistence): void {
      // Stores one update and refreshes the 'updatedAt' meta. Both writes are started
      // immediately (callers do not wait for them); a failure is logged instead of
      // surfacing as an unhandled rejection.
      const persistUpdate = (docName: string, update: Uint8Array): void => {
        Promise.all([
          mdb.storeUpdate(docName, update),
          mdb.setMeta(docName, 'updatedAt', Date.now()),
        ]).catch((err) => {
          logger.error(`Failed to persist an update of the ydoc ('${docName}')`, err);
        });
      };

      const persistence: Persistence = {
        provider: mdb,
        bindState: async(docName, ydoc) => {
          logger.debug('bindState', { docName });

          const persistedYdoc = await mdb.getYDoc(docName);

          // get the state vector so we can just store the diffs between client and server
          const persistedStateVector = Y.encodeStateVector(persistedYdoc);
          const diff = Y.encodeStateAsUpdate(ydoc, persistedStateVector);

          // store the new data in db (if there is any: empty update is an array of 0s)
          if (diff.some(byte => byte > 0)) {
            persistUpdate(docName, diff);
          }

          // send the persisted data to clients
          Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));

          // store updates of the document in db
          ydoc.on('update', (update: Uint8Array) => {
            persistUpdate(docName, update);
          });

          // cleanup some memory
          persistedYdoc.destroy();
        },
        writeState: async(docName) => {
          logger.debug('writeState', { docName });
          // This is called when all connections to the document are closed.

          // flush document on close to have the smallest possible database
          await mdb.flushDocument(docName);
        },
      };

      // force set to private property
      // eslint-disable-next-line dot-notation
      ysocketio['persistence'] = persistence;
    }

    /**
     * Creates the indexes on the persistence collection.
     * @throws rethrows the error raised by createIndexes after logging it
     */
    private async createIndexes(): Promise<void> {
      const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);

      try {
        await collection.createIndexes([
          {
            key: {
              version: 1,
              docName: 1,
              action: 1,
              clock: 1,
              part: 1,
            },
          },
          // for metaKey
          {
            key: {
              version: 1,
              docName: 1,
              metaKey: 1,
            },
          },
          // for flushDocument / clearDocument
          {
            key: {
              docName: 1,
              clock: 1,
            },
          },
        ]);
      }
      catch (err) {
        logger.error('Failed to create Index', err);
        throw err;
      }
    }

    /**
     * Determines the status of a page's Yjs document by comparing the createdAt of the
     * latest revision with the 'updatedAt' meta stored for the document.
     *
     * - ISOLATED: the page has no revision
     * - NEW: no 'updatedAt' meta is stored for the document
     * - DRAFT: the document was updated after the latest revision was created
     * - SYNCED: both timestamps are equal
     * - OUTDATED: the latest revision is newer than the document
     * @param pageId ID of the page
     * @returns the detected status
     */
    public async getYDocStatus(pageId: string): Promise<YDocStatus> {
      const dumpLog = (status: YDocStatus, args?: { [key: string]: number }) => {
        logger.debug(`getYDocStatus('${pageId}') detected '${status}'`, args ?? {});
      };

      // get the latest revision createdAt
      const result = await Revision
        .findOne(
          // filter
          { pageId },
          // projection
          { createdAt: 1 },
          { sort: { createdAt: -1 } },
        );

      if (result == null) {
        dumpLog(YDocStatus.ISOLATED);
        return YDocStatus.ISOLATED;
      }

      // get the time the ydoc was last updated (stored as 'updatedAt' meta)
      const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');

      if (ydocUpdatedAt == null) {
        dumpLog(YDocStatus.NEW);
        return YDocStatus.NEW;
      }

      const { createdAt } = result;
      const lastRevisionCreatedAt = createdAt.getTime();

      if (lastRevisionCreatedAt < ydocUpdatedAt) {
        dumpLog(YDocStatus.DRAFT, { lastRevisionCreatedAt, ydocUpdatedAt });
        return YDocStatus.DRAFT;
      }

      if (lastRevisionCreatedAt === ydocUpdatedAt) {
        dumpLog(YDocStatus.SYNCED, { lastRevisionCreatedAt, ydocUpdatedAt });
        return YDocStatus.SYNCED;
      }

      dumpLog(YDocStatus.OUTDATED, { lastRevisionCreatedAt, ydocUpdatedAt });
      return YDocStatus.OUTDATED;
    }

    // public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
    //   const currentYdoc = this.getCurrentYdoc(pageId);
    //   if (currentYdoc == null) {
    //     return;
    //   }

    //   const persistedYdoc = await this.getPersistedYdoc(pageId);
    //   const persistedStateVector = Y.encodeStateVector(persistedYdoc);

    //   await this.mdb.flushDocument(pageId);

    //   // If no write operation has been performed, insert initial value
    //   const clientsSize = persistedYdoc.store.clients.size;
    //   if (clientsSize === 0) {
    //     currentYdoc.getText('codemirror').insert(0, initialValue);
    //   }

    //   const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);

    //   if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
    //     this.mdb.storeUpdate(pageId, diff);
    //     this.mdb.setMeta(pageId, 'updatedAt', Date.now());
    //   }

    //   Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));

    //   currentYdoc.on('update', async(update) => {
    //     this.mdb.storeUpdate(pageId, update);
    //     this.mdb.setMeta(pageId, 'updatedAt', Date.now());
    //   });

    //   currentYdoc.on('destroy', async() => {
    //     this.mdb.flushDocument(pageId);
    //   });

    //   persistedYdoc.destroy();
    // }

    /**
     * Replaces the whole 'codemirror' text of a page's Yjs document with a new value.
     * Does nothing when no document is currently held for the page.
     * @param pageId ID of the page
     * @param newValue the text that replaces the current content
     */
    public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
      // TODO: https://redmine.weseek.co.jp/issues/132775
      // It's necessary to confirm that the user is not editing the target page in the Editor
      const currentYdoc = this.getCurrentYdoc(pageId);

      if (currentYdoc == null) {
        return;
      }

      const ytext = currentYdoc.getText('codemirror');

      // Delete + insert in ONE transaction so that observers get a single update
      // and never see the intermediate empty text.
      currentYdoc.transact(() => {
        ytext.delete(0, ytext.length);
        ytext.insert(0, newValue);
      });
    }

    /**
     * Returns the Yjs document YSocketIO currently holds for a page.
     * @param pageId ID of the page
     * @returns the document, or undefined when YSocketIO holds none for the page
     */
    public getCurrentYdoc(pageId: string): Ydoc | undefined {
      const currentYdoc = this.ysocketio.documents.get(pageId);
      return currentYdoc;
    }

  }
  // Module-level singleton. It holds no value until initializeYjsService() assigns it,
  // so the type includes undefined.
  let _instance: YjsService | undefined;
  /**
   * Creates the module-wide YjsService singleton.
   * @param io socket.io server passed to the YjsService constructor
   * @throws Error when the service has already been initialized, or when `io` is null/undefined
   */
  export const initializeYjsService = (io: Server): void => {
    const alreadyInitialized = _instance != null;
    if (alreadyInitialized) {
      throw new Error('YjsService is already initialized');
    }

    const isIoMissing = io == null;
    if (isIoMissing) {
      throw new Error("'io' is required if initialize YjsService");
    }

    _instance = new YjsService(io);
  };
  /**
   * Returns the YjsService singleton.
   * @returns the instance created by initializeYjsService()
   * @throws Error when the service has not been initialized yet
   */
  export const getYjsService = (): YjsService => {
    if (_instance != null) {
      return _instance;
    }

    throw new Error('YjsService is not initialized yet');
  };