Skip to content

Instantly share code, notes, and snippets.

@bafonins
Created July 17, 2019 12:23
Show Gist options
  • Select an option

  • Save bafonins/2e60e9f28b5fd13b7c9aecf67348e74f to your computer and use it in GitHub Desktop.

Select an option

Save bafonins/2e60e9f28b5fd13b7c9aecf67348e74f to your computer and use it in GitHub Desktop.
// stream.js
import ArrayBufferToString from 'arraybuffer-to-string'
import Token from '../../util/token'
import { notify, EVENTS } from './shared'
import 'web-streams-polyfill/dist/polyfill.js'
export default async function (payload, url, executor) {
let listeners = Object.values(EVENTS)
.reduce((acc, curr) => ({ ...acc, [curr]: null }), {})
const token = new Token().get()
let Authorization = null
if (typeof token === 'function') {
Authorization = `Bearer ${(await token()).access_token}`
} else {
Authorization = `Bearer ${token}`
}
return new Promise(async function (resolve, reject) {
try {
const reader = await fetch(url, {
body: JSON.stringify(payload),
method: 'POST',
headers: {
Authorization,
},
})
.then(async function (response) {
if (response.status !== 200) {
const err = await response.json()
throw err
}
return response.body.getReader()
})
const subscription = {
on (eventName, callback) {
if (listeners[eventName] === undefined) {
throw new Error(
`${eventName} event is not supported. Should be one of: start, error, event or close`
)
}
listeners[eventName] = callback
return this
},
close () {
if (reader) {
reader.cancel()
}
},
}
reader.read()
.then(function (data) {
notify(listeners[EVENTS.START])
return data
})
.then(function onChunk ({ done, value }) {
if (done) {
notify(listeners[EVENTS.CLOSE])
listeners = null
return
}
const parsed = ArrayBufferToString(value)
const result = JSON.parse(parsed).result
notify(listeners[EVENTS.EVENT], result)
return reader.read().then(onChunk)
})
.catch(function (error) {
notify(listeners[EVENTS.ERROR], error)
listeners = null
})
resolve(subscription)
} catch (error) {
reject(error)
}
})
}
// middleware/events.js
try {
channel = await onEventsStart([ id ])
channel.on('start', () => dispatch(startEventsSuccess(id)))
channel.on('event', message => dispatch(getEventSuccess(id, message)))
channel.on('error', error => dispatch(getEventFailure(id, error)))
channel.on('close', () => dispatch(stopEvents(id)))
} catch (error) {
// ...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment