Created
November 3, 2019 11:48
-
-
Save indrekj/692c28aedfe5ec42e26d4502845cd06d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import R from 'ramda'; | |
| import Rx from 'rx'; | |
| import logger from './logger'; | |
| import Phoenix from 'phoenix'; | |
| const STATS_NAMESPACE = 'sm.app.operator_api.pubsub'; | |
| const CONNECTION_METRIC = `${STATS_NAMESPACE}.connection`; | |
| const PUSH_METRIC = `${STATS_NAMESPACE}.push`; | |
| const CRASH_REASON = 'join crashed'; | |
| const MAX_32_BIT_INTEGER = 2147483647; | |
| const TEMPORARY_ERRORS = ['tracking_failed']; | |
| const AVOID_LONGPOLL_WORKER_INTERVAL_IN_MS = 300000; // 5 minutes | |
| const createLogger = logsEnabled => { | |
| if (logsEnabled) { | |
| return (kind, message, data) => logger.info(`[Pubsub] ${kind}: ${message}`, {pubsub_data: data}); | |
| } else { | |
| return () => {}; | |
| } | |
| }; | |
| const findNewTransport = socket => { | |
| if (!window.WebSocket) { | |
| // Always use LongPoll if WebSocket is not defined | |
| return Phoenix.LongPoll; | |
| } else if (!navigator.onLine) { | |
| // Keep the transport when the user is not online | |
| return socket.transport; | |
| } else if (socket.transport === window.WebSocket) { | |
| // Change transport to LongPoll if we had connection error using WebSocket | |
| return Phoenix.LongPoll; | |
| } else { | |
| // Change transport to WebSocket if we had connection error using LongPoll | |
| return window.WebSocket; | |
| } | |
| }; | |
| const startAvoidLongPollWorker = socket => { | |
| const work = () => { | |
| // Avoid changing transports and interfering with the reconnection logic | |
| // when we don't have an established connection and the reconnection might | |
| // already be in progress | |
| if (socket.connectionState() !== 'open') return; | |
| // Return if the user is already not using LongPoll | |
| if (socket.transport !== Phoenix.LongPoll) return; | |
| const newTransport = findNewTransport(socket.transport); | |
| // Return if the new transport is the same | |
| if (socket.transport === newTransport) return; | |
| // Change socket transport and schedule a reconnect | |
| socket.transport = newTransport; | |
| socket.reconnectTimer.scheduleTimeout(); | |
| }; | |
| setInterval(work, AVOID_LONGPOLL_WORKER_INTERVAL_IN_MS); | |
| }; | |
| const buildPubSubManager = ({server, socket, stats, logsEnabled, accessToken}) => { | |
| let shouldReconnectOnAccessTokenUpdate = false; | |
| const updateAccessToken = newAccessToken => { | |
| accessToken = newAccessToken; | |
| if (shouldReconnectOnAccessTokenUpdate) { | |
| logger.info('Pubsub access token updated, reconnecting'); | |
| socket.reconnectTimer.scheduleTimeout(); | |
| } else { | |
| logger.info('Pubsub access token updated'); | |
| } | |
| }; | |
| if (R.isNil(socket)) { | |
| socket = new Phoenix.Socket(`${server}/notifications`, { | |
| logger: createLogger(logsEnabled), | |
| params: () => ({access_token: accessToken}) | |
| }); | |
| } else { | |
| // Used in testing | |
| socket.params = () => ({access_token: accessToken}); | |
| } | |
| let connected = false; | |
| const createContext = params => R.merge({transport: socket.transport.name, online: navigator.onLine}, params); | |
| const createTags = (tags, param) => { | |
| if (R.isNil(param)) { | |
| param = {}; | |
| } | |
| const {transport} = param; | |
| return R.concat([`transport:${transport || socket.transport.name}`, `online:${navigator.onLine}`], tags); | |
| }; | |
| socket.onOpen(() => { | |
| stats.increment(CONNECTION_METRIC, createTags(['action:connected'])); | |
| logger.info('PubSub socket opened', createContext({})); | |
| shouldReconnectOnAccessTokenUpdate = false; | |
| // Do channel rejoin in next event loop, otherwise it's too soon and | |
| // doesn't work. | |
| const channels = socket.channels; | |
| setTimeout(() => { | |
| // Channels are not always rejoined | |
| // https://github.com/phoenixframework/phoenix/issues/3458 | |
| // | |
| // We can however explicitly invoke rejoinTimer on each socket channel to | |
| // force the rejoin. | |
| // | |
| // If phoenix already scheduled a rejoinTimer then invoking it again | |
| // doesn't do any harm. It may only increase the rejoin delay but it is | |
| // capped at 10 seconds. | |
| channels.forEach(channel => | |
| channel.rejoinTimer.scheduleTimeout() | |
| ); | |
| }); | |
| }); | |
| // onClose is triggered when the connection drops or when connection is | |
| // closed gracefully. If the connection drops because of an error then | |
| // onError is triggered before the onClose. | |
| // | |
| // We extract transport information from the event instead of the socket | |
| // because we're changing the transport on error but want to show here the | |
| // original transport name. | |
| socket.onClose(event => { | |
| // Event is passed as a callback parameter only for WebSocket connections. | |
| let context; | |
| let tags; | |
| let transport; | |
| if (event) { | |
| transport = window.WebSocket.name; | |
| context = createContext({reason: event.reason, type: event.type, code: event.code, transport}); | |
| tags = createTags(['action:disconnected', `type:${event.type}`, `code:${event.code}`], {transport}); | |
| if (event.wasClean) { | |
| stats.increment(CONNECTION_METRIC, R.concat(tags, ['reason:closed'])); | |
| logger.info('PubSub connection disconnected cleanly', context); | |
| } else { | |
| // This can occur when | |
| // * server closes connection (e.g during a deploy) | |
| // * a middleman closes the connection | |
| // In any case, the connection is reconnected and as there's no way to | |
| // be sure that there was an actual problem, no warnings should be | |
| // logged nor error metrics emitted. | |
| stats.increment(CONNECTION_METRIC, R.concat(tags, ['reason:closed'])); | |
| logger.info('PubSub connection disconnected', context); | |
| } | |
| } else { | |
| transport = Phoenix.LongPoll.name; | |
| context = createContext({reason: 'unknown', type: 'unknown', code: 'unknown', transport}); | |
| tags = createTags(['action:disconnected', 'type:unknown', 'reason:unknown', 'code:unknown'], {transport}); | |
| stats.increment(CONNECTION_METRIC, tags); | |
| logger.info('Pubsub connection disconnected', context); | |
| } | |
| }); | |
| socket.onError(event => { | |
| const newTransport = findNewTransport(socket); | |
| const context = event ? | |
| createContext({type: event.type, new_transport: newTransport.name}) : | |
| createContext({type: 'unknown', new_transport: newTransport.name}); | |
| stats.increment(CONNECTION_METRIC, createTags(['action:error'])); | |
| logger.warn('PubSub socket connection error', context); | |
| socket.transport = newTransport; | |
| socket.reconnectTimer.scheduleTimeout(); | |
| }); | |
| window.addEventListener('error', event => { | |
| if (event && event.message.match('status 403') && socket.connectionState() === 'connecting') { | |
| logger.info( | |
| 'Detected likely pubsub authentication error, reconnecting when token is updated' | |
| ); | |
| shouldReconnectOnAccessTokenUpdate = true; | |
| } | |
| }); | |
| const ensureConnected = () => { | |
| if (!connected) { | |
| // This happens only the first time. Reconnects are handled by phoenix | |
| // library and don't invoke this function. | |
| stats.increment(CONNECTION_METRIC, createTags(['action:connect'])); | |
| try { | |
| socket.connect(); | |
| } catch (error) { | |
| const newTransport = findNewTransport(socket); | |
| logger.warn( | |
| 'Failed to connect to Pubsub with initial transport', | |
| createContext({new_transport: newTransport.name, error}) | |
| ); | |
| socket.transport = newTransport; | |
| socket.connect(); | |
| } | |
| connected = true; | |
| startAvoidLongPollWorker(socket); | |
| } | |
| }; | |
| const wrapChannel = channel => { | |
| const observable = (event, opts) => { | |
| if (R.isNil(opts)) { | |
| opts = {}; | |
| } | |
| return Rx.Observable.create(observer => { | |
| const ref = channel.on(event, payload => observer.onNext(payload)); | |
| return () => { | |
| channel.off(event, ref); | |
| if (opts.leaveChannelOnDispose) { | |
| channel.leave(); | |
| } | |
| }; | |
| }); | |
| }; | |
| const push = (event, payload) => | |
| Rx.Observable.create(observer => { | |
| channel.push(event, payload) | |
| .receive('ok', response => { | |
| observer.onNext(response); | |
| observer.onCompleted(); | |
| }) | |
| .receive('error', error => { | |
| stats.increment(PUSH_METRIC, ['action:error']); | |
| observer.onError({type: 'error', error}); | |
| }) | |
| .receive('timeout', () => { | |
| stats.increment(PUSH_METRIC, ['action:timeout']); | |
| observer.onError({type: 'timeout'}); | |
| }); | |
| // nothing to dispose | |
| return () => {}; | |
| }); | |
| const pushAggregateState = (event, key, state) => push(event, {key, clock: Date.now(), state}); | |
| const leave = () => channel.leave(); | |
| return {observable, push, pushAggregateState, leave, phoenixChannel: channel}; | |
| }; | |
| const joinChannel = (topic, params) => { | |
| if (R.isNil(params)) { | |
| params = {}; | |
| } | |
| return Rx.Observable.create(observer => { | |
| ensureConnected(); | |
| let joined = false; | |
| // In case the socket is not connected, the channel is rejoined at the same interval | |
| // as the socket is reconnected. Avoid timing out the channel join unless otherwise specified. | |
| const timeout = params.timeout || MAX_32_BIT_INTEGER; | |
| const channel = socket.channel(topic, params); | |
| channel.join(timeout) | |
| .receive('ok', () => { | |
| joined = true; | |
| observer.onNext(wrapChannel(channel)); | |
| observer.onCompleted(); | |
| }) | |
| .receive('error', error => { | |
| if (error && (error.reason === CRASH_REASON || TEMPORARY_ERRORS.indexOf(error) !== -1)) { | |
| logger.warn('Joining Pubsub channel failed, retrying', {reason: 'error', error, topic}); | |
| } else { | |
| logger.warn('Joining Pubsub channel failed', {reason: 'error', error, topic}); | |
| observer.onError({type: 'error', error}); | |
| } | |
| }) | |
| .receive('timeout', () => logger.warn('Joining Pubsub channel failed', {reason: 'timeout'})); | |
| return () => { | |
| if (!joined) { | |
| channel.leave(); | |
| } | |
| }; | |
| }); | |
| }; | |
| return { | |
| joinChannel, | |
| updateAccessToken | |
| }; | |
| }; | |
| export default buildPubSubManager; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment