yjs.ts 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import type http from 'node:http';
  2. import { YDocStatus } from '@growi/core/dist/consts';
  3. import mongoose from 'mongoose';
  4. import type { Server } from 'socket.io';
  5. import { WebSocketServer } from 'ws';
  6. import type { WSSharedDoc } from 'y-websocket/bin/utils';
  7. import { docs, setPersistence, setupWSConnection } from 'y-websocket/bin/utils';
  8. import type { SessionConfig } from '~/interfaces/session-config';
  9. import type { SyncLatestRevisionBody } from '~/interfaces/yjs';
  10. import loggerFactory from '~/utils/logger';
  11. import { Revision } from '../../models/revision';
  12. import { normalizeLatestRevisionIfBroken } from '../revision/normalize-latest-revision-if-broken';
  13. import { createIndexes } from './create-indexes';
  14. import { createMongoDBPersistence } from './create-mongodb-persistence';
  15. import { MongodbPersistence } from './extended/mongodb-persistence';
  16. import { syncYDoc } from './sync-ydoc';
  17. import { createUpgradeHandler } from './upgrade-handler';
  // Logger for this module.
  const logger = loggerFactory('growi:service:yjs');

  // MongoDB collection name given to the Yjs persistence layer and to index creation.
  const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';

  // Value passed as the `flushSize` option of the Yjs persistence layer.
  const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;

  // Only upgrade requests whose URL starts with this prefix are handled by the Yjs WebSocket server.
  const YJS_PATH_PREFIX = '/yjs/';
  /**
   * Contract of the service that manages the Yjs documents of pages.
   */
  export interface IYjsService {
    /**
     * Determines the status of the Yjs document associated with a page.
     *
     * @param pageId - ID of the page whose Yjs document is inspected.
     * @returns A promise resolving to the detected status.
     */
    getYDocStatus(pageId: string): Promise<YDocStatus>;

    /**
     * Forces the Yjs document of a page to be synced with the latest revision.
     *
     * @param pageId - ID of the page whose Yjs document is synced.
     * @param editingMarkdownLength - Optional length of the markdown currently being edited.
     * @returns A promise resolving to the result of the sync attempt.
     */
    syncWithTheLatestRevisionForce(
      pageId: string,
      editingMarkdownLength?: number,
    ): Promise<SyncLatestRevisionBody>;

    /**
     * Looks up the Yjs document currently held for a page.
     *
     * @param pageId - ID of the page.
     * @returns The document, or `undefined` when none is held for the page.
     */
    getCurrentYdoc(pageId: string): WSSharedDoc | undefined;
  }
  /**
   * Service that attaches a y-websocket server to the HTTP server and offers
   * helpers to inspect and force-sync the Yjs documents of pages.
   */
  class YjsService implements IYjsService {
    /** MongoDB-backed persistence holding the Yjs document data and metadata. */
    private readonly mdb: MongodbPersistence;

    /**
     * Sets up persistence, registers it with y-websocket and starts handling
     * WebSocket upgrade requests under the Yjs path prefix.
     *
     * @param httpServer - HTTP server whose 'upgrade' events are listened to.
     * @param io - Socket.IO server handed to the persistence factory.
     * @param sessionConfig - Session configuration handed to the upgrade handler factory.
     */
    constructor(
      httpServer: http.Server,
      io: Server,
      sessionConfig: SessionConfig,
    ) {
      this.mdb = new MongodbPersistence(
        {
          // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
          client: mongoose.connection.getClient() as any,
          db: mongoose.connection.db as any,
        },
        {
          collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
          flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
        },
      );

      // create indexes
      // NOTE(review): the result of createIndexes is not awaited here; if it
      // returns a promise, a rejection would go unhandled — confirm.
      createIndexes(MONGODB_PERSISTENCE_COLLECTION_NAME);

      // setup y-websocket persistence (includes awareness bridge and sync-on-load)
      const persistence = createMongoDBPersistence(
        this.mdb,
        io,
        syncYDoc,
        (pageId) => this.getYDocStatus(pageId),
      );
      setPersistence(persistence);

      // setup WebSocket server
      const wss = new WebSocketServer({ noServer: true });
      const handleUpgrade = createUpgradeHandler(sessionConfig);

      httpServer.on('upgrade', async (request, socket, head) => {
        const url = request.url ?? '';

        // Only handle /yjs/ paths; let Socket.IO and others pass through
        if (!url.startsWith(YJS_PATH_PREFIX)) {
          return;
        }

        // The listener is async, so the emitter ignores the returned promise.
        // Catch everything here; otherwise a failure would surface as an
        // unhandled rejection and leave the client socket open.
        try {
          const result = await handleUpgrade(request, socket, head);
          if (!result.authorized) {
            return;
          }

          wss.handleUpgrade(result.request, socket, head, (ws) => {
            wss.emit('connection', ws, result.request);
            setupWSConnection(ws, result.request, { docName: result.pageId });
          });
        } catch (err) {
          logger.error('Failed to handle the WebSocket upgrade request', {
            url,
            err,
          });
          socket.destroy();
        }
      });

      logger.info('YjsService initialized with y-websocket');
    }

    /**
     * Determines the status of the Yjs document of a page by comparing the
     * creation time of the page's latest revision with the 'updatedAt' meta
     * stored for the Yjs document.
     *
     * @param pageId - ID of the page whose Yjs document is inspected.
     * @returns ISOLATED when the page has no revision, NEW when no 'updatedAt'
     *   meta exists, DRAFT when the meta is newer than the latest revision,
     *   SYNCED when both are equal, and OUTDATED otherwise.
     */
    public async getYDocStatus(pageId: string): Promise<YDocStatus> {
      const dumpLog = (status: YDocStatus, args?: { [key: string]: unknown }) => {
        logger.debug(
          `getYDocStatus('${pageId}') detected '${status}'`,
          args ?? {},
        );
      };

      // Normalize the latest revision which was broken by the migration script '20211227060705-revision-path-to-page-id-schema-migration--fixed-7549.js' provided by v6.1.0 - v7.0.15
      await normalizeLatestRevisionIfBroken(pageId);

      // get the latest revision createdAt
      const result = await Revision.findOne(
        // filter
        { pageId },
        // projection
        { createdAt: 1 },
        { sort: { createdAt: -1 } },
      ).lean();

      if (result == null) {
        dumpLog(YDocStatus.ISOLATED, { result });
        return YDocStatus.ISOLATED;
      }

      // read the 'updatedAt' meta stored for the Yjs document of this page
      const ydocUpdatedAt = await this.mdb.getTypedMeta(pageId, 'updatedAt');

      if (ydocUpdatedAt == null) {
        dumpLog(YDocStatus.NEW);
        return YDocStatus.NEW;
      }

      const { createdAt } = result;
      const lastRevisionCreatedAt = createdAt.getTime();

      if (lastRevisionCreatedAt < ydocUpdatedAt) {
        dumpLog(YDocStatus.DRAFT, { lastRevisionCreatedAt, ydocUpdatedAt });
        return YDocStatus.DRAFT;
      }

      if (lastRevisionCreatedAt === ydocUpdatedAt) {
        dumpLog(YDocStatus.SYNCED, { lastRevisionCreatedAt, ydocUpdatedAt });
        return YDocStatus.SYNCED;
      }

      dumpLog(YDocStatus.OUTDATED, { lastRevisionCreatedAt, ydocUpdatedAt });
      return YDocStatus.OUTDATED;
    }

    /**
     * Forces the in-memory Yjs document of a page to be synced through `syncYDoc`.
     *
     * @param pageId - ID of the page whose Yjs document is synced.
     * @param editingMarkdownLength - Optional length of the markdown currently
     *   being edited; when given, it is compared with the 'codemirror' text
     *   length measured before the sync.
     * @returns `{ synced: false }` when no document is held for the page;
     *   otherwise `synced: true` plus `isYjsDataBroken`, which is `undefined`
     *   when `editingMarkdownLength` was not given.
     */
    public async syncWithTheLatestRevisionForce(
      pageId: string,
      editingMarkdownLength?: number,
    ): Promise<SyncLatestRevisionBody> {
      const doc = docs.get(pageId);

      if (doc == null) {
        return { synced: false };
      }

      // Measure the text length before syncing: the comparison below is made
      // against the pre-sync content.
      const ytextLength = doc.getText('codemirror').length;

      await syncYDoc(this.mdb, doc, true);

      return {
        synced: true,
        isYjsDataBroken:
          editingMarkdownLength != null
            ? editingMarkdownLength !== ytextLength
            : undefined,
      };
    }

    /**
     * Looks up the Yjs document currently held by y-websocket for a page.
     *
     * @param pageId - ID of the page.
     * @returns The document, or `undefined` when none is held for the page.
     */
    public getCurrentYdoc(pageId: string): WSSharedDoc | undefined {
      return docs.get(pageId);
    }
  }
  // Module-wide singleton; stays unset until initializeYjsService() succeeds.
  let _instance: YjsService;

  /**
   * Creates the singleton YjsService.
   *
   * @param httpServer - HTTP server passed to the service constructor.
   * @param io - Socket.IO server passed to the service constructor.
   * @param sessionConfig - Session configuration passed to the service constructor.
   * @throws Error when the service is already initialized, or when
   *   `httpServer` or `io` is null/undefined.
   */
  export const initializeYjsService = (
    httpServer: http.Server,
    io: Server,
    sessionConfig: SessionConfig,
  ): void => {
    if (_instance != null) {
      throw new Error('YjsService is already initialized');
    }

    // Checked in declaration order: 'httpServer' first, then 'io'.
    const requiredArgs = { httpServer, io };
    for (const [argName, argValue] of Object.entries(requiredArgs)) {
      if (argValue == null) {
        throw new Error(`'${argName}' is required to initialize YjsService`);
      }
    }

    _instance = new YjsService(httpServer, io, sessionConfig);
  };
  /**
   * Returns the singleton YjsService.
   *
   * @returns The initialized service instance.
   * @throws Error when the service has not been initialized yet.
   */
  export const getYjsService = (): YjsService => {
    if (_instance != null) {
      return _instance;
    }

    throw new Error('YjsService is not initialized yet');
  };