// nchan.js
  1. const logger = require('@alias/logger')('growi:service:config-pubsub:nchan');
  2. const path = require('path');
  3. const axios = require('axios');
  4. const WebSocketClient = require('websocket').client;
  5. const ConfigPubsubMessage = require('../../models/vo/config-pubsub-message');
  6. const ConfigPubsubDelegator = require('./base');
  /**
   * Config pubsub delegator that talks to an Nchan server.
   *
   * Messages are published with an HTTP POST (axios) and received through a
   * WebSocket connection (WebSocket-Node client).
   */
  class NchanDelegator extends ConfigPubsubDelegator {

    /**
     * @param {string} uri URI handed to the base class; read back as `this.uri`
     *    when building URLs (assumes the base constructor stores it -- confirm in ./base)
     * @param {string} publishPath path used by publish()
     * @param {string} subscribePath path used by subscribe()
     * @param {string} [channelId] appended to both paths when specified
     */
    constructor(uri, publishPath, subscribePath, channelId) {
      super(uri);

      this.publishPath = publishPath;
      this.subscribePath = subscribePath;
      this.channelId = channelId;

      // true from the moment connect() is requested until it succeeds or fails
      this.isConnecting = false;

      /**
       * A list of ConfigPubsubHandler instance
       */
      this.handlableList = [];

      // WebSocket-Node client (created lazily) and the current connection
      this.client = null;
      this.connection = null;
    }

    /**
     * @inheritdoc
     *
     * @returns {boolean} false while a connection is alive or a connect
     *    attempt is in progress, true otherwise
     */
    shouldResubscribe() {
      if (this.connection != null && this.connection.connected) {
        return false;
      }

      return !this.isConnecting;
    }

    /**
     * @inheritdoc
     *
     * Does nothing when a connection is already alive or being established,
     * unless forceReconnect is set; opening a second socket in that state
     * would make every message arrive twice.
     *
     * @param {boolean} [forceReconnect=false] close the current connection (if any) and connect again
     */
    subscribe(forceReconnect = false) {
      if (forceReconnect) {
        if (this.connection != null && this.connection.connected) {
          this.connection.close();
        }
      }
      else if (!this.shouldResubscribe()) {
        // already connected, or a connect attempt is in flight
        return;
      }

      // init client
      if (this.client == null) {
        this.initClient();
      }

      if (this.shouldResubscribe()) {
        logger.info('The connection to config pubsub server is offline. Try to reconnect...');
      }

      // connect
      this.isConnecting = true;
      const url = this.constructUrl(this.subscribePath).toString();
      this.client.connect(url);
    }

    /**
     * @inheritdoc
     *
     * @param {ConfigPubsubMessage} configPubsubMessage message to send; POSTed as a JSON string
     * @returns {Promise} the promise returned by axios.post
     */
    async publish(configPubsubMessage) {
      await super.publish(configPubsubMessage);

      logger.debug('Publish message', configPubsubMessage);

      const url = this.constructUrl(this.publishPath).toString();
      return axios.post(url, JSON.stringify(configPubsubMessage));
    }

    /**
     * @inheritdoc
     *
     * Remembers the handler so that it can be attached to future connections,
     * and attaches it to the current connection when there is one.
     *
     * @param {ConfigPubsubHandler} handlable
     */
    addMessageHandler(handlable) {
      super.addMessageHandler(handlable);

      this.handlableList.push(handlable);
      this.registerMessageHandlerToConnection(handlable);
    }

    /**
     * Attach the handler to the current connection only.
     *
     * Kept separate from addMessageHandler() so that re-attaching the known
     * handlers after a (re)connect does not append them to handlableList again.
     *
     * @param {ConfigPubsubHandler} handlable
     */
    registerMessageHandlerToConnection(handlable) {
      if (this.connection == null) {
        return;
      }

      this.connection.on('message', (messageObj) => {
        this.handleMessage(messageObj, handlable);
      });
    }

    /**
     * Build the endpoint URL from the base path and the optional channel id.
     *
     * @param {string} basepath
     * @returns {URL} basepath (plus the channel id when set) resolved against this.uri
     */
    constructUrl(basepath) {
      // URL paths always use forward slashes, so join with posix rules on every platform
      const pathname = this.channelId == null
        ? basepath //                                 /pubsub
        : path.posix.join(basepath, this.channelId); // /pubsub/my-channel-id

      return new URL(pathname, this.uri);
    }

    /**
     * Create the WebSocket client and wire its lifecycle events.
     */
    initClient() {
      const client = new WebSocketClient();

      client.on('connectFailed', (error) => {
        logger.warn(`Connect Error: ${error.toString()}`);
        this.isConnecting = false;
      });

      client.on('connect', (connection) => {
        this.isConnecting = false;
        this.connection = connection;

        logger.info('WebSocket client connected');

        connection.on('error', (error) => {
          this.isConnecting = false;
          logger.error(`Connection Error: ${error.toString()}`);
        });
        connection.on('close', () => {
          logger.info('WebSocket connection closed');
        });

        // attach all known message handlers to the new connection
        this.handlableList.forEach(handler => this.registerMessageHandlerToConnection(handler));
      });

      this.client = client;
    }

    /**
     * Handle message string with the specified ConfigPubsubHandler
     *
     * @see https://github.com/theturtle32/WebSocket-Node/blob/1f7ffba2f7a6f9473bcb39228264380ce2772ba7/docs/WebSocketConnection.md#message
     *
     * @param {object} message WebSocket-Node message object
     * @param {ConfigPubsubHandler} handlable
     */
    handleMessage(message, handlable) {
      if (message.type !== 'utf8') {
        logger.warn('Only utf8 message is supported.');
        // there is no utf8Data to parse on other message types
        return;
      }

      try {
        const configPubsubMessage = ConfigPubsubMessage.parse(message.utf8Data);

        // check uid: ignore messages published by this instance itself
        if (configPubsubMessage.publisherUid === this.uid) {
          logger.debug(`Skip processing by ${handlable.constructor.name} because this message is sent by the publisher itself:`, `from ${this.uid}`);
          return;
        }

        // check shouldHandleConfigPubsubMessage
        const shouldHandle = handlable.shouldHandleConfigPubsubMessage(configPubsubMessage);
        logger.debug(`${handlable.constructor.name}.shouldHandleConfigPubsubMessage(`, configPubsubMessage, `) => ${shouldHandle}`);

        if (shouldHandle) {
          handlable.handleConfigPubsubMessage(configPubsubMessage);
        }
      }
      catch (err) {
        // a broken message or a failing handler is logged and otherwise ignored
        logger.warn('Could not handle a message: ', err.message);
      }
    }

  }
  131. module.exports = function(crowi) {
  132. const { configManager } = crowi;
  133. const uri = configManager.getConfig('crowi', 'app:nchanUri');
  134. // when nachan server URI is not set
  135. if (uri == null) {
  136. logger.warn('NCHAN_URI is not specified.');
  137. return;
  138. }
  139. const publishPath = configManager.getConfig('crowi', 'configPubsub:nchan:publishPath');
  140. const subscribePath = configManager.getConfig('crowi', 'configPubsub:nchan:subscribePath');
  141. return new NchanDelegator(uri, publishPath, subscribePath);
  142. };