Przeglądaj źródła

Merge pull request #8949 from weseek/support/refactor-yjs

support: Refactor Yjs service
Yuki Takei 1 rok temu
rodzic
commit
cca5d3ef23

+ 2 - 1
apps/app/nodemon.json

@@ -9,6 +9,7 @@
     "src/**/client",
     "test",
     "test-with-vite",
-    "tmp"
+    "tmp",
+    "*.mongodb.js"
   ]
 }

+ 2 - 2
apps/app/src/client/components/Navbar/PageEditorModeManager.tsx

@@ -93,11 +93,11 @@ export const PageEditorModeManager = (props: Props): JSX.Element => {
   const _isBtnDisabled = isCreating || isBtnDisabled;
 
   const circleColor = useMemo(() => {
-    if (currentPageYjsData?.awarenessStateSize != null && currentPageYjsData.awarenessStateSize > 0) {
+    if ((currentPageYjsData?.awarenessStateSize ?? 0) > 0) {
       return 'bg-primary';
     }
 
-    if (currentPageYjsData?.hasRevisionBodyDiff != null && currentPageYjsData.hasRevisionBodyDiff) {
+    if (currentPageYjsData?.hasYdocsNewerThanLatestRevision ?? false) {
       return 'bg-secondary';
     }
   }, [currentPageYjsData]);

+ 7 - 15
apps/app/src/client/services/side-effects/yjs.ts

@@ -1,4 +1,4 @@
-import { useCallback, useEffect } from 'react';
+import { useEffect } from 'react';
 
 import { useGlobalSocket } from '@growi/core/dist/swr';
 
@@ -7,27 +7,19 @@ import { useCurrentPageYjsData } from '~/stores/yjs';
 
 export const useCurrentPageYjsDataEffect = (): void => {
   const { data: socket } = useGlobalSocket();
-  const { updateHasRevisionBodyDiff, updateAwarenessStateSize } = useCurrentPageYjsData();
-
-  const hasRevisionBodyDiffUpdateHandler = useCallback((hasRevisionBodyDiff: boolean) => {
-    updateHasRevisionBodyDiff(hasRevisionBodyDiff);
-  }, [updateHasRevisionBodyDiff]);
-
-  const awarenessStateSizeUpdateHandler = useCallback(((awarenessStateSize: number) => {
-    updateAwarenessStateSize(awarenessStateSize);
-  }), [updateAwarenessStateSize]);
+  const { updateHasYdocsNewerThanLatestRevision, updateAwarenessStateSize } = useCurrentPageYjsData();
 
   useEffect(() => {
 
     if (socket == null) { return }
 
-    socket.on(SocketEventName.YjsHasRevisionBodyDiffUpdated, hasRevisionBodyDiffUpdateHandler);
-    socket.on(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSizeUpdateHandler);
+    socket.on(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, updateHasYdocsNewerThanLatestRevision);
+    socket.on(SocketEventName.YjsAwarenessStateSizeUpdated, updateAwarenessStateSize);
 
     return () => {
-      socket.off(SocketEventName.YjsHasRevisionBodyDiffUpdated, hasRevisionBodyDiffUpdateHandler);
-      socket.off(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSizeUpdateHandler);
+      socket.off(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, updateHasYdocsNewerThanLatestRevision);
+      socket.off(SocketEventName.YjsAwarenessStateSizeUpdated, updateAwarenessStateSize);
     };
 
-  }, [socket, awarenessStateSizeUpdateHandler, hasRevisionBodyDiffUpdateHandler]);
+  }, [socket, updateAwarenessStateSize, updateHasYdocsNewerThanLatestRevision]);
 };

+ 1 - 1
apps/app/src/interfaces/websocket.ts

@@ -51,7 +51,7 @@ export const SocketEventName = {
 
   // Yjs
   YjsAwarenessStateSizeUpdated: 'yjs:awareness-state-size-update',
-  YjsHasRevisionBodyDiffUpdated: 'yjs:has-revision-body-diff-update',
+  YjsHasYdocsNewerThanLatestRevisionUpdated: 'yjs:has-ydocs-newer-than-latest-revision-update',
 } as const;
 export type SocketEventName = typeof SocketEventName[keyof typeof SocketEventName];
 

+ 1 - 1
apps/app/src/interfaces/yjs.ts

@@ -1,4 +1,4 @@
 export type CurrentPageYjsData = {
-  hasRevisionBodyDiff?: boolean,
+  hasYdocsNewerThanLatestRevision?: boolean,
   awarenessStateSize?: number,
 }

+ 3 - 4
apps/app/src/server/crowi/index.js

@@ -36,7 +36,7 @@ import SearchService from '../service/search';
 import { SlackIntegrationService } from '../service/slack-integration';
 import UserGroupService from '../service/user-group';
 import { UserNotificationService } from '../service/user-notification';
-import { instantiateYjsConnectionManager } from '../service/yjs-connection-manager';
+import { initializeYjsService } from '../service/yjs';
 import { getMongoUri, mongoOptions } from '../util/mongoose-utils';
 
 
@@ -475,9 +475,8 @@ Crowi.prototype.start = async function() {
   // attach to socket.io
   this.socketIoService.attachServer(httpServer);
 
-  // Initialization YjsConnectionManager
-  instantiateYjsConnectionManager(this.socketIoService.io);
-  this.socketIoService.setupYjsConnection();
+  // Initialization YjsService
+  initializeYjsService(this.socketIoService.io);
 
   await this.autoInstall();
 

+ 3 - 3
apps/app/src/server/routes/apiv3/page/update-page.ts

@@ -18,7 +18,7 @@ import {
 } from '~/server/models';
 import type { PageDocument, PageModel } from '~/server/models/page';
 import { preNotifyService } from '~/server/service/pre-notify';
-import { getYjsConnectionManager } from '~/server/service/yjs-connection-manager';
+import { getYjsService } from '~/server/service/yjs';
 import { generalXssFilter } from '~/services/general-xss-filter';
 import loggerFactory from '~/utils/logger';
 
@@ -67,8 +67,8 @@ export const updatePageHandlersFactory: UpdatePageHandlersFactory = (crowi) => {
     // Reflect the updates in ydoc
     const origin = req.body.origin;
     if (origin === Origin.View || origin === undefined) {
-      const yjsConnectionManager = getYjsConnectionManager();
-      await yjsConnectionManager.handleYDocUpdate(req.body.pageId, req.body.body);
+      const yjsService = getYjsService();
+      await yjsService.handleYDocUpdate(req.body.pageId, req.body.body);
     }
 
     // persist activity

+ 5 - 22
apps/app/src/server/service/page/index.ts

@@ -41,7 +41,6 @@ import {
 import type { PageTagRelationDocument } from '~/server/models/page-tag-relation';
 import PageTagRelation from '~/server/models/page-tag-relation';
 import type { UserGroupDocument } from '~/server/models/user-group';
-import { getYjsConnectionManager } from '~/server/service/yjs-connection-manager';
 import { createBatchStream } from '~/server/util/batch-stream';
 import { collectAncestorPaths } from '~/server/util/collect-ancestor-paths';
 import { generalXssFilter } from '~/services/general-xss-filter';
@@ -64,6 +63,7 @@ import { divideByType } from '../../util/granted-group';
 import { configManager } from '../config-manager';
 import type { IPageGrantService } from '../page-grant';
 import { preNotifyService } from '../pre-notify';
+import { getYjsService } from '../yjs';
 
 import { BULK_REINDEX_SIZE, LIMIT_FOR_MULTIPLE_PAGE_OP } from './consts';
 import type { IPageService } from './page-service';
@@ -4435,34 +4435,17 @@ class PageService implements IPageService {
   }
 
   async getYjsData(pageId: string): Promise<CurrentPageYjsData> {
-    const yjsConnectionManager = getYjsConnectionManager();
+    const yjsService = getYjsService();
 
-    const currentYdoc = yjsConnectionManager.getCurrentYdoc(pageId);
-    const persistedYdoc = await yjsConnectionManager.getPersistedYdoc(pageId);
-
-    const yjsDraft = (currentYdoc ?? persistedYdoc)?.getText('codemirror').toString();
-    const hasRevisionBodyDiff = await this.hasRevisionBodyDiff(pageId, yjsDraft);
+    const currentYdoc = yjsService.getCurrentYdoc(pageId);
+    const hasYdocsNewerThanLatestRevision = await yjsService.hasYdocsNewerThanLatestRevision(pageId);
 
     return {
-      hasRevisionBodyDiff,
+      hasYdocsNewerThanLatestRevision,
       awarenessStateSize: currentYdoc?.awareness.states.size,
     };
   }
 
-  async hasRevisionBodyDiff(pageId: string, comparisonTarget?: string): Promise<boolean> {
-    if (comparisonTarget == null) {
-      return false;
-    }
-
-    const revision = await Revision.findOne({ pageId }).sort({ createdAt: -1 });
-
-    if (revision == null) {
-      return false;
-    }
-
-    return revision.body !== comparisonTarget;
-  }
-
   async createTtlIndex(): Promise<void> {
     const wipPageExpirationSeconds = configManager.getConfig('crowi', 'app:wipPageExpirationSeconds') ?? 172800;
     const collection = mongoose.connection.collection('pages');

+ 0 - 1
apps/app/src/server/service/page/page-service.ts

@@ -32,5 +32,4 @@ export interface IPageService {
     page: PageDocument, creatorId: ObjectIdLike | null, operator: any | null, userRelatedGroups: PopulatedGrantedGroup[]
   ): boolean,
   getYjsData(pageId: string, revisionBody?: string): Promise<CurrentPageYjsData>,
-  hasRevisionBodyDiff(pageId: string, comparisonTarget?: string): Promise<boolean>,
 }

+ 0 - 43
apps/app/src/server/service/socket-io.ts

@@ -1,12 +1,10 @@
 import type { IncomingMessage } from 'http';
 
 import type { IUserHasId } from '@growi/core/dist/interfaces';
-import { GlobalSocketEventName } from '@growi/core/dist/interfaces';
 import expressSession from 'express-session';
 import passport from 'passport';
 import type { Namespace } from 'socket.io';
 import { Server } from 'socket.io';
-import type { Document } from 'y-socket.io/dist/server';
 
 import { SocketEventName } from '~/interfaces/websocket';
 import loggerFactory from '~/utils/logger';
@@ -15,7 +13,6 @@ import type Crowi from '../crowi';
 import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
 
 import { configManager } from './config-manager';
-import { getYjsConnectionManager, extractPageIdFromYdocId } from './yjs-connection-manager';
 
 
 const logger = loggerFactory('growi:service:socket-io');
@@ -179,46 +176,6 @@ class SocketIoService {
     });
   }
 
-  setupYjsConnection() {
-    const yjsConnectionManager = getYjsConnectionManager();
-
-    this.io.on('connection', (socket) => {
-
-      yjsConnectionManager.ysocketioInstance.on('awareness-update', async(doc: Document) => {
-        const pageId = extractPageIdFromYdocId(doc.name);
-
-        if (pageId == null) return;
-
-        const awarenessStateSize = doc.awareness.states.size;
-
-        // Triggered when awareness changes
-        this.io
-          .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
-          .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
-
-        // Triggered when the last user leaves the editor
-        if (awarenessStateSize === 0) {
-          const currentYdoc = yjsConnectionManager.getCurrentYdoc(pageId);
-          const yjsDraft = currentYdoc?.getText('codemirror').toString();
-          const hasRevisionBodyDiff = await this.crowi.pageService.hasRevisionBodyDiff(pageId, yjsDraft);
-          this.io
-            .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
-            .emit(SocketEventName.YjsHasRevisionBodyDiffUpdated, hasRevisionBodyDiff);
-        }
-      });
-
-      socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
-        try {
-          await yjsConnectionManager.handleYDocSync(pageId, initialValue);
-        }
-        catch (error) {
-          logger.warn(error.message);
-          socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
-        }
-      });
-    });
-  }
-
   async checkConnectionLimitsForAdmin(socket, next) {
     const namespaceName = socket.nsp.name;
 

+ 0 - 128
apps/app/src/server/service/yjs-connection-manager.ts

@@ -1,128 +0,0 @@
-import mongoose from 'mongoose';
-import type { Server } from 'socket.io';
-import { MongodbPersistence } from 'y-mongodb-provider';
-import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
-import * as Y from 'yjs';
-
-const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
-const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
-
-export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
-  const result = ydocId.match(/yjs\/(.*)/);
-  return result?.[1];
-};
-
-class YjsConnectionManager {
-
-  private static instance: YjsConnectionManager;
-
-  private ysocketio: YSocketIO;
-
-  private mdb: MongodbPersistence;
-
-  get ysocketioInstance(): YSocketIO {
-    return this.ysocketio;
-  }
-
-  private constructor(io: Server) {
-    this.ysocketio = new YSocketIO(io);
-    this.ysocketio.initialize();
-
-    this.mdb = new MongodbPersistence(
-      {
-        // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        client: mongoose.connection.getClient() as any,
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        db: mongoose.connection.db as any,
-      },
-      {
-        collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
-        flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
-      },
-    );
-  }
-
-  public static getInstance(io?: Server) {
-    if (this.instance != null) {
-      return this.instance;
-    }
-
-    if (io == null) {
-      throw new Error("'io' is required if initialize YjsConnectionManager");
-    }
-
-    this.instance = new YjsConnectionManager(io);
-    return this.instance;
-  }
-
-  public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
-    const currentYdoc = this.getCurrentYdoc(pageId);
-    if (currentYdoc == null) {
-      return;
-    }
-
-    const persistedYdoc = await this.getPersistedYdoc(pageId);
-    const persistedStateVector = Y.encodeStateVector(persistedYdoc);
-
-    await this.mdb.flushDocument(pageId);
-
-    // If no write operation has been performed, insert initial value
-    const clientsSize = persistedYdoc.store.clients.size;
-    if (clientsSize === 0) {
-      currentYdoc.getText('codemirror').insert(0, initialValue);
-    }
-
-    const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
-
-    if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
-      this.mdb.storeUpdate(pageId, diff);
-    }
-
-    Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
-
-    currentYdoc.on('update', async(update) => {
-      await this.mdb.storeUpdate(pageId, update);
-    });
-
-    currentYdoc.on('destroy', async() => {
-      await this.mdb.flushDocument(pageId);
-    });
-
-    persistedYdoc.destroy();
-  }
-
-  public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
-    // TODO: https://redmine.weseek.co.jp/issues/132775
-    // It's necessary to confirm that the user is not editing the target page in the Editor
-    const currentYdoc = this.getCurrentYdoc(pageId);
-    if (currentYdoc == null) {
-      return;
-    }
-
-    const currentMarkdownLength = currentYdoc.getText('codemirror').length;
-    currentYdoc.getText('codemirror').delete(0, currentMarkdownLength);
-    currentYdoc.getText('codemirror').insert(0, newValue);
-    Y.encodeStateAsUpdate(currentYdoc);
-  }
-
-  public getCurrentYdoc(pageId: string): Ydoc | undefined {
-    const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
-    return currentYdoc;
-  }
-
-  public async getPersistedYdoc(pageId: string): Promise<Y.Doc> {
-    const persistedYdoc = await this.mdb.getYDoc(pageId);
-    return persistedYdoc;
-  }
-
-}
-
-export const instantiateYjsConnectionManager = (io: Server): YjsConnectionManager => {
-  return YjsConnectionManager.getInstance(io);
-};
-
-// export the singleton instance
-export const getYjsConnectionManager = (): YjsConnectionManager => {
-  return YjsConnectionManager.getInstance();
-};

