Last active
August 30, 2021 17:56
-
-
Save baetheus/b377312cdab21fc63459eac017c08409 to your computer and use it in GitHub Desktop.
Streams based on callbags
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type * as HKT from "./hkt.ts"; | |
import type * as TC from "./type_classes.ts"; | |
import type { Option } from "./option.ts"; | |
import * as O from "./option.ts"; | |
import { Queue } from "./denque.ts"; | |
import { pipe } from "./fns.ts"; | |
/******************************************************************************* | |
* Types | |
******************************************************************************/ | |
export type Handshake<A> = { tag: "Handshake"; sink: Sink<A> }; | |
export type Acknowledge<B> = { tag: "Acknowledge"; source: Source<B> }; | |
export type Data<B> = { tag: "Data"; data: B }; | |
export type Pull = { tag: "Pull"; count: number }; | |
export type End = { tag: "End" }; | |
export type Signal<B, A> = Handshake<A> | Acknowledge<B> | Data<B> | Pull | End; | |
export type Stream<B, A> = (signal: Signal<B, A>) => void; | |
export type Source<A> = Stream<never, A>; | |
export type Sink<B> = Stream<B, never>; | |
export type From<T> = T extends Stream<infer B, infer _> ? B : never; | |
export type To<T> = T extends Stream<infer _, infer A> ? A : never; | |
/******************************************************************************* | |
* Kind Registration | |
******************************************************************************/ | |
export const URI = "Stream"; | |
export type URI = typeof URI; | |
declare module "./hkt.ts" { | |
// deno-lint-ignore no-explicit-any | |
export interface Kinds<_ extends any[]> { | |
[URI]: Stream<_[1], _[0]>; | |
} | |
} | |
/******************************************************************************* | |
* Signal Constructors | |
******************************************************************************/ | |
const _end: End = { tag: "End" }; | |
export function handshake<A = never, B = never>( | |
sink: Sink<A>, | |
): Signal<B, A> { | |
return { tag: "Handshake", sink }; | |
} | |
export function acknowledge<A = never, B = never>( | |
source: Source<B>, | |
): Signal<B, A> { | |
return { tag: "Acknowledge", source }; | |
} | |
export function data<A = never, B = never>(data: B): Signal<B, A> { | |
return { tag: "Data", data }; | |
} | |
export function pull<A = never, B = never>( | |
count = Number.POSITIVE_INFINITY, | |
): Signal<B, A> { | |
return { tag: "Pull", count }; | |
} | |
export function end<A = never, B = never>(): Signal<B, A> { | |
return _end; | |
} | |
/******************************************************************************* | |
* Signal Handlers | |
******************************************************************************/ | |
type Handlers<B, A> = { | |
onHandshake?: (sink: Sink<A>) => void; | |
onAcknowledge?: (source: Source<B>) => void; | |
onData?: (b: B) => void; | |
onPull?: (count: number) => void; | |
onEnd?: () => void; | |
}; | |
const noop: () => void = () => {}; | |
const noops = { | |
onHandshake: noop, | |
onAcknowledge: noop, | |
onData: noop, | |
onPull: noop, | |
onEnd: noop, | |
}; | |
/******************************************************************************* | |
* Functions | |
******************************************************************************/ | |
export function make<A, B>( | |
handlers: Handlers<B, A> = {}, | |
): Stream<B, A> { | |
const h = Object.assign({}, noops, handlers); | |
return (ta) => { | |
switch (ta.tag) { | |
case "Handshake": | |
return h.onHandshake(ta.sink); | |
case "Acknowledge": | |
return h.onAcknowledge(ta.source); | |
case "Data": | |
return h.onData(ta.data); | |
case "Pull": | |
return h.onPull(ta.count); | |
case "End": | |
return h.onEnd(); | |
} | |
}; | |
} | |
export function of<A>(...values: A[]): Source<A> { | |
return make({ | |
onHandshake: (sink) => { | |
let index = 0; | |
let length = values.length; | |
sink(acknowledge(make({ | |
onEnd: () => { | |
length = 0; | |
}, | |
onPull: (count) => { | |
let pull = count; | |
while (index < length && pull > 0) { | |
sink(data(values[index])); | |
index++; | |
pull++; | |
} | |
if (index >= length) { | |
sink(end()); | |
return true; | |
} | |
}, | |
}))); | |
}, | |
}); | |
} | |
export function map<A, I>( | |
fai: (a: A) => I, | |
): <B>(ta: Stream<B, A>) => Stream<B, I> { | |
return (ta) => { | |
return make({ | |
onHandshake: (sink) => { | |
ta(handshake(make({ | |
onEnd: () => { | |
sink(end()); | |
}, | |
onAcknowledge: (source) => { | |
sink(acknowledge(make({ | |
onEnd: () => source(end()), | |
onPull: (count) => source(pull(count)), | |
}))); | |
}, | |
onData: (a) => sink(data(fai(a))), | |
}))); | |
}, | |
}); | |
}; | |
} | |
export function ap<A, I, B>( | |
tfai: Stream<B, (a: A) => I>, | |
): (ta: Stream<B, A>) => Stream<B, I> { | |
return (ta) => | |
make({ | |
onHandshake: (sink) => { | |
// Cached incoming data | |
let oa: Option<A> = O.none; | |
let ofai: Option<(a: A) => I> = O.none; | |
// Source handles | |
let sfai: Option<Source<(a: A) => I>> = O.none; | |
let sa: Option<Source<A>> = O.none; | |
// Counts | |
let sinkCount = 0; | |
let sfaiCount = 0; | |
let saCount = 0; | |
// Queue for data overflow | |
const queue = new Queue<I>(); | |
tfai(handshake(make({ | |
onAcknowledge: (source) => sfai = O.some(source), | |
onData: (fai) => { | |
sfaiCount--; | |
ofai = O.some(fai); | |
if (sinkCount > 0) { | |
if (queue.length() > 0) { | |
sink(data(queue.shift()!)); | |
sinkCount--; | |
} else { | |
pipe( | |
oa, | |
O.tap((a) => { | |
sink(data(fai(a))); | |
sinkCount--; | |
}), | |
); | |
} | |
} | |
if (sinkCount > 0 && sfaiCount === 0) { | |
sfaiCount++; | |
pipe(sfai, O.tap((source) => source(pull(1)))); | |
} | |
}, | |
}))); | |
ta(handshake(make({ | |
onAcknowledge: (source) => sa = O.some(source), | |
onData: (a) => { | |
saCount--; | |
oa = O.some(a); | |
if (sinkCount > 0) { | |
if (queue.length() > 0) { | |
sink(data(queue.shift()!)); | |
sinkCount--; | |
} else { | |
pipe( | |
ofai, | |
O.tap((fai) => { | |
sink(data(fai(a))); | |
sinkCount--; | |
}), | |
); | |
} | |
} | |
if (sinkCount > 0 && saCount === 0) { | |
saCount++; | |
pipe(sa, O.tap((source) => source(pull(1)))); | |
} | |
}, | |
}))); | |
sink(acknowledge(make({ | |
onPull: (count) => { | |
sinkCount = count; | |
const length = queue.length(); | |
while (sinkCount > 0 && length > 0) { | |
sink(data(queue.shift()!)); | |
sinkCount--; | |
} | |
if (sinkCount > 0) { | |
if (saCount === 0) { | |
saCount++; | |
pipe(sa, O.tap((source) => source(pull(1)))); | |
} | |
if (sfaiCount === 0) { | |
sfaiCount++; | |
pipe(sfai, O.tap((source) => source(pull(1)))); | |
} | |
} | |
}, | |
}))); | |
}, | |
}); | |
} | |
export function forEach<A>( | |
onData: (a: A) => void, | |
onEnd: () => void = noop, | |
): <B>(ta: Stream<B, A>) => void { | |
return (ta) => { | |
ta(handshake(make({ | |
onAcknowledge: (source) => { | |
source(pull()); | |
}, | |
onData, | |
onEnd, | |
}))); | |
}; | |
} | |
const a = pipe( | |
of(1, 2, 3, 4, 5), | |
map((n) => n + 100), | |
); | |
const fa = of((n: number) => n + 1, (n: number) => n * 10); | |
const applied = pipe(a, ap(fa)); | |
const log = forEach( | |
(x) => console.log(JSON.stringify(x)), | |
() => console.log("Done!"), | |
); | |
log(applied); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This feels like a dead end. Moving on to iterable/asynciterable