Просмотр исходного кода

WIP: use reconnecting-websocket

Yuki Takei 5 лет назад
Родитель
Сommit
47f9049d96
4 измененных файлов с 93 добавлено и 123 удалено
  1. 2 1
      package.json
  2. 2 0
      src/server/service/config-pubsub/base.js
  3. 79 50
      src/server/service/config-pubsub/nchan.js
  4. 10 72
      yarn.lock

+ 2 - 1
package.json

@@ -137,6 +137,7 @@
     "passport-twitter": "^1.0.4",
     "react-card-flip": "^1.0.10",
     "react-image-crop": "^8.3.0",
+    "reconnecting-websocket": "^4.4.0",
     "redis": "^3.0.2",
     "rimraf": "^3.0.0",
     "slack-node": "^0.1.8",
@@ -148,7 +149,7 @@
     "unzipper": "^0.10.5",
     "url-join": "^4.0.0",
     "validator": "^12.0.0",
-    "websocket": "^1.0.31",
+    "ws": "^7.3.1",
     "xss": "^1.0.6"
   },
   "devDependencies": {

+ 2 - 0
src/server/service/config-pubsub/base.js

@@ -11,6 +11,8 @@ class ConfigPubsubDelegator {
     if (uri == null) {
       throw new Error('uri must be set');
     }
+
+    this.handlableList = [];
   }
 
   shouldResubscribe() {

+ 79 - 50
src/server/service/config-pubsub/nchan.js

@@ -2,7 +2,9 @@ const logger = require('@alias/logger')('growi:service:config-pubsub:nchan');
 
 const path = require('path');
 const axios = require('axios');
-const WebSocketClient = require('websocket').client;
+// const WebSocketClient = require('websocket').client;
+const WebSocket = require('ws');
+const ReconnectingWebSocket = require('reconnecting-websocket');
 
 const ConfigPubsubMessage = require('../../models/vo/config-pubsub-message');
 const ConfigPubsubDelegator = require('./base');
@@ -17,52 +19,48 @@ class NchanDelegator extends ConfigPubsubDelegator {
     this.subscribePath = subscribePath;
 
     this.channelId = channelId;
-    this.isConnecting = false;
+    // this.isConnecting = false;
 
     /**
      * A list of ConfigPubsubHandler instance
      */
-    this.handlableList = [];
+    this.handlableToEventListenerMap = {};
 
-    this.client = null;
-    this.connection = null;
+    this.socket = null;
+    // this.connection = null;
   }
 
   /**
    * @inheritdoc
    */
   shouldResubscribe() {
-    if (this.connection != null && this.connection.connected) {
-      return false;
-    }
-
-    return !this.isConnecting;
+    return this.socket.readyState === ReconnectingWebSocket.CLOSED;
   }
 
   /**
    * @inheritdoc
    */
   subscribe(forceReconnect = false) {
-    if (forceReconnect) {
-      if (this.connection != null && this.connection.connected) {
-        this.connection.close();
-      }
-    }
+    // if (forceReconnect) {
+    //   if (this.connection != null && this.connection.connected) {
+    //     this.connection.close();
+    //   }
+    // }
 
-    if (this.client != null && this.shouldResubscribe()) {
+    if (this.socket != null && this.shouldResubscribe()) {
       logger.info('The connection to config pubsub server is offline. Try to reconnect...');
     }
 
     // init client
-    if (this.client == null) {
+    if (this.socket == null) {
       this.initClient();
     }
 
     // connect
-    this.isConnecting = true;
-    const url = this.constructUrl(this.subscribePath).toString();
-    logger.debug(`Subscribe to ${url}`);
-    this.client.connect(url.toString());
+    // this.isConnecting = true;
+    // const url = this.constructUrl(this.subscribePath).toString();
+    // logger.debug(`Subscribe to ${url}`);
+    // this.socket.reconnect();
   }
 
   /**
@@ -82,26 +80,42 @@ class NchanDelegator extends ConfigPubsubDelegator {
    * @inheritdoc
    */
   addMessageHandler(handlable) {
+    if (this.socket == null) {
+      logger.error('socket has not initialized yet.');
+      return;
+    }
+
     super.addMessageHandler(handlable);
-    this.registerMessageHandlerToConnection(handlable);
+    this.registerMessageHandlerToSocket(handlable);
   }
 
   /**
    * @inheritdoc
    */
   removeMessageHandler(handlable) {
+    if (this.socket == null) {
+      logger.error('socket has not initialized yet.');
+      return;
+    }
+
     super.removeMessageHandler(handlable);
-    this.subscribe(true);
-  }
 
-  registerMessageHandlerToConnection(handlable) {
-    if (this.connection != null) {
-      this.connection.on('message', (messageObj) => {
-        this.handleMessage(messageObj, handlable);
-      });
+    const eventListener = this.handlableToEventListenerMap[handlable];
+    if (eventListener != null) {
+      this.socket.removeEventListener('message', eventListener);
+      delete this.handlableToEventListenerMap[handlable];
     }
   }
 
+  registerMessageHandlerToSocket(handlable) {
+    const eventListener = (messageObj) => {
+      this.handleMessage(messageObj, handlable);
+    };
+
+    this.socket.addEventListener('message', eventListener);
+    this.handlableToEventListenerMap[handlable] = eventListener;
+  }
+
   constructUrl(basepath) {
     const pathname = this.channelId == null
       ? basepath //                                 /pubsub
@@ -111,32 +125,47 @@ class NchanDelegator extends ConfigPubsubDelegator {
   }
 
   initClient() {
-    const client = new WebSocketClient();
-
-    client.on('connectFailed', (error) => {
-      logger.warn(`Connect Error: ${error.toString()}`);
-      this.isConnecting = false;
+    // const client = new WebSocketClient();
+    const url = this.constructUrl(this.publishPath).toString();
+    const socket = new ReconnectingWebSocket(url, [], {
+      WebSocket,
     });
 
-    client.on('connect', (connection) => {
-      this.isConnecting = false;
-      this.connection = connection;
-
-      logger.info('WebSocket client connected');
-
-      connection.on('error', (error) => {
-        this.isConnecting = false;
-        logger.error(`Connection Error: ${error.toString()}`);
-      });
-      connection.on('close', () => {
-        logger.info('WebSocket connection closed');
-      });
+    // client.on('connectFailed', (error) => {
+    //   logger.warn(`Connect Error: ${error.toString()}`);
+    //   this.isConnecting = false;
+    // });
+    socket.addEventListener('close', () => {
+      logger.info('WebSocket client disconnected');
+    });
+    socket.addEventListener('error', (error) => {
+      logger.error('WebSocket error occured:', error.message);
+    });
 
-      // register all message handlers
-      this.handlableList.forEach(handler => this.registerMessageHandlerToConnection(handler));
+    // client.on('connect', (connection) => {
+    //   this.isConnecting = false;
+    //   this.connection = connection;
+
+    //   logger.info('WebSocket client connected');
+
+    //   connection.on('error', (error) => {
+    //     this.isConnecting = false;
+    //     logger.error(`Connection Error: ${error.toString()}`);
+    //   });
+    //   connection.on('close', () => {
+    //     logger.info('WebSocket connection closed');
+    //   });
+
+    //   // register all message handlers
+    //   this.handlableList.forEach(handler => this.registerMessageHandlerToConnection(handler));
+    // });
+    socket.addEventListener('open', () => {
+      logger.info('WebSocket client connected.');
     });
 
-    this.client = client;
+    this.handlableList.forEach(handlable => this.registerMessageHandlerToSocket(handlable));
+
+    this.socket = socket;
   }
 
   /**

+ 10 - 72
yarn.lock

@@ -4624,14 +4624,6 @@ cyclist@~0.2.2:
   version "0.2.2"
   resolved "https://registry.yarnpkg.com/cyclist/-/cyclist-0.2.2.tgz#1b33792e11e914a2fd6d6ed6447464444e5fa640"
 
-d@1, d@^1.0.1:
-  version "1.0.1"
-  resolved "https://registry.yarnpkg.com/d/-/d-1.0.1.tgz#8698095372d58dbee346ffd0c7093f99f8f9eb5a"
-  integrity sha512-m62ShEObQ39CfralilEQRjH6oAMtNCV1xJyEx5LpRYUVN+EviphDgUc/F3hnYbADmkiNs67Y+3ylmlG7Lnu+FA==
-  dependencies:
-    es5-ext "^0.10.50"
-    type "^1.0.1"
-
 dashdash@^1.12.0, dashdash@^1.14.0:
   version "1.14.1"
   resolved "https://registry.yarnpkg.com/dashdash/-/dashdash-1.14.1.tgz#853cfa0f7cbe2fed5de20326b8dd581035f6e2f0"
@@ -5408,24 +5400,6 @@ es-to-primitive@^1.2.0:
     is-date-object "^1.0.1"
     is-symbol "^1.0.2"
 
-es5-ext@^0.10.35, es5-ext@^0.10.50:
-  version "0.10.53"
-  resolved "https://registry.yarnpkg.com/es5-ext/-/es5-ext-0.10.53.tgz#93c5a3acfdbef275220ad72644ad02ee18368de1"
-  integrity sha512-Xs2Stw6NiNHWypzRTY1MtaG/uJlwCk8kH81920ma8mvN8Xq1gsfhZvpkImLQArw8AHnv8MT2I45J3c0R8slE+Q==
-  dependencies:
-    es6-iterator "~2.0.3"
-    es6-symbol "~3.1.3"
-    next-tick "~1.0.0"
-
-es6-iterator@~2.0.3:
-  version "2.0.3"
-  resolved "https://registry.yarnpkg.com/es6-iterator/-/es6-iterator-2.0.3.tgz#a7de889141a05a94b0854403b2d0a0fbfa98f3b7"
-  integrity sha1-p96IkUGgWpSwhUQDstCg+/qY87c=
-  dependencies:
-    d "1"
-    es5-ext "^0.10.35"
-    es6-symbol "^3.1.1"
-
 es6-object-assign@^1.1.0:
   version "1.1.0"
   resolved "https://registry.yarnpkg.com/es6-object-assign/-/es6-object-assign-1.1.0.tgz#c2c3582656247c39ea107cb1e6652b6f9f24523c"
@@ -5452,14 +5426,6 @@ es6-promisify@^5.0.0:
   dependencies:
     es6-promise "^4.0.3"
 
-es6-symbol@^3.1.1, es6-symbol@~3.1.3:
-  version "3.1.3"
-  resolved "https://registry.yarnpkg.com/es6-symbol/-/es6-symbol-3.1.3.tgz#bad5d3c1bcdac28269f4cb331e431c78ac705d18"
-  integrity sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA==
-  dependencies:
-    d "^1.0.1"
-    ext "^1.1.2"
-
 esa-nodejs@^0.0.7:
   version "0.0.7"
   resolved "https://registry.yarnpkg.com/esa-nodejs/-/esa-nodejs-0.0.7.tgz#c4749412605ad430d5da17aa4928291927561b42"
@@ -5939,13 +5905,6 @@ express@^4.16.3:
     utils-merge "1.0.1"
     vary "~1.1.2"
 
-ext@^1.1.2:
-  version "1.4.0"
-  resolved "https://registry.yarnpkg.com/ext/-/ext-1.4.0.tgz#89ae7a07158f79d35517882904324077e4379244"
-  integrity sha512-Key5NIsUxdqKg3vIsdw9dSuXpPCQ297y6wBjL30edxwPgt2E44WcWBZey/ZvUc6sERLTxKdyCu4gZFmUbk1Q7A==
-  dependencies:
-    type "^2.0.0"
-
 extend-shallow@^2.0.1:
   version "2.0.1"
   resolved "https://registry.yarnpkg.com/extend-shallow/-/extend-shallow-2.0.1.tgz#51af7d614ad9a9f610ea1bafbb989d6b1c56890f"
@@ -9792,11 +9751,6 @@ neo-async@^2.6.1:
   resolved "https://registry.yarnpkg.com/neo-async/-/neo-async-2.6.1.tgz#ac27ada66167fa8849a6addd837f6b189ad2081c"
   integrity sha512-iyam8fBuCUpWeKPGpaNMetEocMt364qkCsfL9JuhjXX6dRnguRVOfk2GZaDpPjcOKiiXCPINZC1GczQ7iTq3Zw==
 
-next-tick@~1.0.0:
-  version "1.0.0"
-  resolved "https://registry.yarnpkg.com/next-tick/-/next-tick-1.0.0.tgz#ca86d1fe8828169b0120208e3dc8424b9db8342c"
-  integrity sha1-yobR/ogoFpsBICCOPchCS524NCw=
-
 nice-try@^1.0.4:
   version "1.0.4"
   resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.4.tgz#d93962f6c52f2c1558c0fbda6d512819f1efe1c4"
@@ -12223,6 +12177,11 @@ realpath-native@^1.1.0:
   dependencies:
     util.promisify "^1.0.0"
 
+reconnecting-websocket@^4.4.0:
+  version "4.4.0"
+  resolved "https://registry.yarnpkg.com/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz#3b0e5b96ef119e78a03135865b8bb0af1b948783"
+  integrity sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng==
+
 redent@^1.0.0:
   version "1.0.0"
   resolved "https://registry.yarnpkg.com/redent/-/redent-1.0.0.tgz#cf916ab1fd5f1f16dfb20822dd6ec7f730c2afde"
@@ -14593,16 +14552,6 @@ type-is@~1.6.16:
     media-typer "0.3.0"
     mime-types "~2.1.18"
 
-type@^1.0.1:
-  version "1.2.0"
-  resolved "https://registry.yarnpkg.com/type/-/type-1.2.0.tgz#848dd7698dafa3e54a6c479e759c4bc3f18847a0"
-  integrity sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg==
-
-type@^2.0.0:
-  version "2.0.0"
-  resolved "https://registry.yarnpkg.com/type/-/type-2.0.0.tgz#5f16ff6ef2eb44f260494dae271033b29c09a9c3"
-  integrity sha512-KBt58xCHry4Cejnc2ISQAF7QY+ORngsWfxezO68+12hKV6lQY8P/psIkcbjeHWn7MqcgciWJyCCevFMJdIXpow==
-
 typed-styles@^0.0.7:
   version "0.0.7"
   resolved "https://registry.yarnpkg.com/typed-styles/-/typed-styles-0.0.7.tgz#93392a008794c4595119ff62dde6809dbc40a3d9"
@@ -15250,17 +15199,6 @@ webpack@^4.39.3:
     watchpack "^1.6.0"
     webpack-sources "^1.4.1"
 
-websocket@^1.0.31:
-  version "1.0.31"
-  resolved "https://registry.yarnpkg.com/websocket/-/websocket-1.0.31.tgz#e5d0f16c3340ed87670e489ecae6144c79358730"
-  integrity sha512-VAouplvGKPiKFDTeCCO65vYHsyay8DqoBSlzIO3fayrfOgU94lQN5a1uWVnFrMLceTJw/+fQXR5PGbUVRaHshQ==
-  dependencies:
-    debug "^2.2.0"
-    es5-ext "^0.10.50"
-    nan "^2.14.0"
-    typedarray-to-buffer "^3.1.5"
-    yaeti "^0.0.6"
-
 whatwg-encoding@^1.0.1, whatwg-encoding@^1.0.5:
   version "1.0.5"
   resolved "https://registry.yarnpkg.com/whatwg-encoding/-/whatwg-encoding-1.0.5.tgz#5abacf777c32166a51d085d6b4f3e7d27113ddb0"
@@ -15435,6 +15373,11 @@ ws@^7.0.0:
   resolved "https://registry.yarnpkg.com/ws/-/ws-7.2.1.tgz#03ed52423cd744084b2cf42ed197c8b65a936b8e"
   integrity sha512-sucePNSafamSKoOqoNfBd8V0StlkzJKL2ZAhGQinCfNQ+oacw+Pk7lcdAElecBF2VkLNZRiIb5Oi1Q5lVUVt2A==
 
+ws@^7.3.1:
+  version "7.3.1"
+  resolved "https://registry.yarnpkg.com/ws/-/ws-7.3.1.tgz#d0547bf67f7ce4f12a72dfe31262c68d7dc551c8"
+  integrity sha512-D3RuNkynyHmEJIpD2qrgVkc9DQ23OrN/moAwZX4L8DfvszsJxpjQuUq3LMx6HoYji9fbIOBY18XWBsAux1ZZUA==
+
 ws@~3.3.1:
   version "3.3.3"
   resolved "https://registry.yarnpkg.com/ws/-/ws-3.3.3.tgz#f1cf84fe2d5e901ebce94efaece785f187a228f2"
@@ -15581,11 +15524,6 @@ y18n@^3.2.1:
   version "4.0.0"
   resolved "https://registry.yarnpkg.com/y18n/-/y18n-4.0.0.tgz#95ef94f85ecc81d007c264e190a120f0a3c8566b"
 
-yaeti@^0.0.6:
-  version "0.0.6"
-  resolved "https://registry.yarnpkg.com/yaeti/-/yaeti-0.0.6.tgz#f26f484d72684cf42bedfb76970aa1608fbf9577"
-  integrity sha1-8m9ITXJoTPQr7ft2lwqhYI+/lXc=
-
 yallist@^2.1.2:
   version "2.1.2"
   resolved "https://registry.yarnpkg.com/yallist/-/yallist-2.1.2.tgz#1c11f9218f076089a47dd512f93c6699a6a81d52"