// yjs.ts
  1. import type { IRevisionHasId } from '@growi/core';
  2. import { GlobalSocketEventName } from '@growi/core';
  3. import mongoose from 'mongoose';
  4. import type { Server } from 'socket.io';
  5. import { MongodbPersistence } from 'y-mongodb-provider';
  6. import type { Document, Persistence } from 'y-socket.io/dist/server';
  7. import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
  8. import * as Y from 'yjs';
  9. import { SocketEventName } from '~/interfaces/websocket';
  10. import loggerFactory from '~/utils/logger';
  11. import { Revision } from '../models/revision';
  12. import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
  13. const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
  14. const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
  15. const logger = loggerFactory('growi:service:yjs');
  16. // export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
  17. // const result = ydocId.match(/yjs\/(.*)/);
  18. // return result?.[1];
  19. // };
  20. export interface IYjsService {
  21. hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean>;
  22. // handleYDocSync(pageId: string, initialValue: string): Promise<void>;
  23. handleYDocUpdate(pageId: string, newValue: string): Promise<void>;
  24. getCurrentYdoc(pageId: string): Ydoc | undefined;
  25. }
  26. class YjsService implements IYjsService {
  27. private ysocketio: YSocketIO;
  28. private mdb: MongodbPersistence;
  29. constructor(io: Server) {
  30. const mdb = new MongodbPersistence(
  31. // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
  32. // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  33. // @ts-ignore
  34. {
  35. // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
  36. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  37. client: mongoose.connection.getClient() as any,
  38. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  39. db: mongoose.connection.db as any,
  40. },
  41. {
  42. collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
  43. flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
  44. },
  45. );
  46. this.mdb = mdb;
  47. const ysocketio = new YSocketIO(io);
  48. this.injectPersistence(ysocketio, mdb);
  49. ysocketio.initialize();
  50. this.ysocketio = ysocketio;
  51. this.createIndexes();
  52. ysocketio.on('document-loaded', async(doc: Document) => {
  53. // const pageId = extractPageIdFromYdocId(doc.name);
  54. const pageId = doc.name;
  55. if (pageId != null && !await this.hasYdocsNewerThanLatestRevision(pageId)) {
  56. logger.debug(`YDoc for the page ('${pageId}') is initialized by the latest revision body`);
  57. const revision = await Revision.findOne({ pageId });
  58. if (revision?.body != null) {
  59. doc.getText('codemirror').insert(0, revision.body);
  60. }
  61. }
  62. });
  63. // io.on('connection', (socket) => {
  64. // ysocketio.on('awareness-update', async(doc: Document) => {
  65. // const pageId = extractPageIdFromYdocId(doc.name);
  66. // if (pageId == null) return;
  67. // const awarenessStateSize = doc.awareness.states.size;
  68. // // Triggered when awareness changes
  69. // io
  70. // .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
  71. // .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
  72. // // Triggered when the last user leaves the editor
  73. // if (awarenessStateSize === 0) {
  74. // const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
  75. // io
  76. // .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
  77. // .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
  78. // }
  79. // });
  80. // socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
  81. // try {
  82. // await this.handleYDocSync(pageId, initialValue);
  83. // }
  84. // catch (error) {
  85. // logger.warn(error.message);
  86. // socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
  87. // }
  88. // });
  89. // });
  90. }
  91. private injectPersistence(ysocketio: YSocketIO, mdb: MongodbPersistence): void {
  92. const persistece: Persistence = {
  93. provider: mdb,
  94. bindState: async(docName, ydoc) => {
  95. logger.debug('bindState', { docName });
  96. const persistedYdoc = await mdb.getYDoc(docName);
  97. // get the state vector so we can just store the diffs between client and server
  98. const persistedStateVector = Y.encodeStateVector(persistedYdoc);
  99. /* we could also retrieve that sv with a mdb function
  100. * however this takes longer;
  101. * it would also flush the document (which merges all updates into one)
  102. * thats prob a good thing, which is why we always do this on document close (see writeState)
  103. */
  104. // const persistedStateVector = await mdb.getStateVector(docName);
  105. // in the default code the following value gets saved in the db
  106. // this however leads to the case that multiple complete Y.Docs are saved in the db (https://github.com/fadiquader/y-mongodb/issues/7)
  107. // const newUpdates = Y.encodeStateAsUpdate(ydoc);
  108. // better just get the differences and save those:
  109. const diff = Y.encodeStateAsUpdate(ydoc, persistedStateVector);
  110. // store the new data in db (if there is any: empty update is an array of 0s)
  111. if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
  112. mdb.storeUpdate(docName, diff);
  113. mdb.setMeta(docName, 'updatedAt', Date.now());
  114. }
  115. // send the persisted data to clients
  116. Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
  117. // store updates of the document in db
  118. ydoc.on('update', async(update) => {
  119. mdb.storeUpdate(docName, update);
  120. mdb.setMeta(docName, 'updatedAt', Date.now());
  121. });
  122. // cleanup some memory
  123. persistedYdoc.destroy();
  124. },
  125. writeState: async(docName) => {
  126. logger.debug('writeState', { docName });
  127. // This is called when all connections to the document are closed.
  128. // flush document on close to have the smallest possible database
  129. await mdb.flushDocument(docName);
  130. },
  131. };
  132. // foce set to private property
  133. // eslint-disable-next-line dot-notation
  134. ysocketio['persistence'] = persistece;
  135. }
  136. private async createIndexes(): Promise<void> {
  137. const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);
  138. try {
  139. await collection.createIndexes([
  140. {
  141. key: {
  142. version: 1,
  143. docName: 1,
  144. action: 1,
  145. clock: 1,
  146. part: 1,
  147. },
  148. },
  149. // for metaKey
  150. {
  151. key: {
  152. version: 1,
  153. docName: 1,
  154. metaKey: 1,
  155. },
  156. },
  157. // for flushDocument / clearDocument
  158. {
  159. key: {
  160. docName: 1,
  161. clock: 1,
  162. },
  163. },
  164. ]);
  165. }
  166. catch (err) {
  167. logger.error('Failed to create Index', err);
  168. throw err;
  169. }
  170. }
  171. public async getYDocStatus(pageId: string): Promise<YDocStatus> {
  172. const dumpLog = (status: YDocStatus, args?: { [key: string]: number }) => {
  173. logger.debug(`getYDocStatus('${pageId}') detected '${status}'`, args ?? {});
  174. };
  175. // get the latest revision createdAt
  176. const result = await Revision
  177. .findOne(
  178. // filter
  179. { pageId },
  180. // projection
  181. { createdAt: 1 },
  182. { sort: { createdAt: -1 } },
  183. );
  184. if (result == null) {
  185. dumpLog(YDocStatus.ISOLATED);
  186. return YDocStatus.ISOLATED;
  187. }
  188. // count yjs-writings documents with updatedAt > latestRevision.updatedAt
  189. const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');
  190. if (ydocUpdatedAt == null) {
  191. dumpLog(YDocStatus.NEW);
  192. return YDocStatus.NEW;
  193. }
  194. const { createdAt } = result;
  195. const lastRevisionCreatedAt = createdAt.getTime();
  196. if (lastRevisionCreatedAt < ydocUpdatedAt) {
  197. dumpLog(YDocStatus.DRAFT, { lastRevisionCreatedAt, ydocUpdatedAt });
  198. return YDocStatus.DRAFT;
  199. }
  200. if (lastRevisionCreatedAt === ydocUpdatedAt) {
  201. dumpLog(YDocStatus.SYNCED, { lastRevisionCreatedAt, ydocUpdatedAt });
  202. return YDocStatus.SYNCED;
  203. }
  204. dumpLog(YDocStatus.OUTDATED, { lastRevisionCreatedAt, ydocUpdatedAt });
  205. return YDocStatus.OUTDATED;
  206. }
  207. // public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
  208. // const currentYdoc = this.getCurrentYdoc(pageId);
  209. // if (currentYdoc == null) {
  210. // return;
  211. // }
  212. // const persistedYdoc = await this.getPersistedYdoc(pageId);
  213. // const persistedStateVector = Y.encodeStateVector(persistedYdoc);
  214. // await this.mdb.flushDocument(pageId);
  215. // // If no write operation has been performed, insert initial value
  216. // const clientsSize = persistedYdoc.store.clients.size;
  217. // if (clientsSize === 0) {
  218. // currentYdoc.getText('codemirror').insert(0, initialValue);
  219. // }
  220. // const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
  221. // if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
  222. // this.mdb.storeUpdate(pageId, diff);
  223. // this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  224. // }
  225. // Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
  226. // currentYdoc.on('update', async(update) => {
  227. // this.mdb.storeUpdate(pageId, update);
  228. // this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  229. // });
  230. // currentYdoc.on('destroy', async() => {
  231. // this.mdb.flushDocument(pageId);
  232. // });
  233. // persistedYdoc.destroy();
  234. // }
  235. public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
  236. // TODO: https://redmine.weseek.co.jp/issues/132775
  237. // It's necessary to confirm that the user is not editing the target page in the Editor
  238. const currentYdoc = this.getCurrentYdoc(pageId);
  239. if (currentYdoc == null) {
  240. return;
  241. }
  242. const currentMarkdownLength = currentYdoc.getText('codemirror').length;
  243. currentYdoc.getText('codemirror').delete(0, currentMarkdownLength);
  244. currentYdoc.getText('codemirror').insert(0, newValue);
  245. Y.encodeStateAsUpdate(currentYdoc);
  246. }
  247. public getCurrentYdoc(pageId: string): Ydoc | undefined {
  248. const currentYdoc = this.ysocketio.documents.get(pageId);
  249. return currentYdoc;
  250. }
  251. }
  252. let _instance: YjsService;
  253. export const initializeYjsService = (io: Server): void => {
  254. if (_instance != null) {
  255. throw new Error('YjsService is already initialized');
  256. }
  257. if (io == null) {
  258. throw new Error("'io' is required if initialize YjsService");
  259. }
  260. _instance = new YjsService(io);
  261. };
  262. export const getYjsService = (): YjsService => {
  263. if (_instance == null) {
  264. throw new Error('YjsService is not initialized yet');
  265. }
  266. return _instance;
  267. };