Skip to content

Instantly share code, notes, and snippets.

@SergProduction
Created March 2, 2018 13:07
Show Gist options
  • Save SergProduction/5817df7d2758b124db8537c47a8e2ffc to your computer and use it in GitHub Desktop.
Save SergProduction/5817df7d2758b124db8537c47a8e2ffc to your computer and use it in GitHub Desktop.
/* eslint-disable func-names, no-underscore-dangle */
import EventBus from 'vertx3-eventbus-client'
import { DelayObserver } from '../lib/delay-observer'
import { endpointApi } from './constants'
const MyEventBus = (function () {
let callbacks = []
const multiEmitter = (type) => (...param) => {
callbacks.forEach((cb) => {
if (cb.type === type) cb.fn(...param)
})
}
EventBus.prototype.onopen = multiEmitter('onopen')
EventBus.prototype.onclose = multiEmitter('onclose')
EventBus.prototype.onerror = multiEmitter('onerror')
EventBus.prototype.onOpen = function (fn) {
callbacks.push({ type: 'onopen', fn })
}
EventBus.prototype.onClose = function (fn) {
callbacks.push({ type: 'onclose', fn })
}
EventBus.prototype.onError = function (fn) {
callbacks.push({ type: 'onerror', fn })
}
EventBus.prototype.clearCallback = function () {
callbacks = []
}
return EventBus
}())
/*
ws.state ->
EventBus.CONNECTING = 0;
EventBus.OPEN = 1;
EventBus.CLOSING = 2;
EventBus.CLOSED = 3;
*/
class ConnectWs {
constructor() {
this.addr = '//217.28.215.174:8889/eventbus'
this.ws = new MyEventBus(this.addr)
this.state = {} // хранилище данных для каждого канала
this.socketListeners = {} // подписчики вебсокета, по одному на канал
this.channelSubscribers = {} // подписчики канала, (может быть много на один канал)
// this.ws.reconnectAttempts
// this.ws.reconnectDelayMax = 3
this.listenerClose()
}
subscribeChannel(chanel, cb = () => {}, handlerResponse = () => {}) {
return new Promise((resolve) => {
if (this.ws.state === EventBus.OPEN) {
resolve(this._controllerChannel(chanel, cb, handlerResponse))
return
}
else if (this.ws.state === EventBus.CLOSED) {
this.ws = new MyEventBus(this.addr)
this.listenerClose()
}
this.ws.onOpen(() => {
// console.log('open')
resolve(this._controllerChannel(chanel, cb, handlerResponse))
})
})
}
_controllerChannel(chanel, cb, handlerResponse) {
if (!this.channelSubscribers[chanel]) {
this.channelSubscribers[chanel] = []
}
this.channelSubscribers[chanel].push({ cb })
if (this.socketListeners[chanel]) {
cb(this.state[chanel])
return () => {
this._unsubscribeChannel(chanel, cb)
}
}
const delayCb = new DelayObserver()
this.state[chanel] = null
this.socketListeners[chanel] = (error, message) => {
if (error) {
console.info('ws error:', error)
}
this.state[chanel] = handlerResponse(this.state[chanel], message.body, chanel)
delayCb.emmit()
}
delayCb.subscribe(() => {
this.channelSubscribers[chanel].forEach((subr) => {
const res = subr.cb(this.state[chanel])
if (res) {
this.state[chanel] = res
}
})
})
this.ws.registerHandler(chanel, this.socketListeners[chanel])
return () => {
delayCb.clear()
this._unsubscribeChannel(chanel, cb)
}
}
_unsubscribeChannel(chanel, cb) {
// Cannot read property 'findIndex' of undefined
const index = this.channelSubscribers[chanel].findIndex((subr) => subr.cb === cb)
if (index !== -1) {
// this.channelSubscribers[chanel][index].delayCb.clear()
this.channelSubscribers[chanel].splice(index, 1)
}
if (!this.channelSubscribers[chanel].length) {
this.ws.unregisterHandler(chanel, this.socketListeners[chanel])
delete this.channelSubscribers[chanel]
delete this.socketListeners[chanel]
delete this.state[chanel]
}
}
clearSubscribes() {
this.state = {}
this.socketListeners = {}
/* Object.keys(this.channelSubscribers).forEach((channel) => {
this.channelSubscribers[channel].forEach((cb) => cb.delayCb.clear())
}) */
this.channelSubscribers = {}
// console.log('clearSubscribes')
}
listenerClose() {
this.ws.onClose(() => {
// console.log('close')
this.clearSubscribes()
this.ws.clearCallback()
})
}
}
export const wsConnected = new ConnectWs()
// export default ConnectWs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment