// yjs.ts (6.6 KB)
  1. import type http from 'node:http';
  2. import { YDocStatus, YJS_WEBSOCKET_BASE_PATH } from '@growi/core/dist/consts';
  3. import mongoose from 'mongoose';
  4. import type { Server } from 'socket.io';
  5. import { WebSocketServer } from 'ws';
  6. import type { WSSharedDoc } from 'y-websocket/bin/utils';
  7. import { docs, setPersistence, setupWSConnection } from 'y-websocket/bin/utils';
  8. import type { SessionConfig } from '~/interfaces/session-config';
  9. import type { SyncLatestRevisionBody } from '~/interfaces/yjs';
  10. import loggerFactory from '~/utils/logger';
  11. import { Revision } from '../../models/revision';
  12. import { normalizeLatestRevisionIfBroken } from '../revision/normalize-latest-revision-if-broken';
  13. import { createIndexes } from './create-indexes';
  14. import { createMongoDBPersistence } from './create-mongodb-persistence';
  15. import { MongodbPersistence } from './extended/mongodb-persistence';
  16. import { guardSocket } from './guard-socket';
  17. import { syncYDoc } from './sync-ydoc';
  18. import { createUpgradeHandler } from './upgrade-handler';
  // Collection name handed to MongodbPersistence and to createIndexes().
  const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';

  // Value passed as the `flushSize` option of MongodbPersistence.
  const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;

  // URL prefix an HTTP upgrade request must start with to be handled by this service.
  const YJS_PATH_PREFIX = `${YJS_WEBSOCKET_BASE_PATH}/`;

  const logger = loggerFactory('growi:service:yjs');
  /**
   * Public contract of the Yjs service.
   */
  export interface IYjsService {
    /**
     * Resolves the status of the Yjs document of a page relative to the
     * page's latest revision.
     *
     * @param pageId - ID of the page whose Yjs document is inspected
     */
    getYDocStatus(pageId: string): Promise<YDocStatus>;

    /**
     * Forces the Yjs document of a page to be synced with the latest revision.
     *
     * @param pageId - ID of the page to sync
     * @param editingMarkdownLength - optional length of the markdown being
     *   edited by the caller; when given, it is compared with the Yjs text length
     */
    syncWithTheLatestRevisionForce(
      pageId: string,
      editingMarkdownLength?: number,
    ): Promise<SyncLatestRevisionBody>;

    /**
     * Returns the Yjs document currently registered for the page, if any.
     *
     * @param pageId - ID of the page
     */
    getCurrentYdoc(pageId: string): WSSharedDoc | undefined;
  }
  /**
   * Yjs service backed by y-websocket and a MongoDB persistence layer.
   *
   * On construction it:
   *  - creates the MongoDB persistence and registers it with y-websocket,
   *  - attaches an HTTP `upgrade` listener that serves WebSocket connections
   *    whose URL starts with `YJS_PATH_PREFIX`.
   */
  class YjsService implements IYjsService {
    /** Persistence used for Yjs document data and metadata. */
    private readonly mdb: MongodbPersistence;

    /**
     * @param httpServer - HTTP server whose `upgrade` events are listened to
     * @param io - Socket.IO server, passed through to the persistence factory
     * @param sessionConfig - session configuration used to build the upgrade handler
     */
    constructor(
      httpServer: http.Server,
      io: Server,
      sessionConfig: SessionConfig,
    ) {
      const mdb = new MongodbPersistence(
        {
          // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
          client: mongoose.connection.getClient() as any,
          db: mongoose.connection.db as any,
        },
        {
          collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
          flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
        },
      );
      this.mdb = mdb;

      // Create indexes without blocking construction.
      // NOTE(review): createIndexes may return a promise (its signature is not
      // visible here); wrapping with Promise.resolve() keeps this correct either
      // way and prevents an unhandled rejection if index creation fails.
      Promise.resolve(createIndexes(MONGODB_PERSISTENCE_COLLECTION_NAME)).catch(
        (err: unknown) => {
          logger.error(
            { err },
            'Failed to create indexes for the Yjs persistence collection',
          );
        },
      );

      // setup y-websocket persistence (includes awareness bridge and sync-on-load)
      const persistence = createMongoDBPersistence(mdb, io, syncYDoc, (pageId) =>
        this.getYDocStatus(pageId),
      );
      setPersistence(persistence);

      // setup WebSocket server; upgrades are routed manually below
      const wss = new WebSocketServer({ noServer: true });
      const handleUpgrade = createUpgradeHandler(sessionConfig);

      httpServer.on('upgrade', async (request, socket, head) => {
        const url = request.url ?? '';

        // Only handle /yjs/ paths; let Socket.IO and others pass through
        if (!url.startsWith(YJS_PATH_PREFIX)) {
          return;
        }

        // Guard the socket against being closed by other upgrade handlers
        // (e.g. Next.js's NextCustomServer.upgradeHandler) that run synchronously
        // after this async handler yields at the first await.
        const guard = guardSocket(socket);

        // Make sure the guard is restored exactly once, whether the failure
        // happens before or after the restore on the success path.
        let isGuardRestored = false;
        const restoreGuardOnce = (): void => {
          if (isGuardRestored) {
            return;
          }
          isGuardRestored = true;
          guard.restore();
        };

        try {
          const result = await handleUpgrade(request, socket, head);

          // Restore original socket methods now that all synchronous
          // upgrade handlers have finished
          restoreGuardOnce();

          if (!result.authorized) {
            // rejectUpgrade already wrote the HTTP error response but
            // socket.destroy() was a no-op during the guard; clean up now
            socket.destroy();
            return;
          }

          wss.handleUpgrade(result.request, socket, head, (ws) => {
            wss.emit('connection', ws, result.request);
            setupWSConnection(ws, result.request, { docName: result.pageId });
          });
        } catch (err) {
          restoreGuardOnce();
          logger.error({ url, err }, 'Yjs upgrade handler failed unexpectedly');
          if (socket.writable) {
            socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n');
          }
          socket.destroy();
        }
      });

      logger.info('YjsService initialized with y-websocket');
    }

    /**
     * Determines the status of a page's Yjs document by comparing the latest
     * revision's `createdAt` with the Yjs document's `updatedAt` meta.
     *
     * @param pageId - ID of the page to inspect
     * @returns
     *  - `ISOLATED` when the page has no revision,
     *  - `NEW` when no `updatedAt` meta is stored for the Yjs document,
     *  - `DRAFT` when the Yjs document is newer than the latest revision,
     *  - `SYNCED` when both timestamps are equal,
     *  - `OUTDATED` otherwise.
     */
    public async getYDocStatus(pageId: string): Promise<YDocStatus> {
      const dumpLog = (status: YDocStatus, args?: { [key: string]: unknown }) => {
        logger.debug(
          args ?? {},
          `getYDocStatus('${pageId}') detected '${status}'`,
        );
      };

      // Normalize the latest revision which was broken by the migration script '20211227060705-revision-path-to-page-id-schema-migration--fixed-7549.js' provided by v6.1.0 - v7.0.15
      await normalizeLatestRevisionIfBroken(pageId);

      // get the latest revision createdAt
      const result = await Revision.findOne(
        // filter
        { pageId },
        // projection
        { createdAt: 1 },
        { sort: { createdAt: -1 } },
      ).lean();

      if (result == null) {
        dumpLog(YDocStatus.ISOLATED, { result });
        return YDocStatus.ISOLATED;
      }

      // get the 'updatedAt' meta stored for the Yjs document
      const ydocUpdatedAt = await this.mdb.getTypedMeta(pageId, 'updatedAt');

      if (ydocUpdatedAt == null) {
        dumpLog(YDocStatus.NEW);
        return YDocStatus.NEW;
      }

      const { createdAt } = result;
      const lastRevisionCreatedAt = createdAt.getTime();

      if (lastRevisionCreatedAt < ydocUpdatedAt) {
        dumpLog(YDocStatus.DRAFT, { lastRevisionCreatedAt, ydocUpdatedAt });
        return YDocStatus.DRAFT;
      }

      if (lastRevisionCreatedAt === ydocUpdatedAt) {
        dumpLog(YDocStatus.SYNCED, { lastRevisionCreatedAt, ydocUpdatedAt });
        return YDocStatus.SYNCED;
      }

      dumpLog(YDocStatus.OUTDATED, { lastRevisionCreatedAt, ydocUpdatedAt });
      return YDocStatus.OUTDATED;
    }

    /**
     * Syncs the Yjs document registered for the page via `syncYDoc`.
     *
     * @param pageId - ID of the page to sync
     * @param editingMarkdownLength - optional length of the markdown the caller
     *   is editing; compared with the Yjs text length measured before the sync
     * @returns `{ synced: false }` when no Yjs document is registered for the
     *   page; otherwise `{ synced: true, isYjsDataBroken }` where
     *   `isYjsDataBroken` is `undefined` if `editingMarkdownLength` was omitted
     */
    public async syncWithTheLatestRevisionForce(
      pageId: string,
      editingMarkdownLength?: number,
    ): Promise<SyncLatestRevisionBody> {
      const doc = docs.get(pageId);

      if (doc == null) {
        return { synced: false };
      }

      // Capture the length before syncing so the comparison below uses the
      // pre-sync text.
      const ytextLength = doc.getText('codemirror').length;

      // NOTE(review): the third argument presumably forces the sync — confirm
      // against syncYDoc.
      await syncYDoc(this.mdb, doc, true);

      return {
        synced: true,
        isYjsDataBroken:
          editingMarkdownLength != null
            ? editingMarkdownLength !== ytextLength
            : undefined,
      };
    }

    /**
     * Looks up the Yjs document registered for the page in y-websocket's `docs`.
     *
     * @param pageId - ID of the page
     * @returns the registered document, or `undefined` when there is none
     */
    public getCurrentYdoc(pageId: string): WSSharedDoc | undefined {
      return docs.get(pageId);
    }
  }
  // Module-level singleton. It stays `undefined` until initializeYjsService()
  // has been called, so the type says so explicitly.
  let _instance: YjsService | undefined;

  /**
   * Creates the singleton YjsService.
   *
   * @param httpServer - HTTP server passed to the YjsService constructor
   * @param io - Socket.IO server passed to the YjsService constructor
   * @param sessionConfig - session configuration passed to the YjsService constructor
   * @throws Error when the service has already been initialized
   */
  export const initializeYjsService = (
    httpServer: http.Server,
    io: Server,
    sessionConfig: SessionConfig,
  ): void => {
    if (_instance != null) {
      throw new Error('YjsService is already initialized');
    }

    _instance = new YjsService(httpServer, io, sessionConfig);
  };

  /**
   * Returns the singleton YjsService.
   *
   * @throws Error when initializeYjsService() has not been called yet
   */
  export const getYjsService = (): YjsService => {
    if (_instance == null) {
      throw new Error('YjsService is not initialized yet');
    }

    return _instance;
  };
  174. };