Skip to content

Instantly share code, notes, and snippets.

@indrekj
Created November 3, 2019 11:48
Show Gist options
  • Select an option

  • Save indrekj/692c28aedfe5ec42e26d4502845cd06d to your computer and use it in GitHub Desktop.

Select an option

Save indrekj/692c28aedfe5ec42e26d4502845cd06d to your computer and use it in GitHub Desktop.
import R from 'ramda';
import Rx from 'rx';
import logger from './logger';
import Phoenix from 'phoenix';
const STATS_NAMESPACE = 'sm.app.operator_api.pubsub';
const CONNECTION_METRIC = `${STATS_NAMESPACE}.connection`;
const PUSH_METRIC = `${STATS_NAMESPACE}.push`;
const CRASH_REASON = 'join crashed';
const MAX_32_BIT_INTEGER = 2147483647;
const TEMPORARY_ERRORS = ['tracking_failed'];
const AVOID_LONGPOLL_WORKER_INTERVAL_IN_MS = 300000; // 5 minutes
const createLogger = logsEnabled => {
if (logsEnabled) {
return (kind, message, data) => logger.info(`[Pubsub] ${kind}: ${message}`, {pubsub_data: data});
} else {
return () => {};
}
};
const findNewTransport = socket => {
if (!window.WebSocket) {
// Always use LongPoll if WebSocket is not defined
return Phoenix.LongPoll;
} else if (!navigator.onLine) {
// Keep the transport when the user is not online
return socket.transport;
} else if (socket.transport === window.WebSocket) {
// Change transport to LongPoll if we had connection error using WebSocket
return Phoenix.LongPoll;
} else {
// Change transport to WebSocket if we had connection error using LongPoll
return window.WebSocket;
}
};
const startAvoidLongPollWorker = socket => {
const work = () => {
// Avoid changing transports and interfering with the reconnection logic
// when we don't have an established connection and the reconnection might
// already be in progress
if (socket.connectionState() !== 'open') return;
// Return if the user is already not using LongPoll
if (socket.transport !== Phoenix.LongPoll) return;
const newTransport = findNewTransport(socket.transport);
// Return if the new transport is the same
if (socket.transport === newTransport) return;
// Change socket transport and schedule a reconnect
socket.transport = newTransport;
socket.reconnectTimer.scheduleTimeout();
};
setInterval(work, AVOID_LONGPOLL_WORKER_INTERVAL_IN_MS);
};
const buildPubSubManager = ({server, socket, stats, logsEnabled, accessToken}) => {
let shouldReconnectOnAccessTokenUpdate = false;
const updateAccessToken = newAccessToken => {
accessToken = newAccessToken;
if (shouldReconnectOnAccessTokenUpdate) {
logger.info('Pubsub access token updated, reconnecting');
socket.reconnectTimer.scheduleTimeout();
} else {
logger.info('Pubsub access token updated');
}
};
if (R.isNil(socket)) {
socket = new Phoenix.Socket(`${server}/notifications`, {
logger: createLogger(logsEnabled),
params: () => ({access_token: accessToken})
});
} else {
// Used in testing
socket.params = () => ({access_token: accessToken});
}
let connected = false;
const createContext = params => R.merge({transport: socket.transport.name, online: navigator.onLine}, params);
const createTags = (tags, param) => {
if (R.isNil(param)) {
param = {};
}
const {transport} = param;
return R.concat([`transport:${transport || socket.transport.name}`, `online:${navigator.onLine}`], tags);
};
socket.onOpen(() => {
stats.increment(CONNECTION_METRIC, createTags(['action:connected']));
logger.info('PubSub socket opened', createContext({}));
shouldReconnectOnAccessTokenUpdate = false;
// Do channel rejoin in next event loop, otherwise it's too soon and
// doesn't work.
const channels = socket.channels;
setTimeout(() => {
// Channels are not always rejoined
// https://github.com/phoenixframework/phoenix/issues/3458
//
// We can however explicitly invoke rejoinTimer on each socket channel to
// force the rejoin.
//
// If phoenix already scheduled a rejoinTimer then invoking it again
// doesn't do any harm. It may only increase the rejoin delay but it is
// capped at 10 seconds.
channels.forEach(channel =>
channel.rejoinTimer.scheduleTimeout()
);
});
});
// onClose is triggered when the connection drops or when connection is
// closed gracefully. If the connection drops because of an error then
// onError is triggered before the onClose.
//
// We extract transport information from the event instead of the socket
// because we're changing the transport on error but want to show here the
// original transport name.
socket.onClose(event => {
// Event is passed as a callback parameter only for WebSocket connections.
let context;
let tags;
let transport;
if (event) {
transport = window.WebSocket.name;
context = createContext({reason: event.reason, type: event.type, code: event.code, transport});
tags = createTags(['action:disconnected', `type:${event.type}`, `code:${event.code}`], {transport});
if (event.wasClean) {
stats.increment(CONNECTION_METRIC, R.concat(tags, ['reason:closed']));
logger.info('PubSub connection disconnected cleanly', context);
} else {
// This can occur when
// * server closes connection (e.g during a deploy)
// * a middleman closes the connection
// In any case, the connection is reconnected and as there's no way to
// be sure that there was an actual problem, no warnings should be
// logged nor error metrics emitted.
stats.increment(CONNECTION_METRIC, R.concat(tags, ['reason:closed']));
logger.info('PubSub connection disconnected', context);
}
} else {
transport = Phoenix.LongPoll.name;
context = createContext({reason: 'unknown', type: 'unknown', code: 'unknown', transport});
tags = createTags(['action:disconnected', 'type:unknown', 'reason:unknown', 'code:unknown'], {transport});
stats.increment(CONNECTION_METRIC, tags);
logger.info('Pubsub connection disconnected', context);
}
});
socket.onError(event => {
const newTransport = findNewTransport(socket);
const context = event ?
createContext({type: event.type, new_transport: newTransport.name}) :
createContext({type: 'unknown', new_transport: newTransport.name});
stats.increment(CONNECTION_METRIC, createTags(['action:error']));
logger.warn('PubSub socket connection error', context);
socket.transport = newTransport;
socket.reconnectTimer.scheduleTimeout();
});
window.addEventListener('error', event => {
if (event && event.message.match('status 403') && socket.connectionState() === 'connecting') {
logger.info(
'Detected likely pubsub authentication error, reconnecting when token is updated'
);
shouldReconnectOnAccessTokenUpdate = true;
}
});
const ensureConnected = () => {
if (!connected) {
// This happens only the first time. Reconnects are handled by phoenix
// library and don't invoke this function.
stats.increment(CONNECTION_METRIC, createTags(['action:connect']));
try {
socket.connect();
} catch (error) {
const newTransport = findNewTransport(socket);
logger.warn(
'Failed to connect to Pubsub with initial transport',
createContext({new_transport: newTransport.name, error})
);
socket.transport = newTransport;
socket.connect();
}
connected = true;
startAvoidLongPollWorker(socket);
}
};
const wrapChannel = channel => {
const observable = (event, opts) => {
if (R.isNil(opts)) {
opts = {};
}
return Rx.Observable.create(observer => {
const ref = channel.on(event, payload => observer.onNext(payload));
return () => {
channel.off(event, ref);
if (opts.leaveChannelOnDispose) {
channel.leave();
}
};
});
};
const push = (event, payload) =>
Rx.Observable.create(observer => {
channel.push(event, payload)
.receive('ok', response => {
observer.onNext(response);
observer.onCompleted();
})
.receive('error', error => {
stats.increment(PUSH_METRIC, ['action:error']);
observer.onError({type: 'error', error});
})
.receive('timeout', () => {
stats.increment(PUSH_METRIC, ['action:timeout']);
observer.onError({type: 'timeout'});
});
// nothing to dispose
return () => {};
});
const pushAggregateState = (event, key, state) => push(event, {key, clock: Date.now(), state});
const leave = () => channel.leave();
return {observable, push, pushAggregateState, leave, phoenixChannel: channel};
};
const joinChannel = (topic, params) => {
if (R.isNil(params)) {
params = {};
}
return Rx.Observable.create(observer => {
ensureConnected();
let joined = false;
// In case the socket is not connected, the channel is rejoined at the same interval
// as the socket is reconnected. Avoid timing out the channel join unless otherwise specified.
const timeout = params.timeout || MAX_32_BIT_INTEGER;
const channel = socket.channel(topic, params);
channel.join(timeout)
.receive('ok', () => {
joined = true;
observer.onNext(wrapChannel(channel));
observer.onCompleted();
})
.receive('error', error => {
if (error && (error.reason === CRASH_REASON || TEMPORARY_ERRORS.indexOf(error) !== -1)) {
logger.warn('Joining Pubsub channel failed, retrying', {reason: 'error', error, topic});
} else {
logger.warn('Joining Pubsub channel failed', {reason: 'error', error, topic});
observer.onError({type: 'error', error});
}
})
.receive('timeout', () => logger.warn('Joining Pubsub channel failed', {reason: 'timeout'}));
return () => {
if (!joined) {
channel.leave();
}
};
});
};
return {
joinChannel,
updateAccessToken
};
};
export default buildPubSubManager;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment