Skip to content

Instantly share code, notes, and snippets.

@AngusFu
Created November 29, 2018 17:29
Show Gist options
  • Save AngusFu/49711c01346300cc24c9646b325917da to your computer and use it in GitHub Desktop.
Save AngusFu/49711c01346300cc24c9646b325917da to your computer and use it in GitHub Desktop.
A callbag demo
// Callbag signal types, always passed as the first argument of a callbag call:
// START (0) performs the handshake, DATA (1) delivers or requests a value,
// END (2) terminates the stream.
const [SIG_START, SIG_DATA, SIG_END] = [0, 1, 2]
/**
 * Threads a value through a sequence of unary functions, left to right.
 *
 * @param {*} fst - The initial value (typically a source callbag).
 * @param {...Function} rest - Stages; each receives the previous stage's result.
 * @returns {*} The result of the last stage, or `fst` when no stages are given.
 */
const pipe = (fst, ...rest) => {
  let piped = fst
  for (const stage of rest) {
    piped = stage(piped)
  }
  return piped
}
/// type Callbag = (signal, payload?: any) => void
/**
 * Creates a source callbag that pushes an incrementing counter (0, 1, 2, ...)
 * to its sink every `period` milliseconds.
 *
 * @param {number} period - Delay between emissions, handed to `setInterval`.
 * @returns {Function} A callbag `(signal, sink)`; only `SIG_START` is honoured.
 */
const interval = period => (signal, sink) => {
  // Only a greeting may start the source; any other signal is ignored.
  if (signal === SIG_START) {
    let counter = 0
    const timerId = setInterval(() => sink(SIG_DATA, counter++), period)
    // Complete the handshake by handing the sink a talkback. The only request
    // the talkback reacts to is termination, which stops the timer.
    sink(SIG_START, request => {
      if (request !== SIG_END) return
      console.log('timer cleared')
      clearInterval(timerId)
    })
  }
}
/**
 * Creates a sink that runs `operation` on every value a producer delivers.
 *
 * @param {Function} operation - Called with the payload of each `SIG_DATA`.
 * @returns {Function} A function that takes the producer callbag and
 *   subscribes to it immediately; it returns nothing.
 */
const forEach = operation => producer => {
  let talkback = null
  // Greet the producer, handing it the sink that will receive its signals.
  producer(SIG_START, (signal, payload) => {
    switch (signal) {
      case SIG_START:
        // Handshake reply: the payload is the producer's talkback.
        talkback = payload
        break
      case SIG_DATA:
        operation(payload)
        break
      default:
        // Any other signal (e.g. termination) needs no reaction here.
        return
    }
    // After the handshake and after every value, ask for the next one.
    talkback(SIG_DATA)
  })
}
// const map = operation => {
// return producer => {
// return (signal, sink) => {
// if (signal !== SIG_START) return
// const talkback = (signal, payload) => {
// sink(signal, signal === SIG_DATA ? operation(payload) : payload)
// }
// producer(SIG_START, talkback)
// }
// }
// }
/**
 * Creates an operator that transforms every delivered value with `operation`.
 *
 * @param {Function} operation - Applied to the payload of each `SIG_DATA`.
 * @returns {Function} A function taking a source callbag and returning a new
 *   callbag `(signal, sink)` that only reacts to `SIG_START`.
 */
const map = operation => source => (signal, sink) => {
  if (signal === SIG_START) {
    source(SIG_START, (type, payload) => {
      // Only data payloads are transformed; handshake and termination
      // payloads are passed to the sink untouched.
      const forwarded = type === SIG_DATA ? operation(payload) : payload
      sink(type, forwarded)
    })
  }
}
/**
 * Creates an operator that only lets through values accepted by `pred`.
 *
 * Handshake and termination signals are forwarded to the sink unchanged.
 * When a value is rejected, the operator asks the source for another one, so
 * a pull-driven source does not stall waiting for a request the sink will
 * never send (the sink only pulls again after it has received a value).
 *
 * @param {Function} pred - Called with each data payload; a truthy result
 *   forwards the value, a falsy result drops it.
 * @returns {Function} A function taking a source callbag and returning a new
 *   callbag `(signal, sink)` that only reacts to `SIG_START`.
 */
const filter = pred => source => (signal, sink) => {
  if (signal !== SIG_START) return
  let talkback = null
  source(SIG_START, (type, payload) => {
    if (type === SIG_START) {
      // Remember the source's talkback so rejected values can be replaced.
      talkback = payload
      sink(type, payload)
      return
    }
    if (type !== SIG_DATA || pred(payload)) {
      sink(type, payload)
      return
    }
    // Value rejected: request the next one on the sink's behalf.
    if (talkback) talkback(SIG_DATA)
  })
}
/**
 * Creates an operator that forwards at most `count` values and then
 * terminates both directions of the stream.
 *
 * Guarantees provided by this implementation:
 * - the sink receives at most `count` values and at most one `SIG_END`;
 * - the source receives at most one `SIG_END`, whether the limit was reached
 *   or the sink terminated early;
 * - a `SIG_END` sent by the sink is passed upstream as a termination, not as
 *   a request for more data;
 * - `take(0)` (or a negative count) ends the stream right after the
 *   handshake without delivering anything.
 *
 * @param {number} count - Maximum number of values to deliver.
 * @returns {Function} A function taking a source callbag and returning a new
 *   callbag `(signal, sink)` that only reacts to `SIG_START`.
 */
const take = count => source => (signal, sink) => {
  if (signal !== SIG_START) return
  let taken = 0
  let ended = false
  let talkback = null

  // Terminates downstream first, then upstream, exactly once. The flag is set
  // before calling out so re-entrant calls cannot terminate a second time.
  const finish = () => {
    if (ended) return
    ended = true
    sink(SIG_END)
    talkback(SIG_END)
  }

  source(SIG_START, (type, payload) => {
    if (type === SIG_START) {
      talkback = payload
      sink(SIG_START, (request, data) => {
        if (request === SIG_END) {
          // The sink is done early: stop the source instead of pulling.
          if (!ended) {
            ended = true
            talkback(SIG_END, data)
          }
          return
        }
        // Only pull while there is still room for more values.
        if (!ended && taken < count) talkback(request, data)
      })
      // Covers a non-positive count: nothing may ever be delivered.
      if (taken >= count) finish()
      return
    }
    if (type === SIG_DATA) {
      // Drop anything that arrives after completion.
      if (ended || taken >= count) return
      taken++
      sink(SIG_DATA, payload)
      // A synchronous source may already have delivered more values (and
      // finished) while the sink handled this one; finish() is idempotent.
      if (taken >= count) finish()
      return
    }
    // Termination coming from the source itself.
    if (ended) return
    ended = true
    sink(type, payload)
  })
}
// Demo: a once-per-second counter is scaled by 20, limited to values below
// 200, cut off after three values, and each surviving value is logged.
pipe(
  interval(1000),
  map(n => 20 * n),
  filter(n => 200 > n),
  take(3),
  forEach(value => console.log(value))
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment