| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- import path from 'path';
- import WebSocket from 'ws';
- import ReconnectingWebSocket from 'reconnecting-websocket';
- import axios from '~/utils/axios';
- import loggerFactory from '~/utils/logger';
- import S2sMessage from '../../models/vo/s2s-message';
- import { AbstractS2sMessagingService } from './base';
- const logger = loggerFactory('growi:service:s2s-messaging:nchan');
- class NchanDelegator extends AbstractS2sMessagingService {
- /**
- * A list of S2sMessageHandlable instance
- */
- handlableToEventListenerMap: any = {};
- socket: any = null;
- constructor(uri, private publishPath: string, private subscribePath: string, private channelId: any) {
- super(uri);
- }
- /**
- * @inheritdoc
- */
- shouldResubscribe() {
- return this.socket.readyState === ReconnectingWebSocket.CLOSED;
- }
- /**
- * @inheritdoc
- */
- subscribe(forceReconnect = false) {
- if (forceReconnect) {
- logger.info('Force reconnecting is requested. Try to reconnect...');
- }
- else if (this.socket != null && this.shouldResubscribe()) {
- logger.info('The connection to config pubsub server is offline. Try to reconnect...');
- }
- // init client
- if (this.socket == null) {
- this.initClient();
- }
- // connect
- if (forceReconnect || this.shouldResubscribe()) {
- this.socket.reconnect();
- }
- }
- /**
- * @inheritdoc
- */
- async publish(s2sMessage: S2sMessage): Promise<void> {
- await super.publish(s2sMessage);
- const url = this.constructUrl(this.publishPath).toString();
- logger.debug('Publish message', s2sMessage, `to ${url}`);
- return axios.post(url, s2sMessage);
- }
- /**
- * @inheritdoc
- */
- addMessageHandler(handlable) {
- if (this.socket == null) {
- logger.error('socket has not initialized yet.');
- return;
- }
- super.addMessageHandler(handlable);
- this.registerMessageHandlerToSocket(handlable);
- }
- /**
- * @inheritdoc
- */
- removeMessageHandler(handlable) {
- if (this.socket == null) {
- logger.error('socket has not initialized yet.');
- return;
- }
- super.removeMessageHandler(handlable);
- const eventListener = this.handlableToEventListenerMap[handlable];
- if (eventListener != null) {
- this.socket.removeEventListener('message', eventListener);
- delete this.handlableToEventListenerMap[handlable];
- }
- }
- registerMessageHandlerToSocket(handlable) {
- const eventListener = (messageObj) => {
- this.handleMessage(messageObj, handlable);
- };
- this.socket.addEventListener('message', eventListener);
- this.handlableToEventListenerMap[handlable] = eventListener;
- }
- constructUrl(basepath) {
- const pathname = this.channelId == null
- ? basepath // /pubsub
- : path.join(basepath, this.channelId); // /pubsub/my-channel-id
- return new URL(pathname, this.uri);
- }
- initClient() {
- // const client = new WebSocketClient();
- const url = this.constructUrl(this.publishPath).toString();
- const socket = new ReconnectingWebSocket(url, [], {
- WebSocket,
- maxRetries: 3,
- startClosed: true,
- });
- socket.addEventListener('close', () => {
- logger.info('WebSocket client disconnected');
- });
- socket.addEventListener('error', (error) => {
- logger.error('WebSocket error occured:', error.message);
- });
- socket.addEventListener('open', () => {
- logger.info('WebSocket client connected.');
- });
- this.handlableList.forEach(handlable => this.registerMessageHandlerToSocket(handlable));
- this.socket = socket;
- }
- /**
- * Handle message string with the specified S2sMessageHandlable
- *
- * @see https://github.com/theturtle32/WebSocket-Node/blob/1f7ffba2f7a6f9473bcb39228264380ce2772ba7/docs/WebSocketConnection.md#message
- *
- * @param {object} message WebSocket-Node message object
- * @param {S2sMessageHandlable} handlable
- */
- handleMessage(message, handlable) {
- try {
- const s2sMessage = S2sMessage.parse(message.data);
- // check uid
- if (s2sMessage.publisherUid === this.uid) {
- logger.debug(`Skip processing by ${handlable.constructor.name} because this message is sent by the publisher itself:`, `from ${this.uid}`);
- return;
- }
- // check shouldHandleS2sMessage
- const shouldHandle = handlable.shouldHandleS2sMessage(s2sMessage);
- logger.debug(`${handlable.constructor.name}.shouldHandleS2sMessage(`, s2sMessage, `) => ${shouldHandle}`);
- if (shouldHandle) {
- handlable.handleS2sMessage(s2sMessage);
- }
- }
- catch (err) {
- logger.warn('Could not handle a message: ', err.message);
- }
- }
- }
- module.exports = function(crowi) {
- const { configManager } = crowi;
- const uri = configManager.getConfig('crowi', 'app:nchanUri');
- // when nachan server URI is not set
- if (uri == null) {
- logger.warn('NCHAN_URI is not specified.');
- return;
- }
- const publishPath = configManager.getConfig('crowi', 's2sMessagingPubsub:nchan:publishPath');
- const subscribePath = configManager.getConfig('crowi', 's2sMessagingPubsub:nchan:subscribePath');
- const channelId = configManager.getConfig('crowi', 's2sMessagingPubsub:nchan:channelId');
- return new NchanDelegator(uri, publishPath, subscribePath, channelId);
- };
|