Skip to content

Instantly share code, notes, and snippets.

@baetheus
Last active August 30, 2021 17:56
Show Gist options
  • Save baetheus/b377312cdab21fc63459eac017c08409 to your computer and use it in GitHub Desktop.
Save baetheus/b377312cdab21fc63459eac017c08409 to your computer and use it in GitHub Desktop.
Streams based on callbags
import type * as HKT from "./hkt.ts";
import type * as TC from "./type_classes.ts";
import type { Option } from "./option.ts";
import * as O from "./option.ts";
import { Queue } from "./denque.ts";
import { pipe } from "./fns.ts";
/*******************************************************************************
* Types
******************************************************************************/
export type Handshake<A> = { tag: "Handshake"; sink: Sink<A> };
export type Acknowledge<B> = { tag: "Acknowledge"; source: Source<B> };
export type Data<B> = { tag: "Data"; data: B };
export type Pull = { tag: "Pull"; count: number };
export type End = { tag: "End" };
export type Signal<B, A> = Handshake<A> | Acknowledge<B> | Data<B> | Pull | End;
export type Stream<B, A> = (signal: Signal<B, A>) => void;
export type Source<A> = Stream<never, A>;
export type Sink<B> = Stream<B, never>;
export type From<T> = T extends Stream<infer B, infer _> ? B : never;
export type To<T> = T extends Stream<infer _, infer A> ? A : never;
/*******************************************************************************
* Kind Registration
******************************************************************************/
export const URI = "Stream";
export type URI = typeof URI;
declare module "./hkt.ts" {
// deno-lint-ignore no-explicit-any
export interface Kinds<_ extends any[]> {
[URI]: Stream<_[1], _[0]>;
}
}
/*******************************************************************************
* Signal Constructors
******************************************************************************/
const _end: End = { tag: "End" };
export function handshake<A = never, B = never>(
sink: Sink<A>,
): Signal<B, A> {
return { tag: "Handshake", sink };
}
export function acknowledge<A = never, B = never>(
source: Source<B>,
): Signal<B, A> {
return { tag: "Acknowledge", source };
}
export function data<A = never, B = never>(data: B): Signal<B, A> {
return { tag: "Data", data };
}
export function pull<A = never, B = never>(
count = Number.POSITIVE_INFINITY,
): Signal<B, A> {
return { tag: "Pull", count };
}
export function end<A = never, B = never>(): Signal<B, A> {
return _end;
}
/*******************************************************************************
* Signal Handlers
******************************************************************************/
type Handlers<B, A> = {
onHandshake?: (sink: Sink<A>) => void;
onAcknowledge?: (source: Source<B>) => void;
onData?: (b: B) => void;
onPull?: (count: number) => void;
onEnd?: () => void;
};
const noop: () => void = () => {};
const noops = {
onHandshake: noop,
onAcknowledge: noop,
onData: noop,
onPull: noop,
onEnd: noop,
};
/*******************************************************************************
* Functions
******************************************************************************/
export function make<A, B>(
handlers: Handlers<B, A> = {},
): Stream<B, A> {
const h = Object.assign({}, noops, handlers);
return (ta) => {
switch (ta.tag) {
case "Handshake":
return h.onHandshake(ta.sink);
case "Acknowledge":
return h.onAcknowledge(ta.source);
case "Data":
return h.onData(ta.data);
case "Pull":
return h.onPull(ta.count);
case "End":
return h.onEnd();
}
};
}
export function of<A>(...values: A[]): Source<A> {
return make({
onHandshake: (sink) => {
let index = 0;
let length = values.length;
sink(acknowledge(make({
onEnd: () => {
length = 0;
},
onPull: (count) => {
let pull = count;
while (index < length && pull > 0) {
sink(data(values[index]));
index++;
pull++;
}
if (index >= length) {
sink(end());
return true;
}
},
})));
},
});
}
export function map<A, I>(
fai: (a: A) => I,
): <B>(ta: Stream<B, A>) => Stream<B, I> {
return (ta) => {
return make({
onHandshake: (sink) => {
ta(handshake(make({
onEnd: () => {
sink(end());
},
onAcknowledge: (source) => {
sink(acknowledge(make({
onEnd: () => source(end()),
onPull: (count) => source(pull(count)),
})));
},
onData: (a) => sink(data(fai(a))),
})));
},
});
};
}
export function ap<A, I, B>(
tfai: Stream<B, (a: A) => I>,
): (ta: Stream<B, A>) => Stream<B, I> {
return (ta) =>
make({
onHandshake: (sink) => {
// Cached incoming data
let oa: Option<A> = O.none;
let ofai: Option<(a: A) => I> = O.none;
// Source handles
let sfai: Option<Source<(a: A) => I>> = O.none;
let sa: Option<Source<A>> = O.none;
// Counts
let sinkCount = 0;
let sfaiCount = 0;
let saCount = 0;
// Queue for data overflow
const queue = new Queue<I>();
tfai(handshake(make({
onAcknowledge: (source) => sfai = O.some(source),
onData: (fai) => {
sfaiCount--;
ofai = O.some(fai);
if (sinkCount > 0) {
if (queue.length() > 0) {
sink(data(queue.shift()!));
sinkCount--;
} else {
pipe(
oa,
O.tap((a) => {
sink(data(fai(a)));
sinkCount--;
}),
);
}
}
if (sinkCount > 0 && sfaiCount === 0) {
sfaiCount++;
pipe(sfai, O.tap((source) => source(pull(1))));
}
},
})));
ta(handshake(make({
onAcknowledge: (source) => sa = O.some(source),
onData: (a) => {
saCount--;
oa = O.some(a);
if (sinkCount > 0) {
if (queue.length() > 0) {
sink(data(queue.shift()!));
sinkCount--;
} else {
pipe(
ofai,
O.tap((fai) => {
sink(data(fai(a)));
sinkCount--;
}),
);
}
}
if (sinkCount > 0 && saCount === 0) {
saCount++;
pipe(sa, O.tap((source) => source(pull(1))));
}
},
})));
sink(acknowledge(make({
onPull: (count) => {
sinkCount = count;
const length = queue.length();
while (sinkCount > 0 && length > 0) {
sink(data(queue.shift()!));
sinkCount--;
}
if (sinkCount > 0) {
if (saCount === 0) {
saCount++;
pipe(sa, O.tap((source) => source(pull(1))));
}
if (sfaiCount === 0) {
sfaiCount++;
pipe(sfai, O.tap((source) => source(pull(1))));
}
}
},
})));
},
});
}
export function forEach<A>(
onData: (a: A) => void,
onEnd: () => void = noop,
): <B>(ta: Stream<B, A>) => void {
return (ta) => {
ta(handshake(make({
onAcknowledge: (source) => {
source(pull());
},
onData,
onEnd,
})));
};
}
const a = pipe(
of(1, 2, 3, 4, 5),
map((n) => n + 100),
);
const fa = of((n: number) => n + 1, (n: number) => n * 10);
const applied = pipe(a, ap(fa));
const log = forEach(
(x) => console.log(JSON.stringify(x)),
() => console.log("Done!"),
);
log(applied);
@baetheus
Copy link
Author

This feels like a dead end. Moving on to iterable/asynciterable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment