Created
November 29, 2018 17:29
-
-
Save AngusFu/49711c01346300cc24c9646b325917da to your computer and use it in GitHub Desktop.
A callbag demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const SIG_START = 0 | |
const SIG_DATA = 1 | |
const SIG_END = 2 | |
const pipe = (fst, ...rest) => { | |
return rest.reduce((source, sink) => sink(source), fst) | |
} | |
/// type Callbag = (signal, payload?: any) => void | |
const interval = period => (signal, sink) => { | |
// A callbag MUST NOT be delivered data before it has been greeted | |
if (signal !== SIG_START) return | |
let i = 0 | |
const tick = () => sink(SIG_DATA, i++) | |
const tid = setInterval(tick, period) | |
const talkback = signal => { | |
// terminating | |
if (signal === SIG_END) { | |
console.log('timer cleared') | |
clearInterval(tid) | |
} | |
} | |
// 2. Handshake with consumer (ie greets the consumer) | |
sink(SIG_START, talkback) | |
} | |
const forEach = operation => producer => { | |
let talkback = null | |
// 1. Greets the producer with a sink function | |
producer(SIG_START, (signal, payload) => { | |
// 3. started | |
if (signal === SIG_START) { | |
talkback = payload | |
} | |
if (signal === SIG_DATA) { | |
operation(payload) | |
} | |
if (signal === SIG_DATA || signal === SIG_START) { | |
talkback(SIG_DATA) | |
} | |
}) | |
} | |
// const map = operation => { | |
// return producer => { | |
// return (signal, sink) => { | |
// if (signal !== SIG_START) return | |
// const talkback = (signal, payload) => { | |
// sink(signal, signal === SIG_DATA ? pred(payload) : payload) | |
// } | |
// producer(SIG_START, talkback) | |
// } | |
// } | |
// } | |
const map = operation => source => (signal, sink) => { | |
if (signal !== SIG_START) return | |
const talkback = (signal, payload) => { | |
sink(signal, signal === SIG_DATA ? operation(payload) : payload) | |
} | |
source(SIG_START, talkback) | |
} | |
const filter = pred => source => (signal, sink) => { | |
if (signal !== SIG_START) return | |
const talkback = (signal, payload) => { | |
if (signal === SIG_DATA) { | |
if (pred(payload)) { | |
sink(signal, payload) | |
} | |
} else { | |
sink(signal, payload) | |
} | |
} | |
source(SIG_START, talkback) | |
} | |
const take = count => source => (signal, sink) => { | |
if (signal !== SIG_START) return | |
let taken = 0 | |
let talkback = null | |
source(SIG_START, (signal, payload) => { | |
if (signal === SIG_START) { | |
talkback = payload | |
sink(SIG_START, (signal, payload) => { | |
if (taken < count) { | |
talkback(SIG_DATA, payload) | |
} | |
}) | |
return | |
} | |
if (signal === SIG_DATA) { | |
taken++ | |
sink(signal, payload) | |
if (taken >= count) { | |
sink(SIG_END) | |
talkback(SIG_END) | |
} | |
return | |
} | |
sink(signal, payload) | |
}) | |
} | |
pipe( | |
interval(1000), | |
map(x => x * 20), | |
filter(x => x < 200), | |
take(3), | |
forEach(console.log) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment