Sfoglia il codice sorgente

impl middleware to reconnect

Yuki Takei 5 anni fa
parent
commit
9d53b92c87

+ 2 - 0
src/server/crowi/express-init.js

@@ -20,6 +20,7 @@ module.exports = function(crowi, app) {
 
   const registerSafeRedirect = require('../middlewares/safe-redirect')();
   const injectCurrentuserToLocalvars = require('../middlewares/inject-currentuser-to-localvars')();
+  const autoReconnectToConfigPubsub = require('../middlewares/auto-reconnect-to-config-pubsub')(crowi);
   const { listLocaleIds } = require('@commons/util/locale-utils');
 
   const avoidSessionRoutes = require('../routes/avoid-session-routes');
@@ -117,6 +118,7 @@ module.exports = function(crowi, app) {
 
   app.use(registerSafeRedirect);
   app.use(injectCurrentuserToLocalvars);
+  app.use(autoReconnectToConfigPubsub);
 
   const middlewares = require('../util/middlewares')(crowi, app);
   app.use(middlewares.swigFilters(swig));

+ 11 - 0
src/server/middlewares/auto-reconnect-to-config-pubsub.js

@@ -0,0 +1,11 @@
+module.exports = (crowi) => {
+  const { configPubsub } = crowi;
+
+  return async(req, res, next) => {
+    if (configPubsub != null) {
+      configPubsub.subscribe();
+    }
+
+    return next();
+  };
+};

+ 26 - 18
src/server/service/config-pubsub/nchan.js

@@ -17,6 +17,7 @@ class NchanDelegator extends ConfigPubsubDelegator {
 
     this.channelId = channelId;
     this.messageHandlers = [];
+    this.isReconnecting = false;
 
     this.client = null;
     this.connection = null;
@@ -25,28 +26,29 @@ class NchanDelegator extends ConfigPubsubDelegator {
   /**
    * @inheritdoc
    */
-  subscribe(forceReconnect) {
+  subscribe(forceReconnect = false) {
     if (forceReconnect) {
       if (this.connection != null && this.connection.connected) {
         this.connection.close();
       }
     }
 
-    const pathname = this.channelId == null
-      ? this.subscribePath //                               /sub
-      : path.join(this.subscribePath, this.channelId); //   /sub/my-channel-id
-
-    const subscribeUri = new URL(pathname, this.uri);
-
     // init client and connect
     if (this.client == null) {
       this.initClient();
-      this.client.connect(subscribeUri.toString());
+      const url = this.constructUrl(this.subscribePath).toString();
+      this.client.connect(url.toString());
+
+      return;
     }
 
     // reconnect
-    if (this.connection != null && !this.connection.connected) {
-      this.client.connect(subscribeUri.toString());
+    if (this.connection != null && !this.connection.connected && !this.isReconnecting) {
+      logger.info('The connection to config pubsub server is offline. Try to reconnect...');
+      this.isReconnecting = true;
+
+      const url = this.constructUrl(this.subscribePath).toString();
+      this.client.connect(url.toString());
     }
   }
 
@@ -54,15 +56,9 @@ class NchanDelegator extends ConfigPubsubDelegator {
    * @inheritdoc
    */
   async publish(message) {
-    const pathname = this.channelId == null
-      ? this.publishPath //                                 /pub
-      : path.join(this.subscribePath, this.channelId); //   /pub/my-channel-id
-
-    const publishUri = new URL(pathname, this.uri);
-
     logger.debug('Publish message', message);
-
-    return axios.post(publishUri.toString(), message);
+    const url = this.constructUrl(this.publishPath).toString();
+    return axios.post(url, message);
   }
 
   /**
@@ -83,17 +79,29 @@ class NchanDelegator extends ConfigPubsubDelegator {
     }
   }
 
+  constructUrl(basepath) {
+    const pathname = this.channelId == null
+      ? basepath //                                 /pubsub
+      : path.join(basepath, this.channelId); //     /pubsub/my-channel-id
+
+    return new URL(pathname, this.uri);
+  }
+
   initClient() {
     const client = new WebSocketClient();
 
     client.on('connectFailed', (error) => {
       logger.warn(`Connect Error: ${error.toString()}`);
+      this.isReconnecting = false;
     });
 
     client.on('connect', (connection) => {
+      this.isReconnecting = false;
+
       logger.info('WebSocket client connected');
 
       connection.on('error', (error) => {
+        this.isReconnecting = false;
         logger.error(`Connection Error: ${error.toString()}`);
       });
       connection.on('close', () => {