Explorar o código

Merge pull request #8959 from weseek/imprv/yjs-performance

imprv: Yjs performance
Yuki Takei hai 1 ano
pai
achega
be679ab9c7

+ 2 - 0
apps/app/config/logger/config.dev.js

@@ -17,6 +17,8 @@ module.exports = {
   'growi:middleware:safe-redirect': 'debug',
   'growi:service:PassportService': 'debug',
   'growi:service:s2s-messaging:*': 'debug',
+  'growi:service:yjs': 'debug',
+  'growi:service:yjs:*': 'debug',
   // 'growi:service:socket-io': 'debug',
   // 'growi:service:ConfigManager': 'debug',
   // 'growi:service:mail': 'debug',

+ 4 - 1
apps/app/src/server/models/page.ts

@@ -67,7 +67,10 @@ export type CreateMethod = (path: string, body: string, user, options: IOptionsF
 
 export interface PageModel extends Model<PageDocument> {
   [x: string]: any; // for obsolete static methods
-  findByIdsAndViewer(pageIds: ObjectIdLike[], user, userGroups?, includeEmpty?: boolean, includeAnyoneWithTheLink?: boolean): Promise<PageDocument[]>
+  findByIdAndViewer(pageId: ObjectIdLike, user, userGroups?, includeEmpty?: boolean): Promise<PageDocument & HasObjectId>
+  findByIdsAndViewer(
+    pageIds: ObjectIdLike[], user, userGroups?, includeEmpty?: boolean, includeAnyoneWithTheLink?: boolean,
+  ): Promise<(PageDocument & HasObjectId)[]>
   findByPath(path: string, includeEmpty?: boolean): Promise<PageDocument | null>
   findByPathAndViewer(path: string | null, user, userGroups?, useFindOne?: true, includeEmpty?: boolean): Promise<PageDocument & HasObjectId | null>
   findByPathAndViewer(path: string | null, user, userGroups?, useFindOne?: false, includeEmpty?: boolean): Promise<(PageDocument & HasObjectId)[]>

+ 4 - 2
apps/app/src/server/routes/apiv3/page/index.ts

@@ -3,6 +3,7 @@ import path from 'path';
 import type { IPage } from '@growi/core';
 import {
   AllSubscriptionStatusType, PageGrant, SubscriptionStatusType,
+  getIdForRef,
 } from '@growi/core';
 import { ErrorV3 } from '@growi/core/dist/models';
 import { convertToNewAffiliationPath } from '@growi/core/dist/utils/page-path-utils';
@@ -592,7 +593,8 @@ module.exports = (crowi) => {
     } = page;
     let isGrantNormalized = false;
     try {
-      isGrantNormalized = await pageGrantService.isGrantNormalized(req.user, path, grant, grantedUsers, grantedGroups, false, false);
+      const grantedUsersId = grantedUsers.map(ref => getIdForRef(ref));
+      isGrantNormalized = await pageGrantService.isGrantNormalized(req.user, path, grant, grantedUsersId, grantedGroups, false, false);
     }
     catch (err) {
       logger.error('Error occurred while processing isGrantNormalized.', err);
@@ -615,7 +617,7 @@ module.exports = (crowi) => {
       return res.apiv3({ isGrantNormalized, grantData });
     }
 
-    const parentPage = await Page.findByIdAndViewer(page.parent, req.user, null, false);
+    const parentPage = await Page.findByIdAndViewer(getIdForRef(page.parent), req.user, null, false);
 
     // user isn't allowed to see parent's grant
     if (parentPage == null) {

+ 1 - 1
apps/app/src/server/routes/apiv3/page/update-page.ts

@@ -68,7 +68,7 @@ export const updatePageHandlersFactory: UpdatePageHandlersFactory = (crowi) => {
     const origin = req.body.origin;
     if (origin === Origin.View || origin === undefined) {
       const yjsService = getYjsService();
-      await yjsService.handleYDocUpdate(req.body.pageId, req.body.body);
+      await yjsService.syncWithTheLatestRevisionForce(req.body.pageId);
     }
 
     // persist activity

+ 3 - 2
apps/app/src/server/service/page/index.ts

@@ -7,7 +7,7 @@ import type {
   IPage, IPageInfo, IPageInfoAll, IPageInfoForEntity, IPageWithMeta, IGrantedGroup, IRevisionHasId,
 } from '@growi/core';
 import {
-  PageGrant, PageStatus, getIdForRef,
+  PageGrant, PageStatus, YDocStatus, getIdForRef,
 } from '@growi/core';
 import {
   pagePathUtils, pathUtils,
@@ -4438,7 +4438,8 @@ class PageService implements IPageService {
     const yjsService = getYjsService();
 
     const currentYdoc = yjsService.getCurrentYdoc(pageId);
-    const hasYdocsNewerThanLatestRevision = await yjsService.hasYdocsNewerThanLatestRevision(pageId);
+    const ydocStatus = await yjsService.getYDocStatus(pageId);
+    const hasYdocsNewerThanLatestRevision = ydocStatus === YDocStatus.DRAFT;
 
     return {
       hasYdocsNewerThanLatestRevision,

+ 0 - 247
apps/app/src/server/service/yjs.ts

@@ -1,247 +0,0 @@
-import type { IRevisionHasId } from '@growi/core';
-import { GlobalSocketEventName } from '@growi/core';
-import mongoose from 'mongoose';
-import type { Server } from 'socket.io';
-import { MongodbPersistence } from 'y-mongodb-provider';
-import type { Document } from 'y-socket.io/dist/server';
-import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
-import * as Y from 'yjs';
-
-import { SocketEventName } from '~/interfaces/websocket';
-import loggerFactory from '~/utils/logger';
-
-import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
-
-
-const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
-const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
-
-
-const logger = loggerFactory('growi:service:yjs');
-
-
-export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
-  const result = ydocId.match(/yjs\/(.*)/);
-  return result?.[1];
-};
-
-export interface IYjsService {
-  hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean>;
-  handleYDocSync(pageId: string, initialValue: string): Promise<void>;
-  handleYDocUpdate(pageId: string, newValue: string): Promise<void>;
-  getCurrentYdoc(pageId: string): Ydoc | undefined;
-  getPersistedYdoc(pageId: string): Promise<Y.Doc>;
-}
-
-class YjsService implements IYjsService {
-
-  private ysocketio: YSocketIO;
-
-  private mdb: MongodbPersistence;
-
-  constructor(io: Server) {
-    const ysocketio = new YSocketIO(io);
-    ysocketio.initialize();
-    this.ysocketio = ysocketio;
-
-    this.mdb = new MongodbPersistence(
-      // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
-      // eslint-disable-next-line @typescript-eslint/ban-ts-comment
-      // @ts-ignore
-      {
-        // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        client: mongoose.connection.getClient() as any,
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        db: mongoose.connection.db as any,
-      },
-      {
-        collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
-        flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
-      },
-    );
-
-    this.createIndexes();
-
-    io.on('connection', (socket) => {
-
-      ysocketio.on('awareness-update', async(doc: Document) => {
-        const pageId = extractPageIdFromYdocId(doc.name);
-
-        if (pageId == null) return;
-
-        const awarenessStateSize = doc.awareness.states.size;
-
-        // Triggered when awareness changes
-        io
-          .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
-          .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
-
-        // Triggered when the last user leaves the editor
-        if (awarenessStateSize === 0) {
-          const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
-          io
-            .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
-            .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
-        }
-      });
-
-      socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
-        try {
-          await this.handleYDocSync(pageId, initialValue);
-        }
-        catch (error) {
-          logger.warn(error.message);
-          socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
-        }
-      });
-    });
-  }
-
-  private async createIndexes(): Promise<void> {
-
-    const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);
-
-    try {
-      await collection.createIndexes([
-        {
-          key: {
-            version: 1,
-            docName: 1,
-            action: 1,
-            clock: 1,
-            part: 1,
-          },
-        },
-        // for metaKey
-        {
-          key: {
-            version: 1,
-            docName: 1,
-            metaKey: 1,
-          },
-        },
-        // for flushDocument / clearDocument
-        {
-          key: {
-            docName: 1,
-            clock: 1,
-          },
-        },
-      ]);
-    }
-    catch (err) {
-      logger.error('Failed to create Index', err);
-      throw err;
-    }
-  }
-
-  public async hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean> {
-    // get the latest revision createdAt
-    const Revision = mongoose.model<IRevisionHasId>('Revision');
-    const result = await Revision
-      .findOne(
-        // filter
-        { pageId },
-        // projection
-        { createdAt: 1 },
-        { sort: { createdAt: -1 } },
-      );
-
-    const lastRevisionCreatedAt = (result == null)
-      ? 0
-      : result.createdAt.getTime();
-
-    // count yjs-writings documents with updatedAt > latestRevision.updatedAt
-    const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');
-
-    return ydocUpdatedAt == null
-      ? false
-      : ydocUpdatedAt > lastRevisionCreatedAt;
-  }
-
-  public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
-    const currentYdoc = this.getCurrentYdoc(pageId);
-    if (currentYdoc == null) {
-      return;
-    }
-
-    const persistedYdoc = await this.getPersistedYdoc(pageId);
-    const persistedStateVector = Y.encodeStateVector(persistedYdoc);
-
-    await this.mdb.flushDocument(pageId);
-
-    // If no write operation has been performed, insert initial value
-    const clientsSize = persistedYdoc.store.clients.size;
-    if (clientsSize === 0) {
-      currentYdoc.getText('codemirror').insert(0, initialValue);
-    }
-
-    const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
-
-    if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
-      this.mdb.storeUpdate(pageId, diff);
-      this.mdb.setMeta(pageId, 'updatedAt', Date.now());
-    }
-
-    Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
-
-    currentYdoc.on('update', async(update) => {
-      this.mdb.storeUpdate(pageId, update);
-      this.mdb.setMeta(pageId, 'updatedAt', Date.now());
-    });
-
-    currentYdoc.on('destroy', async() => {
-      this.mdb.flushDocument(pageId);
-    });
-
-    persistedYdoc.destroy();
-  }
-
-  public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
-    // TODO: https://redmine.weseek.co.jp/issues/132775
-    // It's necessary to confirm that the user is not editing the target page in the Editor
-    const currentYdoc = this.getCurrentYdoc(pageId);
-    if (currentYdoc == null) {
-      return;
-    }
-
-    const currentMarkdownLength = currentYdoc.getText('codemirror').length;
-    currentYdoc.getText('codemirror').delete(0, currentMarkdownLength);
-    currentYdoc.getText('codemirror').insert(0, newValue);
-    Y.encodeStateAsUpdate(currentYdoc);
-  }
-
-  public getCurrentYdoc(pageId: string): Ydoc | undefined {
-    const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
-    return currentYdoc;
-  }
-
-  public async getPersistedYdoc(pageId: string): Promise<Y.Doc> {
-    const persistedYdoc = await this.mdb.getYDoc(pageId);
-    return persistedYdoc;
-  }
-
-}
-
-let _instance: YjsService;
-
-export const initializeYjsService = (io: Server): void => {
-  if (_instance != null) {
-    throw new Error('YjsService is already initialized');
-  }
-
-  if (io == null) {
-    throw new Error("'io' is required if initialize YjsService");
-  }
-
-  _instance = new YjsService(io);
-};
-
-export const getYjsService = (): YjsService => {
-  if (_instance == null) {
-    throw new Error('YjsService is not initialized yet');
-  }
-
-  return _instance;
-};

+ 43 - 0
apps/app/src/server/service/yjs/create-indexes.ts

@@ -0,0 +1,43 @@
+import mongoose from 'mongoose';
+
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory('growi:service:yjs:create-indexes');
+
+export const createIndexes = async(collectionName: string): Promise<void> => {
+
+  const collection = mongoose.connection.collection(collectionName);
+
+  try {
+    await collection.createIndexes([
+      {
+        key: {
+          version: 1,
+          docName: 1,
+          action: 1,
+          clock: 1,
+          part: 1,
+        },
+      },
+      // for metaKey
+      {
+        key: {
+          version: 1,
+          docName: 1,
+          metaKey: 1,
+        },
+      },
+      // for flushDocument / clearDocument
+      {
+        key: {
+          docName: 1,
+          clock: 1,
+        },
+      },
+    ]);
+  }
+  catch (err) {
+    logger.error('Failed to create Index', err);
+    throw err;
+  }
+};

+ 55 - 0
apps/app/src/server/service/yjs/create-mongodb-persistence.ts

@@ -0,0 +1,55 @@
+import type { Persistence } from 'y-socket.io/dist/server';
+import * as Y from 'yjs';
+
+import loggerFactory from '~/utils/logger';
+
+import type { MongodbPersistence } from './extended/mongodb-persistence';
+
+const logger = loggerFactory('growi:service:yjs:create-mongodb-persistence');
+
+/**
+ * Based on the example by https://github.com/MaxNoetzold/y-mongodb-provider?tab=readme-ov-file#an-other-example
+ * @param mdb
+ * @returns
+ */
+export const createMongoDBPersistence = (mdb: MongodbPersistence): Persistence => {
+  const persistece: Persistence = {
+    provider: mdb,
+    bindState: async(docName, ydoc) => {
+      logger.debug('bindState', { docName });
+
+      const persistedYdoc = await mdb.getYDoc(docName);
+
+      // get the state vector so we can just store the diffs between client and server
+      const persistedStateVector = Y.encodeStateVector(persistedYdoc);
+      const diff = Y.encodeStateAsUpdate(ydoc, persistedStateVector);
+
+      // store the new data in db (if there is any: empty update is an array of 0s)
+      if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
+        mdb.storeUpdate(docName, diff);
+        mdb.setTypedMeta(docName, 'updatedAt', Date.now());
+      }
+
+      // send the persisted data to clients
+      Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
+
+      // store updates of the document in db
+      ydoc.on('update', async(update) => {
+        mdb.storeUpdate(docName, update);
+        mdb.setTypedMeta(docName, 'updatedAt', Date.now());
+      });
+
+      // cleanup some memory
+      persistedYdoc.destroy();
+    },
+    writeState: async(docName) => {
+      logger.debug('writeState', { docName });
+      // This is called when all connections to the document are closed.
+
+      // flush document on close to have the smallest possible database
+      await mdb.flushDocument(docName);
+    },
+  };
+
+  return persistece;
+};

+ 19 - 0
apps/app/src/server/service/yjs/extended/mongodb-persistence.ts

@@ -0,0 +1,19 @@
+import { MongodbPersistence as Original } from 'y-mongodb-provider';
+
+export type MetadataTypesMap = {
+  updatedAt: number,
+}
+type MetadataKeys = keyof MetadataTypesMap;
+
+
+export class MongodbPersistence extends Original {
+
+  async setTypedMeta<K extends MetadataKeys>(docName: string, key: K, value: MetadataTypesMap[K]): Promise<void> {
+    return this.setMeta(docName, key, value);
+  }
+
+  async getTypedMeta<K extends MetadataKeys>(docName: string, key: K): Promise<MetadataTypesMap[K] | undefined> {
+    return await this.getMeta(docName, key) as MetadataTypesMap[K] | undefined;
+  }
+
+}

+ 1 - 0
apps/app/src/server/service/yjs/index.ts

@@ -0,0 +1 @@
+export * from './yjs';

+ 81 - 0
apps/app/src/server/service/yjs/sync-ydoc.ts

@@ -0,0 +1,81 @@
+import { Origin, YDocStatus } from '@growi/core';
+import type { Document } from 'y-socket.io/dist/server';
+
+import loggerFactory from '~/utils/logger';
+
+import { Revision } from '../../models/revision';
+
+import type { MongodbPersistence } from './extended/mongodb-persistence';
+
+const logger = loggerFactory('growi:service:yjs:sync-ydoc');
+
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+type Delta = Array<{insert?:Array<any>|string, delete?:number, retain?:number}>;
+
+type Context = {
+  ydocStatus: YDocStatus,
+}
+
+/**
+ * Sync the text and the meta data with the latest revision body
+ * @param mdb
+ * @param doc
+ * @param context true to force sync
+ */
+export const syncYDoc = async(mdb: MongodbPersistence, doc: Document, context: true | Context): Promise<void> => {
+  const pageId = doc.name;
+
+  const revision = await Revision
+    .findOne(
+      // filter
+      { pageId },
+      // projection
+      { body: 1, createdAt: 1, origin: 1 },
+      // options
+      { sort: { createdAt: -1 } },
+    )
+    .lean();
+
+  if (revision == null) {
+    logger.warn(`Synchronization has been canceled since the revision of the page ('${pageId}') could not be found`);
+    return;
+  }
+
+  const shouldSync = context === true
+    || (() => {
+      switch (context.ydocStatus) {
+        case YDocStatus.NEW:
+          return true;
+        case YDocStatus.OUTDATED:
+          // should skip when the YDoc is outdated and the latest revision is created by the editor
+          return revision.origin !== Origin.Editor;
+        default:
+          return false;
+      }
+    })();
+
+  if (shouldSync) {
+    logger.debug(`YDoc for the page ('${pageId}') is synced with the latest revision body`);
+
+    const ytext = doc.getText('codemirror');
+    const delta: Delta = [];
+
+    if (ytext.length > 0) {
+      delta.push({ delete: ytext.length });
+    }
+    if (revision.body != null) {
+      delta.push({ insert: revision.body });
+    }
+
+    ytext.applyDelta(delta, { sanitize: false });
+  }
+
+  const shouldSyncMeta = context === true
+    || context.ydocStatus === YDocStatus.NEW
+    || context.ydocStatus === YDocStatus.OUTDATED;
+
+  if (shouldSyncMeta) {
+    mdb.setMeta(doc.name, 'updatedAt', revision.createdAt.getTime() ?? Date.now());
+  }
+};

+ 44 - 22
apps/app/src/server/service/yjs.integ.ts → apps/app/src/server/service/yjs/yjs.integ.ts

@@ -1,16 +1,18 @@
+import { YDocStatus } from '@growi/core/dist/consts';
 import { Types } from 'mongoose';
 import type { Server } from 'socket.io';
 import { mock } from 'vitest-mock-extended';
-import type { MongodbPersistence } from 'y-mongodb-provider';
 
-import { Revision } from '../models/revision';
+import { Revision } from '../../models/revision';
 
+import type { MongodbPersistence } from './extended/mongodb-persistence';
 import type { IYjsService } from './yjs';
 import { getYjsService, initializeYjsService } from './yjs';
 
 
 vi.mock('y-socket.io/dist/server', () => {
   const YSocketIO = vi.fn();
+  YSocketIO.prototype.on = vi.fn();
   YSocketIO.prototype.initialize = vi.fn();
   return { YSocketIO };
 });
@@ -26,7 +28,7 @@ const getPrivateMdbInstance = (yjsService: IYjsService): MongodbPersistence => {
 
 describe('YjsService', () => {
 
-  describe('hasYdocsNewerThanLatestRevision()', () => {
+  describe('getYDocStatus()', () => {
 
     beforeAll(async() => {
       const ioMock = mock<Server>();
@@ -45,36 +47,53 @@ describe('YjsService', () => {
       await privateMdb.flushDB();
     });
 
-    it('returns false when neither revisions nor YDocs exists', async() => {
+    it('returns ISOLATED when neither revisions nor YDocs exists', async() => {
       // arrange
       const yjsService = getYjsService();
 
       const pageId = new ObjectId();
 
       // act
-      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+      const result = await yjsService.getYDocStatus(pageId.toString());
 
       // assert
-      expect(result).toBe(false);
+      expect(result).toBe(YDocStatus.ISOLATED);
     });
 
-    it('returns true when no revisions exist', async() => {
+    it('returns ISOLATED when no revisions exist', async() => {
       // arrange
       const yjsService = getYjsService();
 
       const pageId = new ObjectId();
 
       const privateMdb = getPrivateMdbInstance(yjsService);
-      await privateMdb.setMeta(pageId.toString(), 'updatedAt', 1000);
+      await privateMdb.setTypedMeta(pageId.toString(), 'updatedAt', 1000);
 
       // act
-      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+      const result = await yjsService.getYDocStatus(pageId.toString());
 
       // assert
-      expect(result).toBe(true);
+      expect(result).toBe(YDocStatus.ISOLATED);
     });
 
-    it('returns false when the latest revision is newer than meta data', async() => {
+    it('returns NEW when no YDocs exist', async() => {
+      // arrange
+      const yjsService = getYjsService();
+
+      const pageId = new ObjectId();
+
+      await Revision.insertMany([
+        { pageId, body: '' },
+      ]);
+
+      // act
+      const result = await yjsService.getYDocStatus(pageId.toString());
+
+      // assert
+      expect(result).toBe(YDocStatus.NEW);
+    });
+
+    it('returns DRAFT when the newer YDocs exist', async() => {
       // arrange
       const yjsService = getYjsService();
 
@@ -85,33 +104,36 @@ describe('YjsService', () => {
       ]);
 
       const privateMdb = getPrivateMdbInstance(yjsService);
-      await privateMdb.setMeta(pageId.toString(), 'updatedAt', (new Date(2024, 1, 1)).getTime());
+      await privateMdb.setTypedMeta(pageId.toString(), 'updatedAt', (new Date(2034, 1, 1)).getTime());
 
       // act
-      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+      const result = await yjsService.getYDocStatus(pageId.toString());
 
       // assert
-      expect(result).toBe(false);
+      expect(result).toBe(YDocStatus.DRAFT);
     });
 
-    it('returns false when no YDocs exist', async() => {
+    it('returns SYNCED', async() => {
       // arrange
       const yjsService = getYjsService();
 
       const pageId = new ObjectId();
 
       await Revision.insertMany([
-        { pageId, body: '' },
+        { pageId, body: '', createdAt: new Date(2025, 1, 1) },
       ]);
 
+      const privateMdb = getPrivateMdbInstance(yjsService);
+      await privateMdb.setTypedMeta(pageId.toString(), 'updatedAt', (new Date(2025, 1, 1)).getTime());
+
       // act
-      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+      const result = await yjsService.getYDocStatus(pageId.toString());
 
       // assert
-      expect(result).toBe(false);
+      expect(result).toBe(YDocStatus.SYNCED);
     });
 
-    it('returns true when the newer YDocs exist', async() => {
+    it('returns OUTDATED when the latest revision is newer than meta data', async() => {
       // arrange
       const yjsService = getYjsService();
 
@@ -122,13 +144,13 @@ describe('YjsService', () => {
       ]);
 
       const privateMdb = getPrivateMdbInstance(yjsService);
-      await privateMdb.setMeta(pageId.toString(), 'updatedAt', (new Date(2034, 1, 1)).getTime());
+      await privateMdb.setTypedMeta(pageId.toString(), 'updatedAt', (new Date(2024, 1, 1)).getTime());
 
       // act
-      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+      const result = await yjsService.getYDocStatus(pageId.toString());
 
       // assert
-      expect(result).toBe(true);
+      expect(result).toBe(YDocStatus.OUTDATED);
     });
 
   });

+ 220 - 0
apps/app/src/server/service/yjs/yjs.ts

@@ -0,0 +1,220 @@
+import type { IncomingMessage } from 'http';
+
+import type { IPage, IUserHasId } from '@growi/core';
+import { YDocStatus } from '@growi/core/dist/consts';
+import mongoose from 'mongoose';
+import type { Server } from 'socket.io';
+import type { Document } from 'y-socket.io/dist/server';
+import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
+
+import { SocketEventName } from '~/interfaces/websocket';
+import { RoomPrefix, getRoomNameWithId } from '~/server/util/socket-io-helpers';
+import loggerFactory from '~/utils/logger';
+
+import type { PageModel } from '../../models/page';
+import { Revision } from '../../models/revision';
+
+import { createIndexes } from './create-indexes';
+import { createMongoDBPersistence } from './create-mongodb-persistence';
+import { MongodbPersistence } from './extended/mongodb-persistence';
+import { syncYDoc } from './sync-ydoc';
+
+
+const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
+const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
+
+
+const logger = loggerFactory('growi:service:yjs');
+
+
+type RequestWithUser = IncomingMessage & { user: IUserHasId };
+
+
+export interface IYjsService {
+  getYDocStatus(pageId: string): Promise<YDocStatus>;
+  syncWithTheLatestRevisionForce(pageId: string): Promise<void>;
+  getCurrentYdoc(pageId: string): Ydoc | undefined;
+}
+
+
+class YjsService implements IYjsService {
+
+  private ysocketio: YSocketIO;
+
+  private mdb: MongodbPersistence;
+
+  constructor(io: Server) {
+
+    const mdb = new MongodbPersistence(
+      // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
+      // eslint-disable-next-line @typescript-eslint/ban-ts-comment
+      // @ts-ignore
+      {
+        // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
+        // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        client: mongoose.connection.getClient() as any,
+        // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        db: mongoose.connection.db as any,
+      },
+      {
+        collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
+        flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
+      },
+    );
+    this.mdb = mdb;
+
+    // initialize YSocketIO
+    const ysocketio = new YSocketIO(io);
+    this.injectPersistence(ysocketio, mdb);
+    ysocketio.initialize();
+    this.ysocketio = ysocketio;
+
+    // create indexes
+    createIndexes(MONGODB_PERSISTENCE_COLLECTION_NAME);
+
+    // register middlewares
+    this.registerAccessiblePageChecker(ysocketio);
+
+    ysocketio.on('document-loaded', async(doc: Document) => {
+      const pageId = doc.name;
+
+      const ydocStatus = await this.getYDocStatus(pageId);
+
+      syncYDoc(mdb, doc, { ydocStatus });
+    });
+
+    ysocketio.on('awareness-update', async(doc: Document) => {
+      const pageId = doc.name;
+
+      if (pageId == null) return;
+
+      const awarenessStateSize = doc.awareness.states.size;
+
+      // Triggered when awareness changes
+      io
+        .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
+        .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
+
+      // Triggered when the last user leaves the editor
+      if (awarenessStateSize === 0) {
+        const ydocStatus = await this.getYDocStatus(pageId);
+        const hasYdocsNewerThanLatestRevision = ydocStatus === YDocStatus.DRAFT || ydocStatus === YDocStatus.ISOLATED;
+
+        io
+          .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
+          .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
+      }
+    });
+
+  }
+
+  private injectPersistence(ysocketio: YSocketIO, mdb: MongodbPersistence): void {
+    const persistece = createMongoDBPersistence(mdb);
+
+    // foce set to private property
+    // eslint-disable-next-line dot-notation
+    ysocketio['persistence'] = persistece;
+  }
+
+  private registerAccessiblePageChecker(ysocketio: YSocketIO): void {
+    // check accessible page
+    ysocketio.nsp?.use(async(socket, next) => {
+      // extract page id from namespace
+      const pageId = socket.nsp.name.replace(/\/yjs\|/, '');
+      const user = (socket.request as RequestWithUser).user; // should be injected by SocketIOService
+
+      const Page = mongoose.model<IPage, PageModel>('Page');
+      const isAccessible = await Page.isAccessiblePageByViewer(pageId, user);
+
+      if (!isAccessible) {
+        return next(new Error('Forbidden'));
+      }
+
+      return next();
+    });
+  }
+
+  public async getYDocStatus(pageId: string): Promise<YDocStatus> {
+    const dumpLog = (status: YDocStatus, args?: { [key: string]: number }) => {
+      logger.debug(`getYDocStatus('${pageId}') detected '${status}'`, args ?? {});
+    };
+
+    // get the latest revision createdAt
+    const result = await Revision
+      .findOne(
+        // filter
+        { pageId },
+        // projection
+        { createdAt: 1 },
+        { sort: { createdAt: -1 } },
+      )
+      .lean();
+
+    if (result == null) {
+      dumpLog(YDocStatus.ISOLATED);
+      return YDocStatus.ISOLATED;
+    }
+
+    // count yjs-writings documents with updatedAt > latestRevision.updatedAt
+    const ydocUpdatedAt = await this.mdb.getTypedMeta(pageId, 'updatedAt');
+
+    if (ydocUpdatedAt == null) {
+      dumpLog(YDocStatus.NEW);
+      return YDocStatus.NEW;
+    }
+
+    const { createdAt } = result;
+    const lastRevisionCreatedAt = createdAt.getTime();
+
+    if (lastRevisionCreatedAt < ydocUpdatedAt) {
+      dumpLog(YDocStatus.DRAFT, { lastRevisionCreatedAt, ydocUpdatedAt });
+      return YDocStatus.DRAFT;
+    }
+
+    if (lastRevisionCreatedAt === ydocUpdatedAt) {
+      dumpLog(YDocStatus.SYNCED, { lastRevisionCreatedAt, ydocUpdatedAt });
+      return YDocStatus.SYNCED;
+    }
+
+    dumpLog(YDocStatus.OUTDATED, { lastRevisionCreatedAt, ydocUpdatedAt });
+    return YDocStatus.OUTDATED;
+  }
+
+  public async syncWithTheLatestRevisionForce(pageId: string): Promise<void> {
+    const doc = this.ysocketio.documents.get(pageId);
+
+    if (doc == null) {
+      return;
+    }
+
+    syncYDoc(this.mdb, doc, true);
+  }
+
+  public getCurrentYdoc(pageId: string): Ydoc | undefined {
+    const currentYdoc = this.ysocketio.documents.get(pageId);
+    return currentYdoc;
+  }
+
+}
+
+let _instance: YjsService;
+
+export const initializeYjsService = (io: Server): void => {
+  if (_instance != null) {
+    throw new Error('YjsService is already initialized');
+  }
+
+  if (io == null) {
+    throw new Error("'io' is required if initialize YjsService");
+  }
+
+  _instance = new YjsService(io);
+};
+
+export const getYjsService = (): YjsService => {
+  if (_instance == null) {
+    throw new Error('YjsService is not initialized yet');
+  }
+
+  return _instance;
+};

+ 12 - 3
apps/app/src/stores/yjs.ts

@@ -15,7 +15,13 @@ type CurrentPageYjsDataUtils = {
 }
 
 export const useCurrentPageYjsData = (): SWRResponse<CurrentPageYjsData, Error> & CurrentPageYjsDataUtils => {
-  const swrResponse = useSWRStatic<CurrentPageYjsData, Error>('currentPageYjsData', undefined);
+  const { data: currentPageId } = useCurrentPageId();
+
+  const key = currentPageId != null
+    ? `/page/${currentPageId}/yjs-data`
+    : null;
+
+  const swrResponse = useSWRStatic<CurrentPageYjsData, Error>(key, undefined);
 
   const updateHasYdocsNewerThanLatestRevision = useCallback((hasYdocsNewerThanLatestRevision: boolean) => {
     swrResponse.mutate({ ...swrResponse.data, hasYdocsNewerThanLatestRevision });
@@ -29,12 +35,15 @@ export const useCurrentPageYjsData = (): SWRResponse<CurrentPageYjsData, Error>
 };
 
 export const useSWRMUTxCurrentPageYjsData = (): SWRMutationResponse<CurrentPageYjsData, Error> => {
-  const key = 'currentPageYjsData';
   const { data: currentPageId } = useCurrentPageId();
 
+  const key = currentPageId != null
+    ? `/page/${currentPageId}/yjs-data`
+    : null;
+
   return useSWRMutation(
     key,
-    () => apiv3Get<{ yjsData: CurrentPageYjsData }>(`/page/${currentPageId}/yjs-data`).then(result => result.data.yjsData),
+    ([endpoint]) => apiv3Get<{ yjsData: CurrentPageYjsData }>(endpoint).then(result => result.data.yjsData),
     { populateCache: true, revalidate: false },
   );
 };

+ 1 - 0
packages/core/src/consts/index.ts

@@ -1,2 +1,3 @@
 export * from './accepted-upload-file-type';
 export * from './growi-plugin';
+export * from './ydoc-status';

+ 15 - 0
packages/core/src/consts/ydoc-status.ts

@@ -0,0 +1,15 @@
+/**
+ * NEW: The document is newly created and not yet synced with the latest revision.
+ * SYNCED: The document is synced with the latest revision.
+ * DRAFT: The document advances as a draft compared to the latest revision
+ * OUTDATED: The document is outdated and needs to be synced with the latest revision.
+ * ISOLATED: The latest revision does not exist and the document is isolated from the page.
+ */
+export const YDocStatus = {
+  NEW: 'new',
+  SYNCED: 'synced',
+  DRAFT: 'draft',
+  OUTDATED: 'outdated',
+  ISOLATED: 'isolated',
+} as const;
+export type YDocStatus = typeof YDocStatus[keyof typeof YDocStatus]

+ 0 - 1
packages/core/src/interfaces/index.ts

@@ -13,4 +13,3 @@ export * from './subscription';
 export * from './tag';
 export * from './user';
 export * from './vite';
-export * from './websocket';

+ 0 - 6
packages/core/src/interfaces/websocket.ts

@@ -1,6 +0,0 @@
-export const GlobalSocketEventName = {
-  // YDoc
-  YDocSync: 'ydoc:sync',
-  YDocSyncError: 'ydoc:sync:error',
-} as const;
-export type GlobalSocketEventName = typeof GlobalSocketEventName[keyof typeof GlobalSocketEventName];

+ 1 - 0
packages/editor/package.json

@@ -18,6 +18,7 @@
     "lint": "npm-run-all -p lint:*"
   },
   "dependencies": {
+    "lib0": "^0.2.94",
     "markdown-table": "^3.0.3",
     "react": "^18.2.0",
     "react-dom": "^18.2.0"

+ 12 - 11
packages/editor/src/client/stores/use-collaborative-editor-mode.ts

@@ -1,8 +1,8 @@
 import { useEffect, useState } from 'react';
 
 import { keymap } from '@codemirror/view';
-import { GlobalSocketEventName, type IUserHasId } from '@growi/core/dist/interfaces';
-import { useGlobalSocket, GLOBAL_SOCKET_NS } from '@growi/core/dist/swr';
+import type { IUserHasId } from '@growi/core/dist/interfaces';
+import { useGlobalSocket } from '@growi/core/dist/swr';
 import { yCollab, yUndoManagerKeymap } from 'y-codemirror.next';
 import { SocketIOProvider } from 'y-socket.io';
 import * as Y from 'yjs';
@@ -44,9 +44,6 @@ export const useCollaborativeEditorMode = (
     // so only awareness is destroyed here
     provider?.awareness.destroy();
 
-    // TODO: catch ydoc:sync:error GlobalSocketEventName.YDocSyncError
-    socket?.off(GlobalSocketEventName.YDocSync);
-
     setCPageId(pageId);
 
     // reset editors
@@ -70,15 +67,18 @@ export const useCollaborativeEditorMode = (
 
   // Setup provider
   useEffect(() => {
-    if (provider != null || ydoc == null || socket == null || onEditorsUpdated == null) {
+    if (provider != null || pageId == null || ydoc == null || socket == null || onEditorsUpdated == null) {
       return;
     }
 
     const socketIOProvider = new SocketIOProvider(
-      GLOBAL_SOCKET_NS,
-      `yjs/${pageId}`,
+      '/',
+      pageId,
       ydoc,
-      { autoConnect: true },
+      {
+        autoConnect: true,
+        resyncInterval: 3000,
+      },
     );
 
     const userLocalState: UserLocalState = {
@@ -92,14 +92,13 @@ export const useCollaborativeEditorMode = (
 
     socketIOProvider.on('sync', (isSync: boolean) => {
       if (isSync) {
-        socket.emit(GlobalSocketEventName.YDocSync, { pageId, initialValue });
         const userList: IUserHasId[] = Array.from(socketIOProvider.awareness.states.values(), value => value.user.user && value.user.user);
         onEditorsUpdated(userList);
       }
     });
 
     // update args type see: SocketIOProvider.Awareness.awarenessUpdate
-    socketIOProvider.awareness.on('update', (update: any) => {
+    socketIOProvider.awareness.on('update', (update: { added: unknown[]; removed: unknown[]; }) => {
       const { added, removed } = update;
       if (added.length > 0 || removed.length > 0) {
         const userList: IUserHasId[] = Array.from(socketIOProvider.awareness.states.values(), value => value.user.user && value.user.user);
@@ -131,6 +130,8 @@ export const useCollaborativeEditorMode = (
     return () => {
       cleanupYUndoManagerKeymap?.();
       cleanupYCollab?.();
+      // clean up editor
+      codeMirrorEditor.initDoc('');
     };
   }, [codeMirrorEditor, provider, ydoc]);
 };