yjs.ts 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import type { IRevisionHasId } from '@growi/core';
  2. import { GlobalSocketEventName } from '@growi/core';
  3. import mongoose from 'mongoose';
  4. import type { Server } from 'socket.io';
  5. import { MongodbPersistence } from 'y-mongodb-provider';
  6. import type { Document } from 'y-socket.io/dist/server';
  7. import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
  8. import * as Y from 'yjs';
  9. import { SocketEventName } from '~/interfaces/websocket';
  10. import loggerFactory from '~/utils/logger';
  11. import { getMongoUri } from '../util/mongoose-utils';
  12. import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
  13. const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
  14. const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
  15. const logger = loggerFactory('growi:service:yjs');
  16. export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
  17. const result = ydocId.match(/yjs\/(.*)/);
  18. return result?.[1];
  19. };
  20. class YjsService {
  21. private ysocketio: YSocketIO;
  22. private mdb: MongodbPersistence;
  23. constructor(io: Server) {
  24. const ysocketio = new YSocketIO(io);
  25. ysocketio.initialize();
  26. this.ysocketio = ysocketio;
  27. this.mdb = new MongodbPersistence(getMongoUri(), {
  28. collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
  29. flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
  30. });
  31. this.createIndexes();
  32. io.on('connection', (socket) => {
  33. ysocketio.on('awareness-update', async(doc: Document) => {
  34. const pageId = extractPageIdFromYdocId(doc.name);
  35. if (pageId == null) return;
  36. const awarenessStateSize = doc.awareness.states.size;
  37. // Triggered when awareness changes
  38. io
  39. .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
  40. .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
  41. // Triggered when the last user leaves the editor
  42. if (awarenessStateSize === 0) {
  43. const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
  44. io
  45. .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
  46. .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
  47. }
  48. });
  49. socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
  50. try {
  51. await this.handleYDocSync(pageId, initialValue);
  52. }
  53. catch (error) {
  54. logger.warn(error.message);
  55. socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
  56. }
  57. });
  58. });
  59. }
  60. private async createIndexes(): Promise<void> {
  61. const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);
  62. try {
  63. await collection.createIndexes([
  64. {
  65. key: {
  66. version: 1,
  67. docName: 1,
  68. action: 1,
  69. clock: 1,
  70. part: 1,
  71. },
  72. },
  73. // for metaKey
  74. {
  75. key: {
  76. version: 1,
  77. docName: 1,
  78. metaKey: 1,
  79. },
  80. },
  81. // for flushDocument / clearDocument
  82. {
  83. key: {
  84. docName: 1,
  85. clock: 1,
  86. },
  87. },
  88. ]);
  89. }
  90. catch (err) {
  91. logger.error('Failed to create Index', err);
  92. throw err;
  93. }
  94. }
  95. public async hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean> {
  96. // get the latest revision createdAt
  97. const Revision = mongoose.model<IRevisionHasId>('Revision');
  98. const result = await Revision
  99. .findOne(
  100. { pageId },
  101. { createdAt: 1 },
  102. { sort: { createdAt: -1 } },
  103. );
  104. const lastRevisionCreatedAt = (result == null)
  105. ? 0
  106. : result.createdAt.getTime();
  107. // count yjs-writings documents with updatedAt > latestRevision.updatedAt
  108. const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');
  109. return ydocUpdatedAt == null
  110. ? false
  111. : ydocUpdatedAt > lastRevisionCreatedAt;
  112. }
  113. public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
  114. const currentYdoc = this.getCurrentYdoc(pageId);
  115. if (currentYdoc == null) {
  116. return;
  117. }
  118. const persistedYdoc = await this.getPersistedYdoc(pageId);
  119. const persistedStateVector = Y.encodeStateVector(persistedYdoc);
  120. await this.mdb.flushDocument(pageId);
  121. // If no write operation has been performed, insert initial value
  122. const clientsSize = persistedYdoc.store.clients.size;
  123. if (clientsSize === 0) {
  124. currentYdoc.getText('codemirror').insert(0, initialValue);
  125. }
  126. const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
  127. if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
  128. this.mdb.storeUpdate(pageId, diff);
  129. this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  130. }
  131. Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
  132. currentYdoc.on('update', async(update) => {
  133. this.mdb.storeUpdate(pageId, update);
  134. this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  135. });
  136. currentYdoc.on('destroy', async() => {
  137. this.mdb.flushDocument(pageId);
  138. });
  139. persistedYdoc.destroy();
  140. }
  141. public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
  142. // TODO: https://redmine.weseek.co.jp/issues/132775
  143. // It's necessary to confirm that the user is not editing the target page in the Editor
  144. const currentYdoc = this.getCurrentYdoc(pageId);
  145. if (currentYdoc == null) {
  146. return;
  147. }
  148. const currentMarkdownLength = currentYdoc.getText('codemirror').length;
  149. currentYdoc.getText('codemirror').delete(0, currentMarkdownLength);
  150. currentYdoc.getText('codemirror').insert(0, newValue);
  151. Y.encodeStateAsUpdate(currentYdoc);
  152. }
  153. public getCurrentYdoc(pageId: string): Ydoc | undefined {
  154. const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
  155. return currentYdoc;
  156. }
  157. public async getPersistedYdoc(pageId: string): Promise<Y.Doc> {
  158. const persistedYdoc = await this.mdb.getYDoc(pageId);
  159. return persistedYdoc;
  160. }
  161. }
  162. let _instance: YjsService;
  163. export const initializeYjsService = (io: Server): void => {
  164. if (_instance != null) {
  165. throw new Error('YjsService is already initialized');
  166. }
  167. if (io == null) {
  168. throw new Error("'io' is required if initialize YjsService");
  169. }
  170. _instance = new YjsService(io);
  171. };
  172. export const getYjsService = (): YjsService => {
  173. if (_instance == null) {
  174. throw new Error('YjsService is not initialized yet');
  175. }
  176. return _instance;
  177. };