| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- import { Server } from 'socket.io';
- import { MongodbPersistence } from 'y-mongodb-provider';
- import { YSocketIO } from 'y-socket.io/dist/server';
- import * as Y from 'yjs';
- import loggerFactory from '~/utils/logger';
- import { getMongoUri } from '../util/mongoose-utils';
- import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
- const expressSession = require('express-session');
- const passport = require('passport');
- const logger = loggerFactory('growi:service:socket-io');
/**
 * Serve socket.io for server-to-client messaging.
 *
 * Owns the socket.io `Server`, the `/admin` namespace and the YSocketIO
 * instance, and installs the session / authorization / connection-limit
 * middlewares and the per-connection event handlers.
 */
class SocketIoService {

  /**
   * @param {object} crowi Application context. This class reads
   *   `crowi.configManager` and `crowi.sessionConfig` from it.
   */
  constructor(crowi) {
    this.crowi = crowi;
    this.configManager = crowi.configManager;

    // ids of the currently connected sockets that have no authenticated user
    this.guestClients = new Set();
  }

  /**
   * @returns {boolean} true once attachServer() has created the socket.io server
   */
  get isInitialized() {
    return (this.io != null);
  }

  /**
   * Create the socket.io server, attach it to the given http server and
   * install every middleware and event handler.
   *
   * Since the order is important, attachServer() should be async.
   *
   * @param {object} server The http server passed to `Server#attach`
   */
  async attachServer(server) {
    this.io = new Server({
      transports: ['websocket'],
      serveClient: false,
    });
    this.io.attach(server);

    // create the YSocketIO instance
    this.ysocketio = new YSocketIO(this.io);
    this.ysocketio.initialize();

    // create namespace for admin
    this.adminNamespace = this.io.of('/admin');

    // setup middlewares
    // !!CAUTION!! -- ORDER IS IMPORTANT
    await this.setupSessionMiddleware();
    await this.setupLoginRequiredMiddleware();
    await this.setupAdminRequiredMiddleware();
    await this.setupCheckConnectionLimitsMiddleware();

    await this.setupStoreGuestIdEventHandler();
    await this.setupLoginedUserRoomsJoinOnConnection();
    await this.setupDefaultSocketJoinRoomsEventHandler();
    await this.setupYjsConnection();
  }

  /**
   * @returns {object} the default ('/') namespace of the socket.io server
   * @throws {Error} when attachServer() has not been called yet
   */
  getDefaultSocket() {
    if (this.io == null) {
      throw new Error('Http server has not attached yet.');
    }
    return this.io.sockets;
  }

  /**
   * @returns {object} the '/admin' namespace of the socket.io server
   * @throws {Error} when attachServer() has not been called yet
   */
  getAdminSocket() {
    if (this.io == null) {
      throw new Error('Http server has not attached yet.');
    }
    return this.adminNamespace;
  }

  /**
   * use passport session
   * @see https://socket.io/docs/v4/middlewares/#Compatibility-with-Express-middleware
   */
  setupSessionMiddleware() {
    // adapt a Connect/Express middleware to the socket.io middleware signature
    const wrap = middleware => (socket, next) => middleware(socket.request, {}, next);

    this.io.use(wrap(expressSession(this.crowi.sessionConfig)));
    this.io.use(wrap(passport.initialize()));
    this.io.use(wrap(passport.session()));

    // express and passport session on the main socket are not shared with child namespace sockets,
    // so the session needs to be defined for the specific namespace as well
    this.getAdminSocket().use(wrap(expressSession(this.crowi.sessionConfig)));
    this.getAdminSocket().use(wrap(passport.initialize()));
    this.getAdminSocket().use(wrap(passport.session()));
  }

  /**
   * use loginRequired middleware
   */
  setupLoginRequiredMiddleware() {
    const loginRequired = require('../middlewares/login-required')(this.crowi, true, (req, res, next) => {
      next(new Error('Login is required to connect.'));
    });

    // convert Connect/Express middleware to Socket.io middleware
    this.io.use((socket, next) => {
      loginRequired(socket.request, {}, next);
    });
  }

  /**
   * use adminRequired middleware
   */
  setupAdminRequiredMiddleware() {
    const adminRequired = require('../middlewares/admin-required')(this.crowi, (req, res, next) => {
      next(new Error('Admin priviledge is required to connect.'));
    });

    // convert Connect/Express middleware to Socket.io middleware
    this.getAdminSocket().use((socket, next) => {
      adminRequired(socket.request, {}, next);
    });
  }

  /**
   * use checkConnectionLimits middleware
   */
  setupCheckConnectionLimitsMiddleware() {
    this.getAdminSocket().use(this.checkConnectionLimitsForAdmin.bind(this));
    this.getDefaultSocket().use(this.checkConnectionLimitsForGuest.bind(this));
    this.getDefaultSocket().use(this.checkConnectionLimits.bind(this));
  }

  /**
   * Track the socket ids of guest (unauthenticated) connections in
   * `this.guestClients` and forget them again on disconnect.
   */
  setupStoreGuestIdEventHandler() {
    this.io.on('connection', (socket) => {
      if (socket.request.user == null) {
        this.guestClients.add(socket.id);

        socket.on('disconnect', () => {
          this.guestClients.delete(socket.id);
        });
      }
    });
  }

  /**
   * Make every authenticated connection join the room dedicated to its user.
   */
  setupLoginedUserRoomsJoinOnConnection() {
    this.io.on('connection', (socket) => {
      const user = socket.request.user;
      if (user == null) {
        logger.debug('Socket io: An anonymous user has connected');
        return;
      }

      socket.join(getRoomNameWithId(RoomPrefix.USER, user._id));
    });
  }

  /**
   * Register the 'join:page' handler that lets a client join the room of a page.
   */
  setupDefaultSocketJoinRoomsEventHandler() {
    this.io.on('connection', (socket) => {
      // set event handlers for joining rooms
      socket.on('join:page', (payload) => {
        // the payload is client-controlled: ignore it when it carries no page id
        const pageId = payload != null ? payload.pageId : undefined;
        if (pageId == null) {
          return;
        }
        socket.join(getRoomNameWithId(RoomPrefix.PAGE, pageId));
      });
    });
  }