+ 135 - 0
apps/app/src/server/service/yjs.integ.ts

@@ -0,0 +1,135 @@
+import { Types } from 'mongoose';
+import type { Server } from 'socket.io';
+import { mock } from 'vitest-mock-extended';
+import type { MongodbPersistence } from 'y-mongodb-provider';
+
+import { Revision } from '../models/revision';
+
+import type { IYjsService } from './yjs';
+import { getYjsService, initializeYjsService } from './yjs';
+
+
+vi.mock('y-socket.io/dist/server', () => {
+  const YSocketIO = vi.fn();
+  YSocketIO.prototype.initialize = vi.fn();
+  return { YSocketIO };
+});
+
+
+const ObjectId = Types.ObjectId;
+
+
+const getPrivateMdbInstance = (yjsService: IYjsService): MongodbPersistence => {
+  // eslint-disable-next-line dot-notation
+  return yjsService['mdb'];
+};
+
+describe('YjsService', () => {
+
+  describe('hasYdocsNewerThanLatestRevision()', () => {
+
+    beforeAll(async() => {
+      const ioMock = mock<Server>();
+
+      // initialize
+      initializeYjsService(ioMock);
+    });
+
+    afterAll(async() => {
+      // flush revisions
+      await Revision.deleteMany({});
+
+      // flush yjs-writings
+      const yjsService = getYjsService();
+      const privateMdb = getPrivateMdbInstance(yjsService);
+      await privateMdb.flushDB();
+    });
+
+    it('returns false when neither revisions nor YDocs exists', async() => {
+      // arrange
+      const yjsService = getYjsService();
+
+      const pageId = new ObjectId();
+
+      // act
+      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+
+      // assert
+      expect(result).toBe(false);
+    });
+
+    it('returns true when no revisions exist', async() => {
+      // arrange
+      const yjsService = getYjsService();
+
+      const pageId = new ObjectId();
+
+      const privateMdb = getPrivateMdbInstance(yjsService);
+      await privateMdb.setMeta(pageId.toString(), 'updatedAt', 1000);
+
+      // act
+      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+
+      // assert
+      expect(result).toBe(true);
+    });
+
+    it('returns false when the latest revision is newer than meta data', async() => {
+      // arrange
+      const yjsService = getYjsService();
+
+      const pageId = new ObjectId();
+
+      await Revision.insertMany([
+        { pageId, body: '' },
+      ]);
+
+      const privateMdb = getPrivateMdbInstance(yjsService);
+      await privateMdb.setMeta(pageId.toString(), 'updatedAt', (new Date(2024, 1, 1)).getTime());
+
+      // act
+      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+
+      // assert
+      expect(result).toBe(false);
+    });
+
+    it('returns false when no YDocs exist', async() => {
+      // arrange
+      const yjsService = getYjsService();
+
+      const pageId = new ObjectId();
+
+      await Revision.insertMany([
+        { pageId, body: '' },
+      ]);
+
+      // act
+      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+
+      // assert
+      expect(result).toBe(false);
+    });
+
+    it('returns true when the newer YDocs exist', async() => {
+      // arrange
+      const yjsService = getYjsService();
+
+      const pageId = new ObjectId();
+
+      await Revision.insertMany([
+        { pageId, body: '' },
+      ]);
+
+      const privateMdb = getPrivateMdbInstance(yjsService);
+      await privateMdb.setMeta(pageId.toString(), 'updatedAt', (new Date(2034, 1, 1)).getTime());
+
+      // act
+      const result = await yjsService.hasYdocsNewerThanLatestRevision(pageId.toString());
+
+      // assert
+      expect(result).toBe(true);
+    });
+
+  });
+});

