yjs.ts 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. import type { IRevisionHasId } from '@growi/core';
  2. import { GlobalSocketEventName } from '@growi/core';
  3. import mongoose from 'mongoose';
  4. import type { Server } from 'socket.io';
  5. import { MongodbPersistence } from 'y-mongodb-provider';
  6. import type { Document } from 'y-socket.io/dist/server';
  7. import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
  8. import * as Y from 'yjs';
  9. import { SocketEventName } from '~/interfaces/websocket';
  10. import loggerFactory from '~/utils/logger';
  11. import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
  12. const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
  13. const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
  14. const logger = loggerFactory('growi:service:yjs');
  15. export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
  16. const result = ydocId.match(/yjs\/(.*)/);
  17. return result?.[1];
  18. };
  19. export interface IYjsService {
  20. hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean>;
  21. handleYDocSync(pageId: string, initialValue: string): Promise<void>;
  22. handleYDocUpdate(pageId: string, newValue: string): Promise<void>;
  23. getCurrentYdoc(pageId: string): Ydoc | undefined;
  24. getPersistedYdoc(pageId: string): Promise<Y.Doc>;
  25. }
  26. class YjsService implements IYjsService {
  27. private ysocketio: YSocketIO;
  28. private mdb: MongodbPersistence;
  29. constructor(io: Server) {
  30. const ysocketio = new YSocketIO(io);
  31. ysocketio.initialize();
  32. this.ysocketio = ysocketio;
  33. this.mdb = new MongodbPersistence(
  34. {
  35. // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
  36. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  37. client: mongoose.connection.getClient() as any,
  38. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  39. db: mongoose.connection.db as any,
  40. },
  41. {
  42. collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
  43. flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
  44. },
  45. );
  46. this.createIndexes();
  47. io.on('connection', (socket) => {
  48. ysocketio.on('awareness-update', async(doc: Document) => {
  49. const pageId = extractPageIdFromYdocId(doc.name);
  50. if (pageId == null) return;
  51. const awarenessStateSize = doc.awareness.states.size;
  52. // Triggered when awareness changes
  53. io
  54. .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
  55. .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
  56. // Triggered when the last user leaves the editor
  57. if (awarenessStateSize === 0) {
  58. const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
  59. io
  60. .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
  61. .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
  62. }
  63. });
  64. socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
  65. try {
  66. await this.handleYDocSync(pageId, initialValue);
  67. }
  68. catch (error) {
  69. logger.warn(error.message);
  70. socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
  71. }
  72. });
  73. });
  74. }
  75. private async createIndexes(): Promise<void> {
  76. const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);
  77. try {
  78. await collection.createIndexes([
  79. {
  80. key: {
  81. version: 1,
  82. docName: 1,
  83. action: 1,
  84. clock: 1,
  85. part: 1,
  86. },
  87. },
  88. // for metaKey
  89. {
  90. key: {
  91. version: 1,
  92. docName: 1,
  93. metaKey: 1,
  94. },
  95. },
  96. // for flushDocument / clearDocument
  97. {
  98. key: {
  99. docName: 1,
  100. clock: 1,
  101. },
  102. },
  103. ]);
  104. }
  105. catch (err) {
  106. logger.error('Failed to create Index', err);
  107. throw err;
  108. }
  109. }
  110. public async hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean> {
  111. // get the latest revision createdAt
  112. const Revision = mongoose.model<IRevisionHasId>('Revision');
  113. const result = await Revision
  114. .findOne(
  115. // filter
  116. { pageId },
  117. // projection
  118. { createdAt: 1 },
  119. { sort: { createdAt: -1 } },
  120. );
  121. const lastRevisionCreatedAt = (result == null)
  122. ? 0
  123. : result.createdAt.getTime();
  124. // count yjs-writings documents with updatedAt > latestRevision.updatedAt
  125. const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');
  126. return ydocUpdatedAt == null
  127. ? false
  128. : ydocUpdatedAt > lastRevisionCreatedAt;
  129. }
  130. public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
  131. const currentYdoc = this.getCurrentYdoc(pageId);
  132. if (currentYdoc == null) {
  133. return;
  134. }
  135. const persistedYdoc = await this.getPersistedYdoc(pageId);
  136. const persistedStateVector = Y.encodeStateVector(persistedYdoc);
  137. await this.mdb.flushDocument(pageId);
  138. // If no write operation has been performed, insert initial value
  139. const clientsSize = persistedYdoc.store.clients.size;
  140. if (clientsSize === 0) {
  141. currentYdoc.getText('codemirror').insert(0, initialValue);
  142. }
  143. const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
  144. if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
  145. this.mdb.storeUpdate(pageId, diff);
  146. this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  147. }
  148. Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
  149. currentYdoc.on('update', async(update) => {
  150. this.mdb.storeUpdate(pageId, update);
  151. this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  152. });
  153. currentYdoc.on('destroy', async() => {
  154. this.mdb.flushDocument(pageId);
  155. });
  156. persistedYdoc.destroy();
  157. }
  158. public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
  159. // TODO: https://redmine.weseek.co.jp/issues/132775
  160. // It's necessary to confirm that the user is not editing the target page in the Editor
  161. const currentYdoc = this.getCurrentYdoc(pageId);
  162. if (currentYdoc == null) {
  163. return;
  164. }
  165. const currentMarkdownLength = currentYdoc.getText('codemirror').length;
  166. currentYdoc.getText('codemirror').delete(0, currentMarkdownLength);
  167. currentYdoc.getText('codemirror').insert(0, newValue);
  168. Y.encodeStateAsUpdate(currentYdoc);
  169. }
  170. public getCurrentYdoc(pageId: string): Ydoc | undefined {
  171. const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
  172. return currentYdoc;
  173. }
  174. public async getPersistedYdoc(pageId: string): Promise<Y.Doc> {
  175. const persistedYdoc = await this.mdb.getYDoc(pageId);
  176. return persistedYdoc;
  177. }
  178. }
  179. let _instance: YjsService;
  180. export const initializeYjsService = (io: Server): void => {
  181. if (_instance != null) {
  182. throw new Error('YjsService is already initialized');
  183. }
  184. if (io == null) {
  185. throw new Error("'io' is required if initialize YjsService");
  186. }
  187. _instance = new YjsService(io);
  188. };
  189. export const getYjsService = (): YjsService => {
  190. if (_instance == null) {
  191. throw new Error('YjsService is not initialized yet');
  192. }
  193. return _instance;
  194. };