| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- const logger = require('@alias/logger')('growi:service:config-pubsub:nchan');
- const path = require('path');
- const axios = require('axios');
- const WebSocketClient = require('websocket').client;
- const ConfigPubsubMessage = require('../../models/vo/config-pubsub-message');
- const ConfigPubsubDelegator = require('./base');
- class NchanDelegator extends ConfigPubsubDelegator {
- constructor(uri, publishPath, subscribePath, channelId) {
- super(uri);
- this.publishPath = publishPath;
- this.subscribePath = subscribePath;
- this.channelId = channelId;
- this.isConnecting = false;
- /**
- * A list of ConfigPubsubHandler instance
- */
- this.handlableList = [];
- this.client = null;
- this.connection = null;
- }
- /**
- * @inheritdoc
- */
- shouldResubscribe() {
- if (this.connection != null && this.connection.connected) {
- return false;
- }
- return !this.isConnecting;
- }
- /**
- * @inheritdoc
- */
- subscribe(forceReconnect = false) {
- if (forceReconnect) {
- if (this.connection != null && this.connection.connected) {
- this.connection.close();
- }
- }
- // init client
- if (this.client == null) {
- this.initClient();
- }
- if (this.shouldResubscribe()) {
- logger.info('The connection to config pubsub server is offline. Try to reconnect...');
- }
- // connect
- this.isConnecting = true;
- const url = this.constructUrl(this.subscribePath).toString();
- this.client.connect(url.toString());
- }
- /**
- * @inheritdoc
- */
- async publish(configPubsubMessage) {
- await super.publish(configPubsubMessage);
- logger.debug('Publish message', configPubsubMessage);
- const url = this.constructUrl(this.publishPath).toString();
- return axios.post(url, JSON.stringify(configPubsubMessage));
- }
- /**
- * @inheritdoc
- */
- addMessageHandler(handlable) {
- super.addMessageHandler(handlable);
- this.handlableList.push(handlable);
- if (this.connection != null) {
- this.connection.on('message', (messageObj) => {
- this.handleMessage(messageObj, handlable);
- });
- }
- }
- constructUrl(basepath) {
- const pathname = this.channelId == null
- ? basepath // /pubsub
- : path.join(basepath, this.channelId); // /pubsub/my-channel-id
- return new URL(pathname, this.uri);
- }
- initClient() {
- const client = new WebSocketClient();
- client.on('connectFailed', (error) => {
- logger.warn(`Connect Error: ${error.toString()}`);
- this.isConnecting = false;
- });
- client.on('connect', (connection) => {
- this.isConnecting = false;
- this.connection = connection;
- logger.info('WebSocket client connected');
- connection.on('error', (error) => {
- this.isConnecting = false;
- logger.error(`Connection Error: ${error.toString()}`);
- });
- connection.on('close', () => {
- logger.info('WebSocket connection closed');
- });
- // register all message handlers
- this.handlableList.forEach(handler => this.addMessageHandler(handler));
- });
- this.client = client;
- }
- /**
- * Handle message string with the specified ConfigPubsubHandler
- *
- * @see https://github.com/theturtle32/WebSocket-Node/blob/1f7ffba2f7a6f9473bcb39228264380ce2772ba7/docs/WebSocketConnection.md#message
- *
- * @param {object} message WebSocket-Node message object
- * @param {ConfigPubsubHandler} handlable
- */
- handleMessage(message, handlable) {
- if (message.type !== 'utf8') {
- logger.warn('Only utf8 message is supported.');
- }
- try {
- const configPubsubMessage = ConfigPubsubMessage.parse(message.utf8Data);
- // check uid
- if (configPubsubMessage.publisherUid === this.uid) {
- logger.debug(`Skip processing by ${handlable.constructor.name} because this message is sent by the publisher itself:`, `from ${this.uid}`);
- return;
- }
- // check shouldHandleConfigPubsubMessage
- const shouldHandle = handlable.shouldHandleConfigPubsubMessage(configPubsubMessage);
- logger.debug(`${handlable.constructor.name}.shouldHandleConfigPubsubMessage(`, configPubsubMessage, `) => ${shouldHandle}`);
- if (shouldHandle) {
- handlable.handleConfigPubsubMessage(configPubsubMessage);
- }
- }
- catch (err) {
- logger.warn('Could not handle a message: ', err.message);
- }
- }
- }
- module.exports = function(crowi) {
- const { configManager } = crowi;
- const uri = configManager.getConfig('crowi', 'app:nchanUri');
- // when nachan server URI is not set
- if (uri == null) {
- logger.warn('NCHAN_URI is not specified.');
- return;
- }
- const publishPath = configManager.getConfig('crowi', 'configPubsub:nchan:publishPath');
- const subscribePath = configManager.getConfig('crowi', 'configPubsub:nchan:subscribePath');
- return new NchanDelegator(uri, publishPath, subscribePath);
- };
|