+ 247 - 0
apps/app/src/server/service/yjs.ts

@@ -0,0 +1,247 @@
+import type { IRevisionHasId } from '@growi/core';
+import { GlobalSocketEventName } from '@growi/core';
+import mongoose from 'mongoose';
+import type { Server } from 'socket.io';
+import { MongodbPersistence } from 'y-mongodb-provider';
+import type { Document } from 'y-socket.io/dist/server';
+import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
+import * as Y from 'yjs';
+
+import { SocketEventName } from '~/interfaces/websocket';
+import loggerFactory from '~/utils/logger';
+
+import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
+
+
+const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
+const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
+
+
+const logger = loggerFactory('growi:service:yjs');
+
+
+export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
+  const result = ydocId.match(/yjs\/(.*)/);
+  return result?.[1];
+};
+
+export interface IYjsService {
+  hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean>;
+  handleYDocSync(pageId: string, initialValue: string): Promise<void>;
+  handleYDocUpdate(pageId: string, newValue: string): Promise<void>;
+  getCurrentYdoc(pageId: string): Ydoc | undefined;
+  getPersistedYdoc(pageId: string): Promise<Y.Doc>;
+}
+
+class YjsService implements IYjsService {
+
+  private ysocketio: YSocketIO;
+
+  private mdb: MongodbPersistence;
+
+  constructor(io: Server) {
+    const ysocketio = new YSocketIO(io);
+    ysocketio.initialize();
+    this.ysocketio = ysocketio;
+
+    this.mdb = new MongodbPersistence(
+      // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
+      // eslint-disable-next-line @typescript-eslint/ban-ts-comment
+      // @ts-ignore
+      {
+        // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
+        // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        client: mongoose.connection.getClient() as any,
+        // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        db: mongoose.connection.db as any,
+      },
+      {
+        collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
+        flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
+      },
+    );
+
+    this.createIndexes();
+
+    io.on('connection', (socket) => {
+
+      ysocketio.on('awareness-update', async(doc: Document) => {
+        const pageId = extractPageIdFromYdocId(doc.name);
+
+        if (pageId == null) return;
+
+        const awarenessStateSize = doc.awareness.states.size;
+
+        // Triggered when awareness changes
+        io
+          .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
+          .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
+
+        // Triggered when the last user leaves the editor
+        if (awarenessStateSize === 0) {
+          const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
+          io
+            .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
+            .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
+        }
+      });
+
+      socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
+        try {
+          await this.handleYDocSync(pageId, initialValue);
+        }
+        catch (error) {
+          logger.warn(error.message);
+          socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
+        }
+      });
+    });
+  }
+
+  private async createIndexes(): Promise<void> {
+
+    const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);
+
+    try {
+      await collection.createIndexes([
+        {
+          key: {
+            version: 1,
+            docName: 1,
+            action: 1,
+            clock: 1,
+            part: 1,
+          },
+        },
+        // for metaKey
+        {
+          key: {
+            version: 1,
+            docName: 1,
+            metaKey: 1,
+          },
+        },
+        // for flushDocument / clearDocument
+        {
+          key: {
+            docName: 1,
+            clock: 1,
+          },
+        },
+      ]);
+    }
+    catch (err) {
+      logger.error('Failed to create Index', err);
+      throw err;
+    }
+  }
+
+  public async hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean> {
+    // get the latest revision createdAt
+    const Revision = mongoose.model<IRevisionHasId>('Revision');
+    const result = await Revision
+      .findOne(
+        // filter
+        { pageId },
+        // projection
+        { createdAt: 1 },
+        { sort: { createdAt: -1 } },
+      );
+
+    const lastRevisionCreatedAt = (result == null)
+      ? 0
+      : result.createdAt.getTime();
+
+    // count yjs-writings documents with updatedAt > latestRevision.updatedAt
+    const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');
+
+    return ydocUpdatedAt == null
+      ? false
+      : ydocUpdatedAt > lastRevisionCreatedAt;
+  }
+
+  public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
+    const currentYdoc = this.getCurrentYdoc(pageId);
+    if (currentYdoc == null) {
+      return;
+    }
+
+    const persistedYdoc = await this.getPersistedYdoc(pageId);
+    const persistedStateVector = Y.encodeStateVector(persistedYdoc);
+
+    await this.mdb.flushDocument(pageId);
+
+    // If no write operation has been performed, insert initial value
+    const clientsSize = persistedYdoc.store.clients.size;
+    if (clientsSize === 0) {
+      currentYdoc.getText('codemirror').insert(0, initialValue);
+    }
+
+    const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
+
+    if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
+      this.mdb.storeUpdate(pageId, diff);
+      this.mdb.setMeta(pageId, 'updatedAt', Date.now());
+    }
+
+    Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
+
+    currentYdoc.on('update', async(update) => {
+      this.mdb.storeUpdate(pageId, update);
+      this.mdb.setMeta(pageId, 'updatedAt', Date.now());
+    });
+
+    currentYdoc.on('destroy', async() => {
+      this.mdb.flushDocument(pageId);
+    });
+
+    persistedYdoc.destroy();
+  }
+
+  public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
+    // TODO: https://redmine.weseek.co.jp/issues/132775
+    // It's necessary to confirm that the user is not editing the target page in the Editor
+    const currentYdoc = this.getCurrentYdoc(pageId);
+    if (currentYdoc == null) {
+      return;
+    }
+
+    const currentMarkdownLength = currentYdoc.getText('codemirror').length;
+    currentYdoc.getText('codemirror').delete(0, currentMarkdownLength);
+    currentYdoc.getText('codemirror').insert(0, newValue);
+    Y.encodeStateAsUpdate(currentYdoc);
+  }
+
+  public getCurrentYdoc(pageId: string): Ydoc | undefined {
+    const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
+    return currentYdoc;
+  }
+
+  public async getPersistedYdoc(pageId: string): Promise<Y.Doc> {
+    const persistedYdoc = await this.mdb.getYDoc(pageId);
+    return persistedYdoc;
+  }
+
+}
+
+let _instance: YjsService;
+
+export const initializeYjsService = (io: Server): void => {
+  if (_instance != null) {
+    throw new Error('YjsService is already initialized');
+  }
+
+  if (io == null) {
+    throw new Error("'io' is required if initialize YjsService");
+  }
+
+  _instance = new YjsService(io);
+};
+
+export const getYjsService = (): YjsService => {
+  if (_instance == null) {
+    throw new Error('YjsService is not initialized yet');
+  }
+
+  return _instance;
+};

+ 4 - 6
apps/app/src/stores/yjs.ts

@@ -10,24 +10,22 @@ import type { CurrentPageYjsData } from '~/interfaces/yjs';
 import { useCurrentPageId } from './page';
 
 type CurrentPageYjsDataUtils = {
-  updateHasRevisionBodyDiff(hasRevisionBodyDiff: boolean): void
+  updateHasYdocsNewerThanLatestRevision(hasYdocsNewerThanLatestRevision: boolean): void
   updateAwarenessStateSize(awarenessStateSize: number): void
 }
 
 export const useCurrentPageYjsData = (): SWRResponse<CurrentPageYjsData, Error> & CurrentPageYjsDataUtils => {
   const swrResponse = useSWRStatic<CurrentPageYjsData, Error>('currentPageYjsData', undefined);
 
-  const updateHasRevisionBodyDiff = useCallback((hasRevisionBodyDiff: boolean) => {
-    swrResponse.mutate({ ...swrResponse.data, hasRevisionBodyDiff });
+  const updateHasYdocsNewerThanLatestRevision = useCallback((hasYdocsNewerThanLatestRevision: boolean) => {
+    swrResponse.mutate({ ...swrResponse.data, hasYdocsNewerThanLatestRevision });
   }, [swrResponse]);
 
   const updateAwarenessStateSize = useCallback((awarenessStateSize: number) => {
     swrResponse.mutate({ ...swrResponse.data, awarenessStateSize });
   }, [swrResponse]);
 
-  return {
-    ...swrResponse, updateHasRevisionBodyDiff, updateAwarenessStateSize,
-  };
+  return Object.assign(swrResponse, { updateHasYdocsNewerThanLatestRevision, updateAwarenessStateSize });
 };
 
 export const useSWRMUTxCurrentPageYjsData = (): SWRMutationResponse<CurrentPageYjsData, Error> => {

+ 1 - 1
packages/pluginkit/src/model/growi-plugin-package-data.ts

@@ -1,4 +1,4 @@
-import { GrowiPluginType } from '@growi/core';
+import type { GrowiPluginType } from '@growi/core';
 
 export type GrowiPluginDirective = {
   [key: string]: any,