// create-mongodb-persistence.ts
  1. import { YDocStatus } from '@growi/core/dist/consts';
  2. import type { Server } from 'socket.io';
  3. import type { WSSharedDoc, YWebsocketPersistence } from 'y-websocket/bin/utils';
  4. import * as Y from 'yjs';
  5. import { SocketEventName } from '~/interfaces/websocket';
  6. import {
  7. getRoomNameWithId,
  8. RoomPrefix,
  9. } from '~/server/service/socket-io/helper';
  10. import loggerFactory from '~/utils/logger';
  11. import type { MongodbPersistence } from './extended/mongodb-persistence';
  12. import type { syncYDoc as syncYDocType } from './sync-ydoc';
  /** Async lookup that takes a page id and resolves to that page's YDocStatus. */
  type GetYDocStatus = (pageId: string) => Promise<YDocStatus>;

  /** Module-scoped logger created under this file's namespace. */
  const logger = loggerFactory('growi:service:yjs:create-mongodb-persistence');
  /**
   * Creates a y-websocket compatible persistence layer backed by MongoDB.
   *
   * Besides loading and storing document state, `bindState` also handles:
   * - sync-on-load (`syncYDoc`) after the persisted state is applied
   * - the awareness event bridge to Socket.IO page rooms
   *
   * Background work started by `bindState` (persisting updates and the
   * awareness bridge) logs its failures instead of leaving rejected promises
   * unhandled.
   *
   * @param mdb - MongoDB persistence provider used to load, store and flush documents
   * @param io - Socket.IO server used to emit events to page rooms
   * @param syncYDoc - invoked once per `bindState` after the persisted state is applied
   * @param getYDocStatus - resolves the YDocStatus for a page id
   * @returns the persistence object exposing `provider`, `bindState` and `writeState`
   */
  export const createMongoDBPersistence = (
    mdb: MongodbPersistence,
    io: Server,
    syncYDoc: typeof syncYDocType,
    getYDocStatus: GetYDocStatus,
  ): YWebsocketPersistence => {
    const persistence: YWebsocketPersistence = {
      provider: mdb,

      bindState: async (docName: string, ydoc: WSSharedDoc) => {
        logger.debug({ docName }, 'bindState');

        // Stores one update and bumps the 'updatedAt' meta. Both calls are
        // issued immediately, in the same order as before; any failure is
        // logged here because callers do not await this helper.
        const persistUpdate = async (update: Uint8Array): Promise<void> => {
          try {
            await Promise.all([
              mdb.storeUpdate(docName, update),
              mdb.setTypedMeta(docName, 'updatedAt', Date.now()),
            ]);
          } catch (err) {
            logger.error({ err, docName }, 'Failed to persist Yjs update');
          }
        };

        const persistedYdoc = await mdb.getYDoc(docName);

        // get the state vector so we can just store the diffs between client and server
        const persistedStateVector = Y.encodeStateVector(persistedYdoc);
        const diff = Y.encodeStateAsUpdate(ydoc, persistedStateVector);

        // store the new data in db (if there is any: empty update is an array of 0s)
        if (diff.some((b) => b !== 0)) {
          // fire-and-forget on purpose: bindState does not wait for the write
          void persistUpdate(diff);
        }

        // send the persisted data to clients
        Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));

        // cleanup some memory
        persistedYdoc.destroy();

        // sync with the latest revision after persisted state is applied
        const ydocStatus = await getYDocStatus(docName);
        // NOTE(review): the result of syncYDoc is intentionally not awaited so
        // that the update listener below is registered right after the call,
        // as before. If syncYDoc returns a promise, its rejection is not
        // handled here — confirm against its implementation.
        syncYDoc(mdb, ydoc, { ydocStatus });

        // store updates of the document in db
        ydoc.on('update', (update: Uint8Array) => {
          void persistUpdate(update);
        });

        // register awareness event bridge to Socket.IO rooms
        // Only emit when the awareness state size actually changes (cursor moves
        // and other updates fire frequently but don't change the user count)
        let lastEmittedSize = -1;
        ydoc.awareness.on('update', async () => {
          const pageId = docName;
          try {
            const awarenessStateSize = ydoc.awareness.getStates().size;

            if (awarenessStateSize !== lastEmittedSize) {
              lastEmittedSize = awarenessStateSize;
              io.in(getRoomNameWithId(RoomPrefix.PAGE, pageId)).emit(
                SocketEventName.YjsAwarenessStateSizeUpdated,
                awarenessStateSize,
              );
            }

            // emit draft status when last user leaves
            if (awarenessStateSize === 0) {
              const status = await getYDocStatus(pageId);
              const hasYdocsNewerThanLatestRevision =
                status === YDocStatus.DRAFT || status === YDocStatus.ISOLATED;
              io.in(getRoomNameWithId(RoomPrefix.PAGE, pageId)).emit(
                SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated,
                hasYdocsNewerThanLatestRevision,
              );
            }
          } catch (err) {
            // The emitter ignores the promise returned by this listener, so a
            // rejection would otherwise surface as an unhandled rejection.
            logger.error({ err, docName }, 'Failed to handle awareness update');
          }
        });
      },

      writeState: async (docName: string) => {
        logger.debug({ docName }, 'writeState');
        // flush document on close to have the smallest possible database
        await mdb.flushDocument(docName);
      },
    };

    return persistence;
  };
  86. };