Skip to content

Instantly share code, notes, and snippets.

@Sawtaytoes
Created March 17, 2019 10:53
Show Gist options
  • Save Sawtaytoes/b54b1bc1678d9f8276f1dd4d7b0ed93e to your computer and use it in GitHub Desktop.
Save Sawtaytoes/b54b1bc1678d9f8276f1dd4d7b0ed93e to your computer and use it in GitHub Desktop.
const { delay, filter, map, mapTo, mergeAll, mergeMap, scan, switchMap, tap } = require('rxjs/operators')
const { of, Subject } = require('rxjs')
const { ofType } = require('redux-observable')
const ADD_TO_QUEUE = 'ADD_TO_QUEUE'
const FINISHED_PROCESSING_ITEM = 'FINISHED_PROCESSING_ITEM'
const REMOVE_FROM_QUEUE = 'REMOVE_FROM_QUEUE'
const START_PROCESSING_QUEUE = 'START_PROCESSING_QUEUE'
const queueReducer = (
queuedItems,
{
item,
type,
},
) => (
type === ADD_TO_QUEUE
? (
queuedItems
.concat(item)
)
: (
type === REMOVE_FROM_QUEUE
? (
queuedItems
.filter((
queuedItem,
) => (
queuedItem !== item
))
)
: queuedItems
)
)
const queuedItemsEpic = (
action$,
) => (
action$
.pipe(
tap(({
type,
}) => {
console
.info(
'[Action]',
type,
)
}),
ofType(
ADD_TO_QUEUE,
REMOVE_FROM_QUEUE,
),
scan(
queueReducer,
[],
),
switchMap((
queuedItems,
) => (
action$
.pipe(
ofType(
FINISHED_PROCESSING_ITEM,
START_PROCESSING_QUEUE,
),
mapTo(queuedItems[0]),
)
)),
tap((
value,
) => {
console
.info(
'Processing value:',
value,
)
}),
)
.pipe(
filter(Boolean),
mergeMap((
item,
) => ([
(
of({
item,
type: REMOVE_FROM_QUEUE,
})
),
(
of(item)
.pipe(
delay(4000),
map((
value,
) => (
`'${value}' is done`
)),
tap(console.info),
mapTo({
type: FINISHED_PROCESSING_ITEM,
}),
)
),
])),
mergeAll(),
)
)
const action$ = new Subject()
queuedItemsEpic(action$)
.subscribe(action$)
action$
.next({
item: 'A',
type: 'ADD_TO_QUEUE',
})
action$
.next({
item: 'B',
type: 'ADD_TO_QUEUE',
})
action$
.next({
item: 'C',
type: 'ADD_TO_QUEUE',
})
action$
.next({
type: 'START_PROCESSING_QUEUE',
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment