Yuki Takei 1 год назад
Родитель
Сommit
1c6e558531

+ 1 - 0
apps/app/config/logger/config.dev.js

@@ -17,6 +17,7 @@ module.exports = {
   'growi:middleware:safe-redirect': 'debug',
   'growi:service:PassportService': 'debug',
   'growi:service:s2s-messaging:*': 'debug',
+  'growi:service:yjs': 'debug',
   // 'growi:service:socket-io': 'debug',
   // 'growi:service:ConfigManager': 'debug',
   // 'growi:service:mail': 'debug',

+ 143 - 73
apps/app/src/server/service/yjs.ts

@@ -3,13 +3,14 @@ import { GlobalSocketEventName } from '@growi/core';
 import mongoose from 'mongoose';
 import type { Server } from 'socket.io';
 import { MongodbPersistence } from 'y-mongodb-provider';
-import type { Document } from 'y-socket.io/dist/server';
+import type { Document, Persistence } from 'y-socket.io/dist/server';
 import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
 import * as Y from 'yjs';
 
 import { SocketEventName } from '~/interfaces/websocket';
 import loggerFactory from '~/utils/logger';
 
+import { Revision } from '../models/revision';
 import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
 
 
@@ -20,19 +21,19 @@ const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
 const logger = loggerFactory('growi:service:yjs');
 
 
-export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
-  const result = ydocId.match(/yjs\/(.*)/);
-  return result?.[1];
-};
+// export const extractPageIdFromYdocId = (ydocId: string): string | undefined => {
+//   const result = ydocId.match(/yjs\/(.*)/);
+//   return result?.[1];
+// };
 
 export interface IYjsService {
   hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean>;
-  handleYDocSync(pageId: string, initialValue: string): Promise<void>;
+  // handleYDocSync(pageId: string, initialValue: string): Promise<void>;
   handleYDocUpdate(pageId: string, newValue: string): Promise<void>;
   getCurrentYdoc(pageId: string): Ydoc | undefined;
-  getPersistedYdoc(pageId: string): Promise<Y.Doc>;
 }
 
+
 class YjsService implements IYjsService {
 
   private ysocketio: YSocketIO;
@@ -40,11 +41,8 @@ class YjsService implements IYjsService {
   private mdb: MongodbPersistence;
 
   constructor(io: Server) {
-    const ysocketio = new YSocketIO(io);
-    ysocketio.initialize();
-    this.ysocketio = ysocketio;
 
-    this.mdb = new MongodbPersistence(
+    const mdb = new MongodbPersistence(
       // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
       // eslint-disable-next-line @typescript-eslint/ban-ts-comment
       // @ts-ignore
@@ -60,42 +58,118 @@ class YjsService implements IYjsService {
         flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
       },
     );
+    this.mdb = mdb;
 
-    this.createIndexes();
+    const ysocketio = new YSocketIO(io);
+    this.injectPersistence(ysocketio, mdb);
+    ysocketio.initialize();
+    this.ysocketio = ysocketio;
 
-    io.on('connection', (socket) => {
+    this.createIndexes();
 
-      ysocketio.on('awareness-update', async(doc: Document) => {
-        const pageId = extractPageIdFromYdocId(doc.name);
+    ysocketio.on('document-loaded', async(doc: Document) => {
+      // const pageId = extractPageIdFromYdocId(doc.name);
+      const pageId = doc.name;
 
-        if (pageId == null) return;
+      if (pageId != null && !await this.hasYdocsNewerThanLatestRevision(pageId)) {
+        logger.debug(`YDoc for the page ('${pageId}') is initialized by the latest revision body`);
 
-        const awarenessStateSize = doc.awareness.states.size;
+        const revision = await Revision.findOne({ pageId });
+        if (revision?.body != null) {
+          doc.getText('codemirror').insert(0, revision.body);
+        }
+      }
+    });
 
-        // Triggered when awareness changes
-        io
-          .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
-          .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
+    // io.on('connection', (socket) => {
+
+    //   ysocketio.on('awareness-update', async(doc: Document) => {
+    //     const pageId = extractPageIdFromYdocId(doc.name);
+
+    //     if (pageId == null) return;
+
+    //     const awarenessStateSize = doc.awareness.states.size;
+
+    //     // Triggered when awareness changes
+    //     io
+    //       .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
+    //       .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
+
+    //     // Triggered when the last user leaves the editor
+    //     if (awarenessStateSize === 0) {
+    //       const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
+    //       io
+    //         .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
+    //         .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
+    //     }
+    //   });
+
+    //   socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
+    //     try {
+    //       await this.handleYDocSync(pageId, initialValue);
+    //     }
+    //     catch (error) {
+    //       logger.warn(error.message);
+    //       socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
+    //     }
+    //   });
+    // });
+  }
 
-        // Triggered when the last user leaves the editor
-        if (awarenessStateSize === 0) {
-          const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
-          io
-            .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
-            .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
+  private injectPersistence(ysocketio: YSocketIO, mdb: MongodbPersistence): void {
+    const persistece: Persistence = {
+      provider: mdb,
+      bindState: async(docName, ydoc) => {
+        logger.debug('bindState', { docName });
+
+        const persistedYdoc = await mdb.getYDoc(docName);
+        // get the state vector so we can just store the diffs between client and server
+        const persistedStateVector = Y.encodeStateVector(persistedYdoc);
+
+        /* we could also retrieve that sv with a mdb function
+         *  however this takes longer;
+         *  it would also flush the document (which merges all updates into one)
+         *   thats prob a good thing, which is why we always do this on document close (see writeState)
+         */
+        // const persistedStateVector = await mdb.getStateVector(docName);
+
+        // in the default code the following value gets saved in the db
+        //  this however leads to the case that multiple complete Y.Docs are saved in the db (https://github.com/fadiquader/y-mongodb/issues/7)
+        // const newUpdates = Y.encodeStateAsUpdate(ydoc);
+
+        // better just get the differences and save those:
+        const diff = Y.encodeStateAsUpdate(ydoc, persistedStateVector);
+
+        // store the new data in db (if there is any: empty update is an array of 0s)
+        if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
+          mdb.storeUpdate(docName, diff);
+          mdb.setMeta(docName, 'updatedAt', Date.now());
         }
-      });
 
-      socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
-        try {
-          await this.handleYDocSync(pageId, initialValue);
-        }
-        catch (error) {
-          logger.warn(error.message);
-          socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
-        }
-      });
-    });
+        // send the persisted data to clients
+        Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
+
+        // store updates of the document in db
+        ydoc.on('update', async(update) => {
+          mdb.storeUpdate(docName, update);
+          mdb.setMeta(docName, 'updatedAt', Date.now());
+        });
+
+        // cleanup some memory
+        persistedYdoc.destroy();
+      },
+      writeState: async(docName) => {
+        logger.debug('writeState', { docName });
+        // This is called when all connections to the document are closed.
+
+        // flush document on close to have the smallest possible database
+        await mdb.flushDocument(docName);
+      },
+    };
+
+    // foce set to private property
+    // eslint-disable-next-line dot-notation
+    ysocketio['persistence'] = persistece;
   }
 
   private async createIndexes(): Promise<void> {
@@ -138,7 +212,6 @@ class YjsService implements IYjsService {
 
   public async hasYdocsNewerThanLatestRevision(pageId: string): Promise<boolean> {
     // get the latest revision createdAt
-    const Revision = mongoose.model<IRevisionHasId>('Revision');
     const result = await Revision
       .findOne(
         // filter
@@ -155,48 +228,50 @@ class YjsService implements IYjsService {
     // count yjs-writings documents with updatedAt > latestRevision.updatedAt
     const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');
 
+    logger.debug('hasYdocsNewerThanLatestRevision', { pageId, lastRevisionCreatedAt, ydocUpdatedAt });
+
     return ydocUpdatedAt == null
       ? false
       : ydocUpdatedAt > lastRevisionCreatedAt;
   }
 
-  public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
-    const currentYdoc = this.getCurrentYdoc(pageId);
-    if (currentYdoc == null) {
-      return;
-    }
+  // public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
+  //   const currentYdoc = this.getCurrentYdoc(pageId);
+  //   if (currentYdoc == null) {
+  //     return;
+  //   }
 
-    const persistedYdoc = await this.getPersistedYdoc(pageId);
-    const persistedStateVector = Y.encodeStateVector(persistedYdoc);
+  //   const persistedYdoc = await this.getPersistedYdoc(pageId);
+  //   const persistedStateVector = Y.encodeStateVector(persistedYdoc);
 
-    await this.mdb.flushDocument(pageId);
+  //   await this.mdb.flushDocument(pageId);
 
-    // If no write operation has been performed, insert initial value
-    const clientsSize = persistedYdoc.store.clients.size;
-    if (clientsSize === 0) {
-      currentYdoc.getText('codemirror').insert(0, initialValue);
-    }
+  //   // If no write operation has been performed, insert initial value
+  //   const clientsSize = persistedYdoc.store.clients.size;
+  //   if (clientsSize === 0) {
+  //     currentYdoc.getText('codemirror').insert(0, initialValue);
+  //   }
 
-    const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
+  //   const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
 
-    if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
-      this.mdb.storeUpdate(pageId, diff);
-      this.mdb.setMeta(pageId, 'updatedAt', Date.now());
-    }
+  //   if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
+  //     this.mdb.storeUpdate(pageId, diff);
+  //     this.mdb.setMeta(pageId, 'updatedAt', Date.now());
+  //   }
 
-    Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
+  //   Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
 
-    currentYdoc.on('update', async(update) => {
-      this.mdb.storeUpdate(pageId, update);
-      this.mdb.setMeta(pageId, 'updatedAt', Date.now());
-    });
+  //   currentYdoc.on('update', async(update) => {
+  //     this.mdb.storeUpdate(pageId, update);
+  //     this.mdb.setMeta(pageId, 'updatedAt', Date.now());
+  //   });
 
-    currentYdoc.on('destroy', async() => {
-      this.mdb.flushDocument(pageId);
-    });
+  //   currentYdoc.on('destroy', async() => {
+  //     this.mdb.flushDocument(pageId);
+  //   });
 
-    persistedYdoc.destroy();
-  }
+  //   persistedYdoc.destroy();
+  // }
 
   public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
     // TODO: https://redmine.weseek.co.jp/issues/132775
@@ -213,15 +288,10 @@ class YjsService implements IYjsService {
   }
 
   public getCurrentYdoc(pageId: string): Ydoc | undefined {
-    const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
+    const currentYdoc = this.ysocketio.documents.get(pageId);
     return currentYdoc;
   }
 
-  public async getPersistedYdoc(pageId: string): Promise<Y.Doc> {
-    const persistedYdoc = await this.mdb.getYDoc(pageId);
-    return persistedYdoc;
-  }
-
 }
 
 let _instance: YjsService;

+ 3 - 3
packages/editor/src/client/stores/use-collaborative-editor-mode.ts

@@ -70,13 +70,13 @@ export const useCollaborativeEditorMode = (
 
   // Setup provider
   useEffect(() => {
-    if (provider != null || ydoc == null || socket == null || onEditorsUpdated == null) {
+    if (provider != null || pageId == null || ydoc == null || socket == null || onEditorsUpdated == null) {
       return;
     }
 
     const socketIOProvider = new SocketIOProvider(
-      GLOBAL_SOCKET_NS,
-      `yjs/${pageId}`,
+      '/',
+      pageId,
       ydoc,
       { autoConnect: true },
     );