Skip to content

Instantly share code, notes, and snippets.

@baetheus
Last active December 26, 2020 07:26
Show Gist options
  • Select an option

  • Save baetheus/5a8012373e863cb7b157bef8505fabd5 to your computer and use it in GitHub Desktop.

Select an option

Save baetheus/5a8012373e863cb7b157bef8505fabd5 to your computer and use it in GitHub Desktop.
A 15 minute start at observable/callbag/most streams for Deno
import type * as TC from "https://deno.land/x/hkts/type_classes.ts";
import type { _, Fn } from "https://deno.land/x/hkts/types.ts";
import { pipe } from "https://deno.land/x/hkts/fns.ts";
enum Signal {
Open,
Ready,
Pause,
Drain,
Close,
}
enum Status {
Initial,
Active,
Paused,
Complete,
}
type Handler<A> = {
link: Fn<[Fn<[Signal], void>], void>;
next: Fn<[A], void>;
done: Fn<[], void>;
};
type Handle = Fn<[], void>;
// TODO abstract scheduler
type Stream<A> = {
listen: Fn<[Handler<A>], Handle>;
};
/***************************************************************************************************
* @section Constructors
**************************************************************************************************/
const DefaultHandler: Handler<any> = {
link: () => {
throw new Error(`DefaultHandler#link called!`);
},
next: () => {
throw new Error(`DefaultHandler#next called!`);
},
done: () => {
throw new Error(`DefaultHandler#done called!`);
},
};
export const of = <A>(a: A): Stream<A> => {
let s = Status.Initial;
let h = DefaultHandler;
const monitor = (signal: Signal) => {
console.log("Received Signal", signal);
switch (signal) {
case Signal.Open:
s = Status.Active;
break;
case Signal.Ready:
case Signal.Drain:
switch (s) {
case Status.Initial:
throw new Error("Handler signalled Ready or Drain before Open");
case Status.Complete:
throw new Error("Handler signalled Ready or Drain after Complete");
case Status.Active:
case Status.Paused:
h.next(a);
s = Status.Complete;
h.done();
}
break;
case Signal.Pause:
switch (s) {
case Status.Initial:
s = Status.Paused;
break;
case Status.Active:
case Status.Paused:
s = Status.Paused;
break;
case Status.Complete:
throw new Error("Handler signalled Pause after Complete");
}
break;
case Signal.Close:
if (s !== Status.Complete) {
h.done();
s = Status.Complete;
}
}
};
return {
listen: (handler) => {
h = handler;
h.link(monitor);
return () => {
monitor(Signal.Close);
};
},
};
};
/***************************************************************************************************
* @section Modules
**************************************************************************************************/
export const Functor: TC.Functor<Stream<_>> = {
map: (fab) =>
(ta) => ({
listen: ({ link, next, done }) =>
ta.listen({
link,
done,
next: (a) => next(fab(a)),
}),
}),
};
const test = of(1);
const test1 = pipe(
test,
Functor.map((n: number) => n.toString()),
);
test1.listen({
link: (tallback) => {
tallback(Signal.Open);
tallback(Signal.Drain);
},
next: console.log,
done: () => console.log("Done!"),
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment