Last active
March 9, 2019 14:14
-
-
Save eiriklv/ea8c9dbbae3069930054d3393b076808 to your computer and use it in GitHub Desktop.
Channels and websockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Import dependencies | |
*/ | |
const { EventEmitter } = require('events'); | |
const ReconnectingWebSocket = require('reconnecting-websocket'); | |
/**
 * Create a websocket connection interface on top of ReconnectingWebSocket.
 *
 * Outgoing messages are JSON-serialised. Messages sent while the socket is
 * not open are buffered and flushed the next time the socket opens.
 * Incoming messages are emitted on the 'message' channel, JSON-parsed when
 * possible.
 *
 * @param {string} uri - Passed unchanged to the ReconnectingWebSocket constructor.
 * @returns {{send: Function, close: Function, on: Function, removeListener: Function}}
 *   The connection interface.
 */
export default function createReconnectingWebsocket(uri) {
  /**
   * Flag to determine if socket has been opened and is ready for sending messages
   */
  let isOpen = false;

  /**
   * Queue of messages that were requested sent while the socket was not open
   */
  const messageQueue = [];

  /**
   * Event emitter used to fan out incoming messages to listeners
   */
  const emitter = new EventEmitter();

  /**
   * Default error listener, so an 'error' event never throws for lack of a handler
   */
  emitter.on('error', console.log);

  /**
   * Create a new websocket connection instance
   */
  const websocket = new ReconnectingWebSocket(uri);

  /**
   * Handle opening of the websocket.
   *
   * This runs on every (re)connect, so each queued message is removed once
   * it has been handed to the socket. Leaving messages in the queue would
   * re-send all of them on every later reconnect.
   */
  websocket.onopen = () => {
    isOpen = true;

    while (messageQueue.length > 0) {
      websocket.send(JSON.stringify(messageQueue[0]));
      // Only drop the message after send() returned without throwing.
      messageQueue.shift();
    }
  };

  /**
   * Handle closing of the websocket
   */
  websocket.onclose = () => {
    isOpen = false;
  };

  /**
   * Handle websocket messages
   */
  websocket.onmessage = (evt) => {
    let parsedMessage;

    try {
      parsedMessage = JSON.parse(evt.data);
    } catch (e) {
      // Not JSON: deliberately ignored, the raw data is forwarded below.
    }

    // Falsy parse results (0, false, "", null) also fall back to the raw data.
    emitter.emit('message', parsedMessage || evt.data);
  };

  /**
   * Handle errors on the websocket
   */
  websocket.onerror = () => {
    isOpen = false;
  };

  /**
   * Return the interface
   */
  return {
    /**
     * Send a message now if the socket is open, otherwise queue it.
     */
    send(msg) {
      if (isOpen) {
        websocket.send(JSON.stringify(msg));
      } else {
        messageQueue.push(msg);
      }
    },

    /**
     * Close the underlying socket and drop every listener on the emitter.
     */
    close() {
      websocket.close();
      return emitter.removeAllListeners();
    },

    /**
     * Subscribe a listener to a channel (e.g. 'message').
     */
    on(chan, listener) {
      return emitter.on(chan, listener);
    },

    /**
     * Unsubscribe a previously registered listener.
     */
    removeListener(chan, listener) {
      return emitter.removeListener(chan, listener);
    },
  };
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Dependencies | |
*/ | |
import { eventChannel } from 'redux-saga'; | |
/**
 * Abstraction for creating socket channels.
 *
 * Wraps a socket interface in a redux-saga event channel: every 'message'
 * event on the socket is emitted into the channel, and closing the channel
 * removes the listener from the socket again.
 *
 * Exported as the module default because the initialization saga imports it
 * as a default import.
 *
 * @param {{on: Function, removeListener: Function}} socket - Socket interface
 *   exposing `on(channel, listener)` and `removeListener(channel, listener)`.
 * @returns {*} Whatever `eventChannel` returns for the subscription.
 */
export default function createSocketChannel(socket) {
  return eventChannel((emit) => {
    // Forward only the message itself into the channel.
    const messageHandler = (data) => emit(data);

    socket.on('message', messageHandler);

    // Unsubscribe function invoked when the channel is closed.
    return () => socket.removeListener('message', messageHandler);
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Import dependencies | |
*/ | |
import { take, call, apply, put, fork } from 'redux-saga/effects'; | |
/** | |
* Import helpers | |
*/ | |
import createReconnectingWebsocket from '../utils/create-reconnecting-websocket'; | |
import createSocketChannel from '../utils/create-socket-channel'; | |
/** | |
* Import action types | |
*/ | |
import { | |
SEND_SOCKET_MESSAGE, | |
} from '../ducks/whatever'; | |
/** | |
* Import action creators | |
*/ | |
import { | |
someAction, | |
someOtherAction, | |
} from '../ducks/whatever'; | |
/**
 * Initialization saga.
 *
 * Creates the reconnecting websocket, wraps it in a socket channel and
 * starts the sender and receiver sagas.
 */
export default function* initializeSocket() {
  // The helper is imported as `createReconnectingWebsocket`.
  const socket = yield call(createReconnectingWebsocket, 'ws://socket.url');
  const socketChannel = yield call(createSocketChannel, socket);

  // fork is non-blocking, so yielding the two forks one after another starts
  // both sagas side by side. A bare array of effects is avoided because
  // redux-saga 1.x no longer runs `yield [...]`.
  yield fork(sendSocketMessages, socket);
  yield fork(receiveSocketMessages, socketChannel);
}
/**
 * Receiver saga: takes from the socket channel forever and turns each
 * recognised websocket message into the matching Redux action.
 * Messages with any other `type` are dropped without dispatching.
 */
function* receiveSocketMessages(socketChannel) {
  for (;;) {
    const incoming = yield take(socketChannel);
    const kind = incoming.type;

    /**
     * Translate the websocket message type into an action
     */
    if (kind === 'some-message') {
      yield put(someAction(incoming.payload));
    } else if (kind === 'some-other-message') {
      yield put(someOtherAction(incoming.payload));
    }
  }
}
/**
 * Sender saga: waits for SEND_SOCKET_MESSAGE forever and hands whatever the
 * take effect resolved to over to `socket.send`, invoked with `socket` as
 * its context.
 */
function* sendSocketMessages(socket) {
  for (;;) {
    const outgoing = yield take(SEND_SOCKET_MESSAGE);
    const sendArgs = [outgoing];

    yield apply(socket, socket.send, sendArgs);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment