Skip to content

Instantly share code, notes, and snippets.

@lourd
Last active July 13, 2017 04:17
Show Gist options
  • Save lourd/15d7c9bcf859b4cafb53dccae02e03c6 to your computer and use it in GitHub Desktop.
Save lourd/15d7c9bcf859b4cafb53dccae02e03c6 to your computer and use it in GitHub Desktop.
Example of a redux-saga saga function that uses an external emitter through an eventChannel, tested with redux-saga-test-plan
import { eventChannel } from 'redux-saga'
import { call, cps, take, fork, put, cancel, select } from 'redux-saga/effects'
function emitterChannel(emitter, eventType) {
return eventChannel(emit => {
emitter.on(eventType, emit)
return () => emitter.off(eventType, emit)
})
}
function* streamSaga(session) {
let channel
try {
channel = yield call(emitterChannel, session, 'streamCreated')
while (true) {
// Blocks until a stream is created
const { stream } = yield take(channel)
yield put(setOTStreamCreated(stream.id))
// If this happens more than once, we assume we're in the case that the
// hero left the chat & resumed, creating a new audio stream. That means
// that the old stream was destroyed, and therefore our old subscription,
// because OpenTok takes care of that internally
let subscription
while (!subscription) {
try {
// Subscribe to the stream
subscription = yield cps(
// https://tokbox.com/developer/sdks/js/reference/Session.html#subscribe
[session, session.subscribe],
stream,
// DOM element that opentok will insert the UI in, which we don't want
null,
{ insertDefaultUI: false },
)
} catch (err) {
yield put(setChatReconnecting(err.message))
}
}
yield put(setOTStreamSubscribed(true))
yield fork(subscriptionSaga, session, subscription)
}
} finally {
yield call(channel.close)
}
}
import testSaga from 'redux-saga-test-plan'
describe('streamSaga', () => {
const session = {
subscribe() {},
}
const subscription = {}
const channel = () => {}
channel.close = () => {}
function mainPath() {
const id = 'purpledurple'
const event = { stream: { id } }
return testSaga(streamSaga, session)
.next()
.call(emitterChannel, session, 'streamCreated')
.next(channel)
.take(channel)
.next(event)
.put(setOTStreamCreated(id))
.next()
.cps(
[session, session.subscribe],
event.stream,
null,
{ insertDefaultUI: false },
)
.next(subscription)
.put(setOTStreamSubscribed(true))
.next()
.fork(subscriptionSaga, session, subscription)
.next()
.take(channel)
.save('pre-stream')
// cancellation invoked from above!
.finish()
.call(channel.close)
.next()
.isDone()
}
it('Coach or Hero is present first. No one exits in the middle. coach leaves ' +
'the chat, initiates cancelling the chat', () => {
mainPath()
})
it('Coach or hero is present first. Hero exits in the middle and comes back,' +
'subscribing to the stream fails, reconnection succeeds. Then we leave', () => {
const id = 'purpledurple'
const event = { stream: { id } }
mainPath()
.restore('pre-stream')
// Another stream was created, the iterator yields another stream
.next(event)
.put(setOTStreamCreated(id))
.next()
// We subscribe and do the same things with this one too
.cps(
[session, session.subscribe],
event.stream,
null,
{ insertDefaultUI: false },
)
// Including reconnect when subscribing to the stream fails!
.throw(new Error('fire and brimstone'))
.put(setChatReconnecting('fire and brimstone'))
.next()
.cps(
[session, session.subscribe],
event.stream,
null,
{ insertDefaultUI: false },
)
// We successfully connect
.next(subscription)
.put(setOTStreamSubscribed(true))
.next()
.fork(subscriptionSaga, session, subscription)
.next()
.take(channel)
// and then cancellation invoked from above!
.finish()
.call(channel.close)
.next()
.isDone()
})
})
@lourd
Copy link
Author

lourd commented Mar 27, 2017

It's missing imports for the dependent sagas and action creators (subscriptionSaga, setOTStreamCreated, setChatReconnecting, setOTStreamSubscribed), but it should be clear enough without them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment