socket-io.js 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. import { Server } from 'socket.io';
  2. import { MongodbPersistence } from 'y-mongodb-provider';
  3. import { YSocketIO } from 'y-socket.io/dist/server';
  4. import * as Y from 'yjs';
  5. import loggerFactory from '~/utils/logger';
  6. import { getMongoUri } from '../util/mongoose-utils';
  7. import { RoomPrefix, getRoomNameWithId } from '../util/socket-io-helpers';
  8. const expressSession = require('express-session');
  9. const passport = require('passport');
  10. const logger = loggerFactory('growi:service:socket-io');
  11. /**
  12. * Serve socket.io for server-to-client messaging
  13. */
  14. class SocketIoService {
  15. constructor(crowi) {
  16. this.crowi = crowi;
  17. this.configManager = crowi.configManager;
  18. this.guestClients = new Set();
  19. }
  20. get isInitialized() {
  21. return (this.io != null);
  22. }
  23. // Since the Order is important, attachServer() should be async
  24. async attachServer(server) {
  25. this.io = new Server({
  26. transports: ['websocket'],
  27. serveClient: false,
  28. });
  29. this.io.attach(server);
  30. // create the YScoketIO instance
  31. this.ysocketio = new YSocketIO(this.io);
  32. this.ysocketio.initialize();
  33. // create namespace for admin
  34. this.adminNamespace = this.io.of('/admin');
  35. // setup middlewares
  36. // !!CAUTION!! -- ORDER IS IMPORTANT
  37. await this.setupSessionMiddleware();
  38. await this.setupLoginRequiredMiddleware();
  39. await this.setupAdminRequiredMiddleware();
  40. await this.setupCheckConnectionLimitsMiddleware();
  41. await this.setupStoreGuestIdEventHandler();
  42. await this.setupLoginedUserRoomsJoinOnConnection();
  43. await this.setupDefaultSocketJoinRoomsEventHandler();
  44. await this.setupYjsConnection();
  45. }
  46. getDefaultSocket() {
  47. if (this.io == null) {
  48. throw new Error('Http server has not attached yet.');
  49. }
  50. return this.io.sockets;
  51. }
  52. getAdminSocket() {
  53. if (this.io == null) {
  54. throw new Error('Http server has not attached yet.');
  55. }
  56. return this.adminNamespace;
  57. }
  58. /**
  59. * use passport session
  60. * @see https://socket.io/docs/v4/middlewares/#Compatibility-with-Express-middleware
  61. */
  62. setupSessionMiddleware() {
  63. const wrap = middleware => (socket, next) => middleware(socket.request, {}, next);
  64. this.io.use(wrap(expressSession(this.crowi.sessionConfig)));
  65. this.io.use(wrap(passport.initialize()));
  66. this.io.use(wrap(passport.session()));
  67. // express and passport session on main socket doesn't shared to child namespace socket
  68. // need to define the session for specific namespace
  69. this.getAdminSocket().use(wrap(expressSession(this.crowi.sessionConfig)));
  70. this.getAdminSocket().use(wrap(passport.initialize()));
  71. this.getAdminSocket().use(wrap(passport.session()));
  72. }
  73. /**
  74. * use loginRequired middleware
  75. */
  76. setupLoginRequiredMiddleware() {
  77. const loginRequired = require('../middlewares/login-required')(this.crowi, true, (req, res, next) => {
  78. next(new Error('Login is required to connect.'));
  79. });
  80. // convert Connect/Express middleware to Socket.io middleware
  81. this.io.use((socket, next) => {
  82. loginRequired(socket.request, {}, next);
  83. });
  84. }
  85. /**
  86. * use adminRequired middleware
  87. */
  88. setupAdminRequiredMiddleware() {
  89. const adminRequired = require('../middlewares/admin-required')(this.crowi, (req, res, next) => {
  90. next(new Error('Admin priviledge is required to connect.'));
  91. });
  92. // convert Connect/Express middleware to Socket.io middleware
  93. this.getAdminSocket().use((socket, next) => {
  94. adminRequired(socket.request, {}, next);
  95. });
  96. }
  97. /**
  98. * use checkConnectionLimits middleware
  99. */
  100. setupCheckConnectionLimitsMiddleware() {
  101. this.getAdminSocket().use(this.checkConnectionLimitsForAdmin.bind(this));
  102. this.getDefaultSocket().use(this.checkConnectionLimitsForGuest.bind(this));
  103. this.getDefaultSocket().use(this.checkConnectionLimits.bind(this));
  104. }
  105. setupStoreGuestIdEventHandler() {
  106. this.io.on('connection', (socket) => {
  107. if (socket.request.user == null) {
  108. this.guestClients.add(socket.id);
  109. socket.on('disconnect', () => {
  110. this.guestClients.delete(socket.id);
  111. });
  112. }
  113. });
  114. }
  115. setupLoginedUserRoomsJoinOnConnection() {
  116. this.io.on('connection', (socket) => {
  117. const user = socket.request.user;
  118. if (user == null) {
  119. logger.debug('Socket io: An anonymous user has connected');
  120. return;
  121. }
  122. socket.join(getRoomNameWithId(RoomPrefix.USER, user._id));
  123. });
  124. }
  125. setupDefaultSocketJoinRoomsEventHandler() {
  126. this.io.on('connection', (socket) => {
  127. // set event handlers for joining rooms
  128. socket.on('join:page', ({ pageId }) => {
  129. socket.join(getRoomNameWithId(RoomPrefix.PAGE, pageId));
  130. });
  131. });
  132. }
  133. setupYjsConnection() {
  134. // TODO: move to packages/editor
  135. // https://redmine.weseek.co.jp/issues/130773
  136. const mdb = new MongodbPersistence(getMongoUri(), {
  137. collectionName: 'yjs-writings',
  138. flushSize: 100,
  139. });
  140. this.io.on('connection', (socket) => {
  141. socket.on('sync:ydoc', async({ pageId, initialValue }) => {
  142. // get persistent Ydoc data from DB
  143. const persistedYdoc = await mdb.getYDoc(pageId);
  144. const persistedStateVector = Y.encodeStateVector(persistedYdoc);
  145. // cleanup document
  146. await mdb.flushDocument(pageId);
  147. // get current Ydoc
  148. const currentYdoc = this.ysocketio.documents.get(`yjs/${pageId}`);
  149. // TODO: add error handling
  150. // https://redmine.weseek.co.jp/issues/130773
  151. if (currentYdoc == null) {
  152. logger.debug('currentYdoc is undefined');
  153. return;
  154. }
  155. const persistedCodeMirrorText = persistedYdoc.getText('codemirror').toString();
  156. const currentCodeMirrorText = currentYdoc.getText('codemirror').toString();
  157. if (persistedCodeMirrorText === '' && currentCodeMirrorText === '') {
  158. currentYdoc.insert(0, initialValue);
  159. }
  160. // store the new data in db (if there is any: empty update is an array of 0s)
  161. const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
  162. if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
  163. mdb.storeUpdate(pageId, diff);
  164. }
  165. // send persisted data to the client
  166. Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
  167. // persistent data is also updated when ydoc is updated
  168. currentYdoc.on('update', async(update, origin, doc, tr) => {
  169. mdb.storeUpdate(pageId, update);
  170. });
  171. // cleanup document when ydoc is destroyed
  172. currentYdoc.on('destroy', async(doc) => {
  173. await mdb.flushDocument(pageId);
  174. });
  175. // Delete old persistent data
  176. persistedYdoc.destroy();
  177. });
  178. });
  179. }
  180. async checkConnectionLimitsForAdmin(socket, next) {
  181. const namespaceName = socket.nsp.name;
  182. if (namespaceName === '/admin') {
  183. const clients = await this.getAdminSocket().allSockets();
  184. const clientsCount = clients.length;
  185. logger.debug('Current count of clients for \'/admin\':', clientsCount);
  186. const limit = this.configManager.getConfig('crowi', 's2cMessagingPubsub:connectionsLimitForAdmin');
  187. if (limit <= clientsCount) {
  188. const msg = `The connection was refused because the current count of clients for '/admin' is ${clientsCount} and exceeds the limit`;
  189. logger.warn(msg);
  190. next(new Error(msg));
  191. return;
  192. }
  193. }
  194. next();
  195. }
  196. async checkConnectionLimitsForGuest(socket, next) {
  197. if (socket.request.user == null) {
  198. const clientsCount = this.guestClients.size;
  199. logger.debug('Current count of clients for guests:', clientsCount);
  200. const limit = this.configManager.getConfig('crowi', 's2cMessagingPubsub:connectionsLimitForGuest');
  201. if (limit <= clientsCount) {
  202. const msg = `The connection was refused because the current count of clients for guests is ${clientsCount} and exceeds the limit`;
  203. logger.warn(msg);
  204. next(new Error(msg));
  205. return;
  206. }
  207. }
  208. next();
  209. }
  210. /**
  211. * @see https://socket.io/docs/server-api/#socket-client
  212. */
  213. async checkConnectionLimits(socket, next) {
  214. // exclude admin
  215. const namespaceName = socket.nsp.name;
  216. if (namespaceName === '/admin') {
  217. next();
  218. }
  219. const clients = await this.getDefaultSocket().allSockets();
  220. const clientsCount = clients.length;
  221. logger.debug('Current count of clients for \'/\':', clientsCount);
  222. const limit = this.configManager.getConfig('crowi', 's2cMessagingPubsub:connectionsLimit');
  223. if (limit <= clientsCount) {
  224. const msg = `The connection was refused because the current count of clients for '/' is ${clientsCount} and exceeds the limit`;
  225. logger.warn(msg);
  226. next(new Error(msg));
  227. return;
  228. }
  229. next();
  230. }
  231. }
  232. module.exports = SocketIoService;