Skip to content

Instantly share code, notes, and snippets.

@praveen001
Created July 22, 2019 11:09
Show Gist options
  • Save praveen001/9c7bf4658c7ba20db788b55cec69b6ec to your computer and use it in GitHub Desktop.
Save praveen001/9c7bf4658c7ba20db788b55cec69b6ec to your computer and use it in GitHub Desktop.
Redux Observable WebSocket
import { combineEpics, StateObservable } from 'redux-observable';
import { merge, of } from 'rxjs';
import { Subject } from 'rxjs/Subject';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  connected,
  connect,
  disconnected,
  disconnect,
  IConnectWebsocketAction,
  receiveMessageFromWebSocket,
  sentMessage,
  WebsocketActionTypes,
  IDisconnectFromWebsocketAction
} from '../actions/websocketActions';
import { IState } from '../reducers';
// Socket for the current connection. Declared without an initializer, so it
// stays unassigned until connectSocket() has run at least once.
let webSocketSubject: WebSocketSubject<{}>;
// Subjects handed to webSocket() as its openObserver / closeObserver.
// Both are replaced with fresh instances on every connectSocket() call.
let onOpenSubject = new Subject();
let onCloseSubject = new Subject();
/**
 * Creates a new WebSocketSubject for the given URL and publishes it, together
 * with fresh open/close observer subjects, through the module-level bindings.
 *
 * @param websocketUrl URL passed to `webSocket()` as `url`.
 * @returns The newly created socket subject (also stored in `webSocketSubject`).
 */
const connectSocket = websocketUrl => {
  // New observers per call; the previous ones are simply replaced.
  const openEvents = new Subject();
  const closeEvents = new Subject();
  onOpenSubject = openEvents;
  onCloseSubject = closeEvents;

  webSocketSubject = webSocket({
    url: websocketUrl,
    openObserver: openEvents,
    closeObserver: closeEvents
  });

  return webSocketSubject;
};
/**
 * Opens a socket for every CONNECT action and forwards what it delivers.
 *
 * The URL is read from `state$.value.config.webSocketURL`, not from the action.
 * Each value the socket emits becomes a `receiveMessageFromWebSocket` action;
 * a socket error is replaced by a single `disconnect(true)` action. Because of
 * `switchMap`, a new CONNECT unsubscribes from the previous socket stream.
 */
const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) => {
      const socket$ = connectSocket(state$.value.config.webSocketURL);

      return socket$
        .map(message => receiveMessageFromWebSocket(message))
        // The catch sits on the inner stream so the outer epic keeps running.
        .catch(() => of(disconnect(true)));
    });
/**
 * Emits connection lifecycle actions for the socket opened by the latest
 * CONNECT action.
 *
 * - Every event on `onOpenSubject` becomes a `connected()` action.
 * - Every *clean* close event on `onCloseSubject` becomes `disconnect(true)`.
 *
 * The previous version built `onCloseSubject.map(...)` inside the open handler
 * without subscribing to or returning it, so close events never produced an
 * action. The close stream is now merged into the epic's output.
 *
 * Only clean closes are forwarded here: RxJS's WebSocketSubject errors its
 * stream on an unclean close, and connectEpic already turns that error into
 * `disconnect(true)`. Forwarding unclean closes as well would dispatch the
 * action twice for one close.
 *
 * NOTE(review): the subjects are read when CONNECT arrives, so this presumably
 * relies on connectEpic (which calls connectSocket and replaces them) handling
 * the same action first — confirm against the combineEpics order.
 */
const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    merge(
      onOpenSubject.map(() => connected()),
      onCloseSubject.mergeMap(closeEvent =>
        // closeObserver receives the browser CloseEvent; the subject itself is untyped.
        (closeEvent as CloseEvent).wasClean ? [disconnect(true)] : []
      )
    )
  );
/**
 * Pushes the payload of every SEND_MESSAGE action into the current socket and
 * then emits a `sentMessage()` action.
 *
 * NOTE(review): `webSocketSubject` is only assigned inside connectSocket(), so
 * a SEND_MESSAGE arriving before any CONNECT would throw here — confirm callers
 * never dispatch in that order.
 */
const sendMessageEpic = action$ =>
  action$
    .ofType(WebsocketActionTypes.SEND_MESSAGE)
    .map(({ payload }) => {
      webSocketSubject.next(payload);
      return sentMessage();
    });
/**
 * Handles DISCONNECT actions.
 *
 * - Without `payload.retry`: completes the close observer and the socket
 *   subject, then emits `disconnected()`.
 * - With `payload.retry`: emits `disconnected()` immediately, followed 5000 ms
 *   later by a `connect(...)` action built from `state$.value.config.webSocketURL`.
 *   The socket subjects are not touched in this branch.
 */
const disconnectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.DISCONNECT)
    .mergeMap((action: IDisconnectFromWebsocketAction) => {
      const shouldRetry = action.payload.retry;

      if (!shouldRetry) {
        // The close observer is completed before the socket, presumably so the
        // resulting close event is not propagated through it — confirm.
        onCloseSubject.complete();
        webSocketSubject.complete();
        return [disconnected()];
      }

      const delayedReconnect$ = of(
        connect(state$.value.config.webSocketURL)
      ).delay(5000);

      return delayedReconnect$.startWith(disconnected());
    });
// Default export: the four websocket epics combined into one via combineEpics.
// NOTE(review): connectEpic and connectedEpic both react to CONNECT, and
// connectedEpic reads the subjects that connectSocket() replaces; the order
// below presumably matters (connectEpic first) — confirm before reordering.
export default combineEpics(
connectEpic,
connectedEpic,
sendMessageEpic,
disconnectEpic
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment