Просмотр исходного кода

refactor ConfigPubsubDelegator

Yuki Takei 5 лет назад
Родитель
Сommit
898f21b517

+ 2 - 0
src/server/crowi/index.js

@@ -256,6 +256,8 @@ Crowi.prototype.setupConfigManager = async function() {
   if (this.configPubsub != null) {
     this.configPubsub.subscribe();
     this.configManager.setPubsub(this.configPubsub);
+    // add handler
+    this.configPubsub.addMessageHandler(this.configManager);
   }
 };
 

+ 22 - 0
src/server/models/vo/config-pubsub-message.js

@@ -0,0 +1,22 @@
+class ConfigPubsubMessage {
+
+  constructor(eventName, body) {
+    this.eventName = eventName;
+    for (const [key, value] of Object.entries(body)) {
+      this[key] = value;
+    }
+  }
+
+  static parse(messageString) {
+    const body = JSON.parse(messageString);
+
+    if (body.eventName == null) {
+      throw new Error('message body must contain \'eventName\'');
+    }
+
+    return new ConfigPubsubMessage(body.eventName, body);
+  }
+
+}
+
+module.exports = ConfigPubsubMessage;

+ 21 - 25
src/server/service/config-manager.js

@@ -1,4 +1,8 @@
 const logger = require('@alias/logger')('growi:service:ConfigManager');
+
+const ConfigPubsubMessage = require('../models/vo/config-pubsub-message');
+const ConfigPubsubHandlable = require('./config-pubsub/handlable');
+
 const ConfigLoader = require('./config-loader');
 
 const KEYS_FOR_LOCAL_STRATEGY_USE_ONLY_ENV_OPTION = [
@@ -18,9 +22,11 @@ const KEYS_FOR_SAML_USE_ONLY_ENV_OPTION = [
   'security:passport-saml:ABLCRule',
 ];
 
-class ConfigManager {
+class ConfigManager extends ConfigPubsubHandlable {
 
   constructor(configModel) {
+    super();
+
     this.configModel = configModel;
     this.configLoader = new ConfigLoader(this.configModel);
     this.configObject = null;
@@ -49,10 +55,6 @@ class ConfigManager {
    */
   async setPubsub(configPubsub) {
     this.configPubsub = configPubsub;
-    this.configPubsub.addMessageHandler((message) => {
-      logger.debug('Recieved message from publisher', message);
-      this.handleUpdateMessage(message);
-    });
   }
 
   /**
@@ -311,35 +313,29 @@ class ConfigManager {
   }
 
   async publishUpdateMessage(updatedAt) {
-    const message = JSON.stringify({ updatedAt });
+    const configPubsubMessage = new ConfigPubsubMessage('configUpdated', { updatedAt });
 
     try {
-      await this.configPubsub.publish(message);
+      await this.configPubsub.publish(configPubsubMessage);
     }
     catch (e) {
       logger.error('Failed to publish update message with configPubsub: ', e.message);
     }
   }
 
-  async handleUpdateMessage(message) {
-    let parsedMessage;
-    let updatedAt;
-
-    try {
-      // parse JSON
-      parsedMessage = JSON.parse(message);
-
-      if (!(parsedMessage instanceof Object)) {
-        throw new Error('A message could not be parsed to JSON object: ', message);
-      }
+  /**
+   * @inheritdoc
+   */
+  souldHandleConfigPubsubMessage(configPubsubMessage) {
+    const { eventName, updatedAt } = configPubsubMessage;
+    return eventName === 'configUpdated' && updatedAt != null;
+  }
 
-      // parse Date
-      updatedAt = new Date(parsedMessage.updatedAt);
-    }
-    catch (e) {
-      logger.warn(e.message);
-      return;
-    }
+  /**
+   * @inheritdoc
+   */
+  async handleConfigPubsubMessage(configPubsubMessage) {
+    const updatedAt = new Date(configPubsubMessage.updatedAt);
 
     if (this.lastLoadedAt == null || this.lastLoadedAt < updatedAt) {
       logger.info('Reload configs by pubsub notification');

+ 10 - 2
src/server/service/config-pubsub/base.js

@@ -16,11 +16,19 @@ class ConfigPubsubDelegator {
     throw new Error('implement this');
   }
 
-  async publish() {
+  /**
+   * Publish message
+   * @param {ConfigPubsubMessage} configPubsubMessage
+   */
+  async publish(configPubsubMessage) {
     throw new Error('implement this');
   }
 
-  addMessageHandler(handler) {
+  /**
+   * Add message handler
+   * @param {ConfigPubsubHandlable} handlable
+   */
+  addMessageHandler(handlable) {
     throw new Error('implement this');
   }
 

+ 14 - 0
src/server/service/config-pubsub/handlable.js

@@ -0,0 +1,14 @@
+// TODO: make interface with TS
+class ConfigPubsubHandlable {
+
+  souldHandleConfigPubsubMessage(configPubsubMessage) {
+    throw new Error('implement this');
+  }
+
+  async handleConfigPubsubMessage(configPubsubMessage) {
+    throw new Error('implement this');
+  }
+
+}
+
+module.exports = ConfigPubsubHandlable;

+ 39 - 14
src/server/service/config-pubsub/nchan.js

@@ -4,6 +4,7 @@ const path = require('path');
 const axios = require('axios');
 const WebSocketClient = require('websocket').client;
 
+const ConfigPubsubMessage = require('../../models/vo/config-pubsub-message');
 const ConfigPubsubDelegator = require('./base');
 
 
@@ -16,9 +17,13 @@ class NchanDelegator extends ConfigPubsubDelegator {
     this.subscribePath = subscribePath;
 
     this.channelId = channelId;
-    this.messageHandlers = [];
     this.isConnecting = false;
 
+    /**
+     * A list of ConfigPubsubHandler instance
+     */
+    this.handlableList = [];
+
     this.client = null;
     this.connection = null;
   }
@@ -62,26 +67,21 @@ class NchanDelegator extends ConfigPubsubDelegator {
   /**
    * @inheritdoc
    */
-  async publish(message) {
-    logger.debug('Publish message', message);
+  async publish(configPubsubMessage) {
+    logger.debug('Publish message', configPubsubMessage);
     const url = this.constructUrl(this.publishPath).toString();
-    return axios.post(url, message);
+    return axios.post(url, JSON.stringify(configPubsubMessage));
   }
 
   /**
    * @inheritdoc
    */
-  addMessageHandler(handler) {
-    this.messageHandlers.push(handler);
+  addMessageHandler(handlable) {
+    this.handlableList.push(handlable);
 
     if (this.connection != null) {
-      this.connection.on('message', (message) => {
-        if (message.type === 'utf8') {
-          handler(message.utf8Data);
-        }
-        else {
-          logger.warn('Only utf8 message is supported.');
-        }
+      this.connection.on('message', (messageObj) => {
+        this.handleMessage(messageObj, handlable);
       });
     }
   }
@@ -117,12 +117,37 @@ class NchanDelegator extends ConfigPubsubDelegator {
       });
 
       // register all message handlers
-      this.messageHandlers.forEach(handler => this.addMessageHandler(handler));
+      this.handlableList.forEach(handler => this.addMessageHandler(handler));
     });
 
     this.client = client;
   }
 
+  /**
+   * Handle message string with the specified ConfigPubsubHandler
+   *
+   * @see https://github.com/theturtle32/WebSocket-Node/blob/1f7ffba2f7a6f9473bcb39228264380ce2772ba7/docs/WebSocketConnection.md#message
+   *
+   * @param {object} message WebSocket-Node message object
+   * @param {ConfigPubsubHandler} handlable
+   */
+  handleMessage(message, handlable) {
+    if (message.type !== 'utf8') {
+      logger.warn('Only utf8 message is supported.');
+    }
+
+    try {
+      const configPubsubMessage = ConfigPubsubMessage.parse(message.utf8Data);
+
+      if (handlable.souldHandleConfigPubsubMessage(configPubsubMessage)) {
+        handlable.handleConfigPubsubMessage(configPubsubMessage);
+      }
+    }
+    catch (err) {
+      logger.warn('Could not handle a message: ', err.message);
+    }
+  }
+
 }
 
 module.exports = function(crowi) {