Explorar o código

add setup yjs connection

ryoji-s %!s(int64=2) %!d(string=hai) anos
pai
achega
3964028d70
Modificáronse 1 ficheiros con 68 adicións e 0 borrados
  1. 68 0
      apps/app/src/server/service/socket-io.js

+ 68 - 0
apps/app/src/server/service/socket-io.js

@@ -1,6 +1,11 @@
 import { Server } from 'socket.io';
+import { MongodbPersistence } from 'y-mongodb-provider';
+import { YSocketIO } from 'y-socket.io/dist/server';
+import * as Y from 'yjs';
 
 import loggerFactory from '~/utils/logger';
+
+import { getMongoUri } from '../util/mongoose-utils';
 import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
 
 const expressSession = require('express-session');
@@ -33,6 +38,10 @@ class SocketIoService {
     });
     this.io.attach(server);
 
+    // create the YScoketIO instance
+    this.ysocketio = new YSocketIO(this.io);
+    this.ysocketio.initialize();
+
     // create namespace for admin
     this.adminNamespace = this.io.of('/admin');
 
@@ -47,6 +56,7 @@ class SocketIoService {
 
     await this.setupLoginedUserRoomsJoinOnConnection();
     await this.setupDefaultSocketJoinRoomsEventHandler();
+    await this.setupYjsConnection();
   }
 
   getDefaultSocket() {
@@ -151,6 +161,64 @@ class SocketIoService {
     });
   }
 
+  setupYjsConnection() {
+    // TODO: move to packages/editor
+    // https://redmine.weseek.co.jp/issues/130773
+    const mdb = new MongodbPersistence(getMongoUri(), {
+      collectionName: 'yjs-writings',
+      flushSize: 100,
+    });
+
+    this.io.on('connection', (socket) => {
+      socket.on('sync:ydoc', async({ pageId, initialValue }) => {
+        // get persistent Ydoc data from DB
+        const persistedYdoc = await mdb.getYDoc(pageId);
+        const persistedStateVector = Y.encodeStateVector(persistedYdoc);
+
+        // cleanup document
+        await mdb.flushDocument(pageId);
+
+        // get current Ydoc
+        const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
+
+        // TODO: add error handling
+        // https://redmine.weseek.co.jp/issues/130773
+        if (currentYdoc == null) {
+          logger.debug('currentYdoc is undefined');
+          return;
+        }
+
+        const persistedCodeMirrorText = persistedYdoc.getText('codemirror').toString();
+        const currentCodeMirrorText = currentYdoc.getText('codemirror').toString();
+        if (persistedCodeMirrorText === '' && currentCodeMirrorText === '') {
+          currentYdoc.insert(0, initialValue);
+        }
+
+        // store the new data in db (if there is any: empty update is an array of 0s)
+        const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
+        if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
+          mdb.storeUpdate(pageId, diff);
+        }
+
+        // send persisted data to the client
+        Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
+
+        // persistent data is also updated when ydoc is updated
+        currentYdoc.on('update', async(update, origin, doc, tr) => {
+          mdb.storeUpdate(pageId, update);
+        });
+
+        // cleanup document when ydoc is destroyed
+        currentYdoc.on('destroy', async(doc) => {
+          await mdb.flushDocument(pageId);
+        });
+
+        // Delete old persistent data
+        persistedYdoc.destroy();
+      });
+    });
+  }
+
   async checkConnectionLimitsForAdmin(socket, next) {
     const namespaceName = socket.nsp.name;