  /**
   * Register the 'sync:ydoc' handler that synchronizes the in-memory Yjs
   * document of a page with its persisted copy in MongoDB.
   */
  setupYjsConnection() {
    // TODO: move to packages/editor
    // https://redmine.weseek.co.jp/issues/130773
    const mdb = new MongodbPersistence(getMongoUri(), {
      collectionName: 'yjs-writings',
      flushSize: 100,
    });

    // In-memory documents that already have the persistence listeners attached.
    // The same document is shared by every client editing a page, so without this guard
    // each 'sync:ydoc' event would add yet another pair of listeners.
    const docsWithListeners = new WeakSet();

    // store an update without ever leaving a rejected promise unhandled
    const persistUpdate = async(pageId, update) => {
      try {
        await mdb.storeUpdate(pageId, update);
      } catch (err) {
        logger.error('Failed to store the ydoc update', err);
      }
    };

    this.io.on('connection', (socket) => {
      socket.on('sync:ydoc', async(payload) => {
        // the payload is client-controlled: tolerate a missing one
        const { pageId, initialValue } = payload != null ? payload : {};
        if (pageId == null) {
          logger.debug('pageId is not specified for sync:ydoc');
          return;
        }

        let persistedYdoc;
        try {
          // get persistent Ydoc data from DB
          persistedYdoc = await mdb.getYDoc(pageId);
          const persistedStateVector = Y.encodeStateVector(persistedYdoc);

          // cleanup document
          await mdb.flushDocument(pageId);

          // get current Ydoc
          const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);

          // TODO: add error handling
          // https://redmine.weseek.co.jp/issues/130773
          if (currentYdoc == null) {
            logger.debug('currentYdoc is undefined');
            return;
          }

          const persistedCodeMirrorText = persistedYdoc.getText('codemirror').toString();
          const currentText = currentYdoc.getText('codemirror');

          // seed a brand-new document with the initial value;
          // the text has to be inserted into the shared Y.Text, a Y.Doc itself has no insert()
          if (persistedCodeMirrorText === '' && currentText.toString() === '' && initialValue != null) {
            currentText.insert(0, initialValue);
          }

          // store the new data in db (if there is any: empty update is an array of 0s)
          const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
          if (diff.some(byte => byte > 0)) {
            persistUpdate(pageId, diff);
          }

          // send persisted data to the client
          Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));

          if (!docsWithListeners.has(currentYdoc)) {
            docsWithListeners.add(currentYdoc);

            // persistent data is also updated when ydoc is updated
            currentYdoc.on('update', (update) => {
              persistUpdate(pageId, update);
            });

            // cleanup document when ydoc is destroyed
            currentYdoc.on('destroy', async() => {
              try {
                await mdb.flushDocument(pageId);
              } catch (err) {
                logger.error('Failed to flush the ydoc', err);
              }
            });
          }
        } catch (err) {
          logger.error('Failed to sync the ydoc', err);
        } finally {
          // Delete old persistent data (also when returning early or failing)
          if (persistedYdoc != null) {
            persistedYdoc.destroy();
          }
        }
      });
    });
  }

  /**
   * Socket.io middleware refusing a connection to '/admin' when the number of
   * connected admin clients has reached the configured limit.
   *
   * @param {object} socket The connecting socket
   * @param {Function} next Called with no argument to accept, with an Error to refuse
   */
  async checkConnectionLimitsForAdmin(socket, next) {
    const namespaceName = socket.nsp.name;

    if (namespaceName === '/admin') {
      let clientsCount;
      try {
        // allSockets() resolves to a Set of socket ids
        const clients = await this.getAdminSocket().allSockets();
        clientsCount = clients.size;
      } catch (err) {
        next(err);
        return;
      }

      logger.debug('Current count of clients for \'/admin\':', clientsCount);

      const limit = this.configManager.getConfig('crowi', 's2cMessagingPubsub:connectionsLimitForAdmin');
      if (limit <= clientsCount) {
        const msg = `The connection was refused because the current count of clients for '/admin' is ${clientsCount} and exceeds the limit`;
        logger.warn(msg);
        next(new Error(msg));
        return;
      }
    }

    next();
  }

  /**
   * Socket.io middleware refusing a guest connection when the number of
   * connected guests has reached the configured limit.
   *
   * @param {object} socket The connecting socket
   * @param {Function} next Called with no argument to accept, with an Error to refuse
   */
  async checkConnectionLimitsForGuest(socket, next) {
    if (socket.request.user == null) {
      const clientsCount = this.guestClients.size;

      logger.debug('Current count of clients for guests:', clientsCount);

      const limit = this.configManager.getConfig('crowi', 's2cMessagingPubsub:connectionsLimitForGuest');
      if (limit <= clientsCount) {
        const msg = `The connection was refused because the current count of clients for guests is ${clientsCount} and exceeds the limit`;
        logger.warn(msg);
        next(new Error(msg));
        return;
      }
    }

    next();
  }

  /**
   * Socket.io middleware refusing a connection to '/' when the number of
   * connected clients has reached the configured limit.
   *
   * @see https://socket.io/docs/server-api/#socket-client
   * @param {object} socket The connecting socket
   * @param {Function} next Called with no argument to accept, with an Error to refuse
   */
  async checkConnectionLimits(socket, next) {
    // exclude admin
    const namespaceName = socket.nsp.name;
    if (namespaceName === '/admin') {
      next();
      // must stop here, otherwise next() would be called a second time below
      return;
    }

    let clientsCount;
    try {
      // allSockets() resolves to a Set of socket ids
      const clients = await this.getDefaultSocket().allSockets();
      clientsCount = clients.size;
    } catch (err) {
      next(err);
      return;
    }

    logger.debug('Current count of clients for \'/\':', clientsCount);

    const limit = this.configManager.getConfig('crowi', 's2cMessagingPubsub:connectionsLimit');
    if (limit <= clientsCount) {
      const msg = `The connection was refused because the current count of clients for '/' is ${clientsCount} and exceeds the limit`;
      logger.warn(msg);
      next(new Error(msg));
      return;
    }

    next();
  }

}
// Expose the SocketIoService class as this module's CommonJS export.
- module.exports = SocketIoService;
|