nchan.ts 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import path from 'path';
  2. import WebSocket from 'ws';
  3. import ReconnectingWebSocket from 'reconnecting-websocket';
  4. import axios from '~/utils/axios';
  5. import loggerFactory from '~/utils/logger';
  6. import S2sMessage from '../../models/vo/s2s-message';
  7. import { AbstractS2sMessagingService } from './base';
  8. const logger = loggerFactory('growi:service:s2s-messaging:nchan');
  9. class NchanDelegator extends AbstractS2sMessagingService {
  10. /**
  11. * A list of S2sMessageHandlable instance
  12. */
  13. handlableToEventListenerMap: any = {};
  14. socket: any = null;
  15. constructor(uri, private publishPath: string, private subscribePath: string, private channelId: any) {
  16. super(uri);
  17. }
  18. /**
  19. * @inheritdoc
  20. */
  21. shouldResubscribe() {
  22. return this.socket.readyState === ReconnectingWebSocket.CLOSED;
  23. }
  24. /**
  25. * @inheritdoc
  26. */
  27. subscribe(forceReconnect = false) {
  28. if (forceReconnect) {
  29. logger.info('Force reconnecting is requested. Try to reconnect...');
  30. }
  31. else if (this.socket != null && this.shouldResubscribe()) {
  32. logger.info('The connection to config pubsub server is offline. Try to reconnect...');
  33. }
  34. // init client
  35. if (this.socket == null) {
  36. this.initClient();
  37. }
  38. // connect
  39. if (forceReconnect || this.shouldResubscribe()) {
  40. this.socket.reconnect();
  41. }
  42. }
  43. /**
  44. * @inheritdoc
  45. */
  46. async publish(s2sMessage: S2sMessage): Promise<void> {
  47. await super.publish(s2sMessage);
  48. const url = this.constructUrl(this.publishPath).toString();
  49. logger.debug('Publish message', s2sMessage, `to ${url}`);
  50. return axios.post(url, s2sMessage);
  51. }
  52. /**
  53. * @inheritdoc
  54. */
  55. addMessageHandler(handlable) {
  56. if (this.socket == null) {
  57. logger.error('socket has not initialized yet.');
  58. return;
  59. }
  60. super.addMessageHandler(handlable);
  61. this.registerMessageHandlerToSocket(handlable);
  62. }
  63. /**
  64. * @inheritdoc
  65. */
  66. removeMessageHandler(handlable) {
  67. if (this.socket == null) {
  68. logger.error('socket has not initialized yet.');
  69. return;
  70. }
  71. super.removeMessageHandler(handlable);
  72. const eventListener = this.handlableToEventListenerMap[handlable];
  73. if (eventListener != null) {
  74. this.socket.removeEventListener('message', eventListener);
  75. delete this.handlableToEventListenerMap[handlable];
  76. }
  77. }
  78. registerMessageHandlerToSocket(handlable) {
  79. const eventListener = (messageObj) => {
  80. this.handleMessage(messageObj, handlable);
  81. };
  82. this.socket.addEventListener('message', eventListener);
  83. this.handlableToEventListenerMap[handlable] = eventListener;
  84. }
  85. constructUrl(basepath) {
  86. const pathname = this.channelId == null
  87. ? basepath // /pubsub
  88. : path.join(basepath, this.channelId); // /pubsub/my-channel-id
  89. return new URL(pathname, this.uri);
  90. }
  91. initClient() {
  92. // const client = new WebSocketClient();
  93. const url = this.constructUrl(this.publishPath).toString();
  94. const socket = new ReconnectingWebSocket(url, [], {
  95. WebSocket,
  96. maxRetries: 3,
  97. startClosed: true,
  98. });
  99. socket.addEventListener('close', () => {
  100. logger.info('WebSocket client disconnected');
  101. });
  102. socket.addEventListener('error', (error) => {
  103. logger.error('WebSocket error occured:', error.message);
  104. });
  105. socket.addEventListener('open', () => {
  106. logger.info('WebSocket client connected.');
  107. });
  108. this.handlableList.forEach(handlable => this.registerMessageHandlerToSocket(handlable));
  109. this.socket = socket;
  110. }
  111. /**
  112. * Handle message string with the specified S2sMessageHandlable
  113. *
  114. * @see https://github.com/theturtle32/WebSocket-Node/blob/1f7ffba2f7a6f9473bcb39228264380ce2772ba7/docs/WebSocketConnection.md#message
  115. *
  116. * @param {object} message WebSocket-Node message object
  117. * @param {S2sMessageHandlable} handlable
  118. */
  119. handleMessage(message, handlable) {
  120. try {
  121. const s2sMessage = S2sMessage.parse(message.data);
  122. // check uid
  123. if (s2sMessage.publisherUid === this.uid) {
  124. logger.debug(`Skip processing by ${handlable.constructor.name} because this message is sent by the publisher itself:`, `from ${this.uid}`);
  125. return;
  126. }
  127. // check shouldHandleS2sMessage
  128. const shouldHandle = handlable.shouldHandleS2sMessage(s2sMessage);
  129. logger.debug(`${handlable.constructor.name}.shouldHandleS2sMessage(`, s2sMessage, `) => ${shouldHandle}`);
  130. if (shouldHandle) {
  131. handlable.handleS2sMessage(s2sMessage);
  132. }
  133. }
  134. catch (err) {
  135. logger.warn('Could not handle a message: ', err.message);
  136. }
  137. }
  138. }
  139. module.exports = function(crowi) {
  140. const { configManager } = crowi;
  141. const uri = configManager.getConfig('crowi', 'app:nchanUri');
  142. // when nachan server URI is not set
  143. if (uri == null) {
  144. logger.warn('NCHAN_URI is not specified.');
  145. return;
  146. }
  147. const publishPath = configManager.getConfig('crowi', 's2sMessagingPubsub:nchan:publishPath');
  148. const subscribePath = configManager.getConfig('crowi', 's2sMessagingPubsub:nchan:subscribePath');
  149. const channelId = configManager.getConfig('crowi', 's2sMessagingPubsub:nchan:channelId');
  150. return new NchanDelegator(uri, publishPath, subscribePath, channelId);
  151. };