Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* eslint-disable func-names, no-underscore-dangle */
- import EventBus from 'vertx3-eventbus-client'
- import { DelayObserver } from 'core/lib/delay-observer'
- import { endpointWs } from 'core/constants'
- const MyEventBus = (function () {
- let callbacks = []
- const multiEmitter = (type) => (...param) => {
- callbacks.forEach((cb) => {
- if (cb.type === type) cb.fn(...param)
- })
- }
- EventBus.prototype.onopen = multiEmitter('onopen')
- EventBus.prototype.onclose = multiEmitter('onclose')
- EventBus.prototype.onerror = multiEmitter('onerror')
- EventBus.prototype.onOpen = function (fn) {
- callbacks.push({ type: 'onopen', fn })
- }
- EventBus.prototype.onClose = function (fn) {
- callbacks.push({ type: 'onclose', fn })
- }
- EventBus.prototype.onError = function (fn) {
- callbacks.push({ type: 'onerror', fn })
- }
- EventBus.prototype.clearCallback = function () {
- callbacks = []
- }
- return EventBus
- }())
- /*
- ws.state ->
- EventBus.CONNECTING = 0;
- EventBus.OPEN = 1;
- EventBus.CLOSING = 2;
- EventBus.CLOSED = 3;
- */
- class ConnectWs {
- constructor() {
- this.addr = endpointWs
- // this.addr = '/eventbus'
- this.ws = new MyEventBus(this.addr)
- this.state = {} // хранилище данных для каждого канала
- this.socketListeners = {} // подписчики вебсокета, по одному на канал
- this.channelSubscribers = {} // подписчики канала, (может быть много на один канал)
- // this.ws.reconnectAttempts
- // this.ws.reconnectDelayMax = 3
- this.freezeState = false
- this.listenerClose()
- }
- subscribeChannel(chanel, cb = () => {}, handlerResponse = () => {}) {
- return new Promise((resolve) => {
- if (this.ws.state === EventBus.OPEN) {
- resolve(this._controllerChannel(chanel, cb, handlerResponse))
- return
- }
- else if (this.ws.state === EventBus.CLOSED) {
- this.ws = new MyEventBus(this.addr)
- this.listenerClose()
- }
- this.ws.onOpen(() => {
- // console.log('open')
- resolve(this._controllerChannel(chanel, cb, handlerResponse))
- })
- })
- }
- freeze() {
- Object.entries(this.socketListeners).forEach(([channel, handler]) => {
- this.ws.unregisterHandler(channel, handler)
- })
- this.ws.close()
- this.freezeState = true
- }
- restore() {
- this.ws = new MyEventBus(this.addr)
- this.ws.onOpen(() => {
- Object.entries(this.socketListeners).forEach(([channel, handler]) => {
- this.state[channel] = null
- this.ws.registerHandler(channel, handler)
- })
- this.freezeState = false
- })
- }
- _controllerChannel(chanel, cb, handlerResponse) {
- if (!this.channelSubscribers[chanel]) {
- this.channelSubscribers[chanel] = []
- }
- this.channelSubscribers[chanel].push({ cb })
- if (this.socketListeners[chanel]) {
- cb(this.state[chanel], chanel)
- return () => {
- this._unsubscribeChannel(chanel, cb)
- }
- }
- const delayCb = new DelayObserver()
- this.state[chanel] = null
- this.socketListeners[chanel] = (error, message) => {
- if (error) {
- console.info('ws error:', error)
- }
- this.state[chanel] = handlerResponse(this.state[chanel], message.body, chanel)
- delayCb.emmit()
- }
- delayCb.subscribe(() => {
- this.channelSubscribers[chanel].forEach((subr) => {
- const res = subr.cb(this.state[chanel], chanel)
- if (res) {
- this.state[chanel] = res
- }
- })
- })
- this.ws.registerHandler(chanel, this.socketListeners[chanel])
- return () => {
- delayCb.clear()
- this._unsubscribeChannel(chanel, cb)
- }
- }
- _unsubscribeChannel(chanel, cb) {
- // Cannot read property 'findIndex' of undefined
- const index = this.channelSubscribers[chanel].findIndex((subr) => subr.cb === cb)
- if (index !== -1) {
- // this.channelSubscribers[chanel][index].delayCb.clear()
- this.channelSubscribers[chanel].splice(index, 1)
- }
- if (!this.channelSubscribers[chanel].length) {
- this.ws.unregisterHandler(chanel, this.socketListeners[chanel])
- delete this.channelSubscribers[chanel]
- delete this.socketListeners[chanel]
- delete this.state[chanel]
- }
- }
- clearSubscribes() {
- this.state = {}
- this.socketListeners = {}
- /* Object.keys(this.channelSubscribers).forEach((channel) => {
- this.channelSubscribers[channel].forEach((cb) => cb.delayCb.clear())
- }) */
- this.channelSubscribers = {}
- // console.log('clearSubscribes')
- }
- listenerClose() {
- this.ws.onClose((closeEvent) => {
- // console.log('close')
- if (closeEvent.code !== 1000) {
- this.clearSubscribes()
- this.ws.clearCallback()
- }
- })
- }
- }
- export const wsConnected = new ConnectWs()
- // export default ConnectWs
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement