Advertisement
Guest User

Untitled

a guest
Jul 16th, 2018
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.95 KB | None | 0 0
  1. /* eslint-disable func-names, no-underscore-dangle */
  2. import EventBus from 'vertx3-eventbus-client'
  3.  
  4. import { DelayObserver } from 'core/lib/delay-observer'
  5. import { endpointWs } from 'core/constants'
  6.  
  7.  
  8. const MyEventBus = (function () {
  9. let callbacks = []
  10.  
  11. const multiEmitter = (type) => (...param) => {
  12. callbacks.forEach((cb) => {
  13. if (cb.type === type) cb.fn(...param)
  14. })
  15. }
  16.  
  17. EventBus.prototype.onopen = multiEmitter('onopen')
  18. EventBus.prototype.onclose = multiEmitter('onclose')
  19. EventBus.prototype.onerror = multiEmitter('onerror')
  20.  
  21. EventBus.prototype.onOpen = function (fn) {
  22. callbacks.push({ type: 'onopen', fn })
  23. }
  24. EventBus.prototype.onClose = function (fn) {
  25. callbacks.push({ type: 'onclose', fn })
  26. }
  27. EventBus.prototype.onError = function (fn) {
  28. callbacks.push({ type: 'onerror', fn })
  29. }
  30. EventBus.prototype.clearCallback = function () {
  31. callbacks = []
  32. }
  33. return EventBus
  34. }())
  35.  
  36.  
  37. /*
  38. ws.state ->
  39. EventBus.CONNECTING = 0;
  40. EventBus.OPEN = 1;
  41. EventBus.CLOSING = 2;
  42. EventBus.CLOSED = 3;
  43. */
  44.  
  45.  
  46. class ConnectWs {
  47. constructor() {
  48. this.addr = endpointWs
  49. // this.addr = '/eventbus'
  50. this.ws = new MyEventBus(this.addr)
  51. this.state = {} // хранилище данных для каждого канала
  52. this.socketListeners = {} // подписчики вебсокета, по одному на канал
  53. this.channelSubscribers = {} // подписчики канала, (может быть много на один канал)
  54. // this.ws.reconnectAttempts
  55. // this.ws.reconnectDelayMax = 3
  56. this.freezeState = false
  57. this.listenerClose()
  58. }
  59.  
  60. subscribeChannel(chanel, cb = () => {}, handlerResponse = () => {}) {
  61. return new Promise((resolve) => {
  62. if (this.ws.state === EventBus.OPEN) {
  63. resolve(this._controllerChannel(chanel, cb, handlerResponse))
  64. return
  65. }
  66. else if (this.ws.state === EventBus.CLOSED) {
  67. this.ws = new MyEventBus(this.addr)
  68. this.listenerClose()
  69. }
  70.  
  71. this.ws.onOpen(() => {
  72. // console.log('open')
  73. resolve(this._controllerChannel(chanel, cb, handlerResponse))
  74. })
  75. })
  76. }
  77.  
  78. freeze() {
  79. Object.entries(this.socketListeners).forEach(([channel, handler]) => {
  80. this.ws.unregisterHandler(channel, handler)
  81. })
  82. this.ws.close()
  83. this.freezeState = true
  84. }
  85.  
  86. restore() {
  87. this.ws = new MyEventBus(this.addr)
  88. this.ws.onOpen(() => {
  89. Object.entries(this.socketListeners).forEach(([channel, handler]) => {
  90. this.state[channel] = null
  91. this.ws.registerHandler(channel, handler)
  92. })
  93. this.freezeState = false
  94. })
  95. }
  96.  
  97. _controllerChannel(chanel, cb, handlerResponse) {
  98. if (!this.channelSubscribers[chanel]) {
  99. this.channelSubscribers[chanel] = []
  100. }
  101.  
  102. this.channelSubscribers[chanel].push({ cb })
  103.  
  104. if (this.socketListeners[chanel]) {
  105. cb(this.state[chanel], chanel)
  106.  
  107. return () => {
  108. this._unsubscribeChannel(chanel, cb)
  109. }
  110. }
  111.  
  112. const delayCb = new DelayObserver()
  113.  
  114. this.state[chanel] = null
  115.  
  116. this.socketListeners[chanel] = (error, message) => {
  117. if (error) {
  118. console.info('ws error:', error)
  119. }
  120. this.state[chanel] = handlerResponse(this.state[chanel], message.body, chanel)
  121. delayCb.emmit()
  122. }
  123.  
  124. delayCb.subscribe(() => {
  125. this.channelSubscribers[chanel].forEach((subr) => {
  126. const res = subr.cb(this.state[chanel], chanel)
  127.  
  128. if (res) {
  129. this.state[chanel] = res
  130. }
  131. })
  132. })
  133.  
  134. this.ws.registerHandler(chanel, this.socketListeners[chanel])
  135.  
  136. return () => {
  137. delayCb.clear()
  138. this._unsubscribeChannel(chanel, cb)
  139. }
  140. }
  141.  
  142. _unsubscribeChannel(chanel, cb) {
  143. // Cannot read property 'findIndex' of undefined
  144. const index = this.channelSubscribers[chanel].findIndex((subr) => subr.cb === cb)
  145.  
  146.  
  147. if (index !== -1) {
  148. // this.channelSubscribers[chanel][index].delayCb.clear()
  149. this.channelSubscribers[chanel].splice(index, 1)
  150. }
  151.  
  152. if (!this.channelSubscribers[chanel].length) {
  153. this.ws.unregisterHandler(chanel, this.socketListeners[chanel])
  154. delete this.channelSubscribers[chanel]
  155. delete this.socketListeners[chanel]
  156. delete this.state[chanel]
  157. }
  158. }
  159.  
  160. clearSubscribes() {
  161. this.state = {}
  162. this.socketListeners = {}
  163. /* Object.keys(this.channelSubscribers).forEach((channel) => {
  164. this.channelSubscribers[channel].forEach((cb) => cb.delayCb.clear())
  165. }) */
  166. this.channelSubscribers = {}
  167. // console.log('clearSubscribes')
  168. }
  169.  
  170. listenerClose() {
  171. this.ws.onClose((closeEvent) => {
  172. // console.log('close')
  173. if (closeEvent.code !== 1000) {
  174. this.clearSubscribes()
  175. this.ws.clearCallback()
  176. }
  177. })
  178. }
  179. }
  180.  
  181.  
  182. export const wsConnected = new ConnectWs()
  183.  
  184. // export default ConnectWs
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement