@jayphelps
Last active August 13, 2017 05:31
import { webSocket } from 'rxjs/observable/dom/webSocket';
import { timer } from 'rxjs/observable/timer';
import { fromEvent } from 'rxjs/observable/fromEvent';
// RxJS 5 prototype operators used below must be patched in explicitly
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';

// Lazy: it doesn't connect while no one is subscribed,
// but to multiplex we need a single static instance
// so that every ticker shares the same socket
const socket$ = webSocket('ws://stock/endpoint');

// This is bidirectional multiplexing: multiple concurrent
// stock ticker streams through a single socket. Each one
// is started with START_TICKER_STREAM and ended with CLOSE_TICKER_STREAM
const stockTickerEpic = (action$, store) =>
  action$.ofType('START_TICKER_STREAM')
    .mergeMap(action => // mergeMap so that we can utilize multiplexing (concurrent tickers)
      socket$.multiplex(
        () => JSON.stringify({ sub: action.ticker }), // message sent to the server as soon as this Observable is subscribed
        () => JSON.stringify({ unsub: action.ticker }), // message sent to the server when this Observable is unsubscribed
        msg => msg.ticker === action.ticker // filters out incoming server messages that aren't related to this ticker
      )
        // Optional, of course; just here to demonstrate. retryWhen passes an
        // Observable of errors (not a single error), so the online check has
        // to run per error: retry after 1s if online, else on the next 'online' event
        .retryWhen(error$ =>
          error$.switchMap(() =>
            window.navigator.onLine ? timer(1000) : fromEvent(window, 'online').take(1)
          )
        )
        .takeUntil(
          action$.ofType('CLOSE_TICKER_STREAM')
            .filter(closeAction => closeAction.ticker === action.ticker) // only stop this particular ticker when the close action matches it
        )
        .map(tick => ({ type: 'TICKER_TICK', tick }))
    );
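
// A minimal usage sketch, not part of the original gist. The two action
// creators below are hypothetical helpers; only the action types and the
// `ticker` field are taken from the epic above.
const startTickerStream = ticker => ({ type: 'START_TICKER_STREAM', ticker });
const closeTickerStream = ticker => ({ type: 'CLOSE_TICKER_STREAM', ticker });

// Assuming a Redux store whose redux-observable middleware runs the epic,
// starting and stopping one ticker would look roughly like this:
//   store.dispatch(startTickerStream('AAPL')); // subscribes; sends { sub: 'AAPL' }
//   store.dispatch(closeTickerStream('AAPL')); // unsubscribes; sends { unsub: 'AAPL' }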