Kaynağa Gözat

create yjs connection manager

ryoji-s 2 yıl önce
ebeveyn
işleme
af353e6353

+ 11 - 50
apps/app/src/server/service/socket-io.js

@@ -1,13 +1,12 @@
 import { Server } from 'socket.io';
-import { MongodbPersistence } from 'y-mongodb-provider';
 import { YSocketIO } from 'y-socket.io/dist/server';
-import * as Y from 'yjs';
 
 import loggerFactory from '~/utils/logger';
 
-import { getMongoUri } from '../util/mongoose-utils';
 import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
 
+import YjsConnectionManager from './yjsConnectionManager';
+
 const expressSession = require('express-session');
 const passport = require('passport');
 
@@ -42,6 +41,9 @@ class SocketIoService {
     this.ysocketio = new YSocketIO(this.io);
     this.ysocketio.initialize();
 
+    // create the YjsConnectionManager instance
+    this.yjsConnectionManager = new YjsConnectionManager(this.ysocketio);
+
     // create namespace for admin
     this.adminNamespace = this.io.of('/admin');
 
@@ -162,56 +164,15 @@ class SocketIoService {
   }
 
   setupYjsConnection() {
-    // TODO: move to packages/editor
-    // https://redmine.weseek.co.jp/issues/130773
-    const mdb = new MongodbPersistence(getMongoUri(), {
-      collectionName: 'yjs-writings',
-      flushSize: 100,
-    });
-
     this.io.on('connection', (socket) => {
-      socket.on('sync:ydoc', async({ pageId, initialValue }) => {
-        // get persistent Ydoc data from DB
-        const persistedYdoc = await mdb.getYDoc(pageId);
-        const persistedStateVector = Y.encodeStateVector(persistedYdoc);
-
-        // cleanup document
-        await mdb.flushDocument(pageId);
-
-        // get current Ydoc
-        const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
-
-        if (currentYdoc == null) {
-          throw new Error(`currentYdoc for pageId ${pageId} is undefined.`);
-        }
-
-        const persistedCodeMirrorText = persistedYdoc.getText('codemirror').toString();
-        const currentCodeMirrorText = currentYdoc.getText('codemirror').toString();
-        if (persistedCodeMirrorText === '' && currentCodeMirrorText === '') {
-          currentYdoc.getText('codemirror').insert(0, initialValue);
+      socket.on('ydoc:sync', async({ pageId, initialValue }) => {
+        try {
+          await this.yjsConnectionManager.handleYDocSync(pageId, initialValue);
         }
-
-        // store the new data in db (if there is any: empty update is an array of 0s)
-        const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
-        if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
-          mdb.storeUpdate(pageId, diff);
+        catch (error) {
+          logger.warn(error.message);
+          socket.emit('ydoc:sync:error', 'An error occurred during YDoc synchronization.');
         }
-
-        // send persisted data to the client
-        Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
-
-        // persistent data is also updated when ydoc is updated
-        currentYdoc.on('update', async(update, origin, doc, tr) => {
-          mdb.storeUpdate(pageId, update);
-        });
-
-        // cleanup document when ydoc is destroyed
-        currentYdoc.on('destroy', async(doc) => {
-          await mdb.flushDocument(pageId);
-        });
-
-        // Delete old persistent data
-        persistedYdoc.destroy();
       });
     });
   }

+ 70 - 0
apps/app/src/server/service/yjsConnectionManager.ts

@@ -0,0 +1,70 @@
+import { MongodbPersistence } from 'y-mongodb-provider';
+import { YSocketIO } from 'y-socket.io/dist/server';
+import * as Y from 'yjs';
+
+import { getMongoUri } from '../util/mongoose-utils';
+
+export const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
+export const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
+
+class YjsConnectionManager {
+
+  private ysocketio: YSocketIO;
+
+  private mdb: MongodbPersistence;
+
+  constructor(ysocketio: YSocketIO) {
+    this.ysocketio = ysocketio;
+    this.mdb = new MongodbPersistence(getMongoUri(), {
+      collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
+      flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
+    });
+
+    this.getCurrentYdoc = this.getCurrentYdoc.bind(this);
+  }
+
+  public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
+    const persistedYdoc = await this.mdb.getYDoc(pageId);
+    const persistedStateVector = Y.encodeStateVector(persistedYdoc);
+
+    await this.mdb.flushDocument(pageId);
+
+    const currentYdoc = this.getCurrentYdoc(pageId);
+
+    const persistedCodeMirrorText = persistedYdoc.getText('codemirror').toString();
+    const currentCodeMirrorText = currentYdoc.getText('codemirror').toString();
+
+    if (persistedCodeMirrorText === '' && currentCodeMirrorText === '') {
+      currentYdoc.getText('codemirror').insert(0, initialValue);
+    }
+
+    const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
+
+    if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
+      this.mdb.storeUpdate(pageId, diff);
+    }
+
+    Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
+
+    currentYdoc.on('update', async(update) => {
+      await this.mdb.storeUpdate(pageId, update);
+    });
+
+    currentYdoc.on('destroy', async() => {
+      await this.mdb.flushDocument(pageId);
+    });
+
+    persistedYdoc.destroy();
+  }
+
+  private getCurrentYdoc(pageId: string): Y.Doc {
+    const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
+    if (currentYdoc == null) {
+      throw new Error(`currentYdoc for pageId ${pageId} is undefined.`);
+    }
+    return currentYdoc;
+  }
+
+}
+
+export default YjsConnectionManager;