Skip to content

Instantly share code, notes, and snippets.

@hisui
Last active August 21, 2020 21:20
Show Gist options
  • Save hisui/6261547 to your computer and use it in GitHub Desktop.
Save hisui/6261547 to your computer and use it in GitHub Desktop.
Functional Reactive Programming (FRP) in TypeScript.
// stream.ts - A TypeScript module for FRP.
// - https://gist.github.com/hisui/6261547
// - tsc stream.ts -t es5 --sourcemap --noImplicitAny
module sf {
/**
 * One event travelling through a stream: a value, a normal completion, or a
 * failure.
 *
 * `_flags` is a small bit set: bit 0 (value 1) marks a terminal packet and
 * bit 1 (value 2) marks a failure. The `fail` factory sets both bits, so a
 * packet built by it reports `done` as well as `fail`.
 */
export class Packet<T> {
    constructor(private _flags:number, private _value:any=void 0) {}

    /** Terminal packet carrying `e` as its error (both flag bits set). */
    static fail<A>(e:Error):Packet<A> {
        return new Packet<A>(3, e);
    }

    /** Terminal packet without a payload (normal completion). */
    static done<A>():Packet<A> {
        return new Packet<A>(1);
    }

    /** Non-terminal packet carrying the element `e`. */
    static next<A>(e:A):Packet<A> {
        return new Packet<A>(0, e);
    }

    /** True when the "done" bit is set. */
    get done():boolean {
        return (this._flags & 1) === 1;
    }

    /** True when the "fail" bit is set. */
    get fail():boolean {
        return (this._flags & 2) === 2;
    }

    /** The payload of a non-terminal packet; `undefined` once `done` is set. */
    get value():T {
        return this.done ? void 0 : this._value;
    }

    /** The payload of a failed packet; `undefined` when `fail` is not set. */
    get error():Error {
        return !this.fail ? void 0 : this._value;
    }
}
// Module-local shorthands for the Packet factories; they are called detached
// from the class throughout this module.
var done = Packet.done,
    fail = Packet.fail,
    next = Packet.next;
/**
 * The subset of the DOM EventTarget contract this module relies on.
 * See https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
 *
 * `useCapture` may be omitted when adding or removing a listener.
 */
export interface EventTarget {
    addEventListener(type:string, listener:any, useCapture?:boolean):void;
    removeEventListener(type:string, listener:any, useCapture?:boolean):void;
    dispatchEvent(event:any):boolean;
}
/**
 * Builds a stream fed by an event listener on `target`.
 *
 * The listener is registered when the returned stream is opened and removed
 * when it is closed. Each event is handed to `f` together with the stream's
 * channel, and `f` decides what (if anything) to emit.
 */
function on<A>(target:EventTarget, type:string, useCapture:boolean, f:(chan:Channel<A>, e:Event) => void):Stream<A> {
    // `chan` is assigned below; the handler can only run once the stream is open.
    var handler = function (e:Event):void {
        f(chan, e);
    };
    var attach = () => target.addEventListener(type, handler, useCapture);
    var detach = () => target.removeEventListener(type, handler, useCapture);
    var chan:Source2<A> = new Source2<A>(attach, detach);
    return chan;
}
/**
 * Base class of every stream in this module, and home of the combinators.
 *
 * A stream delivers Packet objects to the single callback passed to open().
 * The base class itself is inert: open(), close() and abort() are no-ops
 * that subclasses override.
 */
export class Stream<T> {
/** Creates a new writable channel; with `autofork` it is wrapped in an Autofork. */
static channel<A>(autofork:boolean=false):Channel<A> {
return autofork ? new Autofork<A>(new Source<A>()): new Source<A>();
}
/** Flattens a stream of streams via flatMap with the identity function in "sync" mode. */
static flatten<A>(s:Stream<Stream<A>>):Stream<A> {
return s.flatMap(e => e, "sync");
}
/** Stream that emits each argument in order, followed by a done packet. */
static forEach<A>(... args:A[]):Stream<A> {
return new List<A>(args.map(next).concat(done<A>()));
}
/** Stream driven by `gen`, which is called to produce each packet (see Generator). */
static from<A>(gen:() => Packet<A>):Stream<A> {
return new Generator<A>(gen);
}
/** Stream backed by Zero, which yields only a done packet when opened. */
static zero<A>():Stream<A> {
return new Zero<A>();
}
/** Stream consisting of a single fail packet carrying `e`. */
static fail<A>(e:Error):Stream<A> {
return new List<A>([fail<A>(e)]);
}
/** Stream that emits `value` once and then completes. */
static pure<A>(value:A):Stream<A> {
return Stream.forEach(value);
}
/** Stream that replays exactly the given packets. */
static list<A>(a:Packet<A>[]):Stream<A> { return new List<A>(a); }
/**
 * Emits a, a±n, ... moving from `a` towards `b`; `b` itself is not emitted,
 * it triggers the done packet instead.
 * Throws when n <= 0 or when |a - b| is not a multiple of n.
 */
static step(a:number, b:number, n:number=1):Stream<number> {
if (n <= 0) {
throw new Error("'n' must be greater than 0.");
}
if (Math.abs(a - b) % n) {
throw new Error("Precondition failed: |a - b| mod n == 0");
}
// Count downwards when the start lies above the end.
if (a > b) n = -n;
return new Generator<number>(() => {
try {
return a !== b ? next(a): done<number>();
}
// Advance after the packet for the current `a` has been built.
finally { a += n }
});
}
/**
 * Emits gen() every `time` milliseconds while the stream is open.
 * The interval is started on open and cleared on close; opening while a
 * timer is already running throws.
 */
static tick<A>(time:number, gen:() => A = (():A => null)):Stream<A> {
var timer = NaN;
// A plain function (not an arrow) so that `this` is the Source2 invoking it.
return new Source2<A>(function () {
var self:Source2<A> = this;
if (!isNaN(timer)) {
throw new Error("[bug] !isNaN(timer)");
}
timer = setInterval(() => self.yield(next(gen())), time);
}
, () => (clearInterval(timer), timer = NaN));
}
/** Emits every event of `type` dispatched to `target`. */
static on(target:EventTarget, type:string, useCapture:boolean=false):Stream<Event> {
return on<Event>(target, type, useCapture, (chan, e) => chan.push(e));
}
/** Emits nothing; completes when an event of `type` is dispatched to `target`. */
static doneOn(target:EventTarget, type:string, useCapture:boolean=false):Stream<Event> {
return on<Event>(target, type, useCapture, (chan, e) => chan.done());
}
/** Emits nothing; fails with Error(e.type) when an event of `type` is dispatched to `target`. */
static failOn(target:EventTarget, type:string, useCapture:boolean=false):Stream<Event> {
return on<Event>(target, type, useCapture, (chan, e) => chan.fail(new Error(e.type)));
}
/** Combines the streams with Disjunction (array of the latest value of each input). */
static disj<A>(... args:Stream<A>[]):Stream<any[]> {
return new Disjunction(args);
}
/** Combines the streams with Conjunction (array holding one value from each input). */
static conj<A>(... args:Stream<A>[]):Stream<any[]> {
return new Conjunction(args);
}
/** Disjunction of this stream and `s`. */
disj<U>(s:Stream<U>):Stream<any[]/*[T, U]*/> {
return new Disjunction([this, s]);
}
/** Conjunction of this stream and `s`. */
conj<U>(s:Stream<U>):Stream<any[]/*[T, U]*/> {
return new Conjunction([this, s]);
}
/** Concatenation: flattens the two-element stream [this, s] in "sync" mode. */
plus(s:Stream<T>):Stream<T> {
return Stream.flatten(Stream.forEach(this, s));
}
/**
 * Opens the stream. `func`, when given, receives the value of every
 * non-terminal packet. A failure packet is rethrown when `fatal` is true.
 */
next(func:(e:T) => void=null, fatal:boolean=true):void {
this.open(e => {
if (!e.done) func && func(e.value);
else if (e.error && fatal) {
throw e.error;
}
});
}
/** Starts delivering packets to `func`. No-op in the base class. */
open(func: (f:Packet<T>) => void):void {}
/** No-op in the base class; overridden by subclasses. */
close():void {}
/** No-op in the base class; overridden by subclasses. */
abort():void {}
/** Applies `func` to this stream and returns its result. */
$<U>(func:(s:Stream<T>) => Stream<U>):Stream<U> { return func(this) }
/** Opens this stream and forwards every packet to `chan`. */
connect(chan:Channel<T>):void {
this.open(o => chan.yield(o));
}
/** Wraps this stream in an Identity filter behind an Autofork channel. */
channel():Channel<T> {
return new Autofork(new Identity(this));
}
/** Merges this stream with `that` (see Mix). */
mix(that:Stream<T>):Stream<T> { return new Mix<T>(this, that) }
/**
 * Core combinator: each packet is mapped by `func` to a stream of packets
 * whose contents are emitted downstream (see Binder).
 * `auto` makes Binder forward fail/done packets itself instead of passing
 * them to `func`. `mode` falls back to "sync" when null.
 */
bind<U>(func:(o:Packet<T>) => Stream<Packet<U>>, auto:boolean, mode:string=null):Stream<U> {
return new Binder<T,U>(this, func, auto, mode || "sync");
}
/** Wraps this stream in a Buffer with the given limit. */
hold(limit:number=Infinity):Stream<T> {
return new Buffer<T>(this, limit);
}
/** Splits this stream into `n` branches backed by one shared Splitter. */
fork(n:number=2):Stream<T>[] { return new Splitter(this, n)._list; }
/** View of this stream whose bind() defaults to mode "stop". */
get outer1st():Stream<T> { return new BinderOption(this, "stop") }
/** View of this stream whose bind() defaults to mode "skip". */
get inner1st():Stream<T> { return new BinderOption(this, "skip") }
/** View of this stream whose bind() defaults to mode "sync". */
get synchronize():Stream<T> { return new BinderOption(this, "sync") }
/** View of this stream whose close() and abort() do nothing. */
get nohup():Stream<T> { return new NoHup<T>(this); }
/** Packet-level map: each packet is replaced by `func(packet)`. */
map0<U>(func:(o:Packet<T>) => Packet<U>, auto:boolean=true):Stream<U> {
return this.bind(o => pure(func(o)), auto, "sync");
}
/** Maps every value through `func`. */
map<U>(func:(e:T) => U):Stream<U> {
return this.map0(o => next(func(o.value)));
}
/** Maps every value to a packet, so `func` may also end or fail the stream. */
halfMap<U>(func:(e:T) => Packet<U>):Stream<U> {
return this.bind(o => pure(func(o.value)), true, "sync");
}
/** Maps every value to a stream and emits that stream's values. */
flatMap<U>(func:(e:T) => Stream<U>, mode:string=null):Stream<U> {
return this.bind(o => func(o.value).map0(next), true, mode);
}
/**
 * Replaces a failure: without `func` the stream completes normally instead;
 * with `func` the packets of `func(error)` are emitted in its place.
 * Non-failure packets pass through unchanged.
 */
recover(func:(e:Error) => Stream<T>=null):Stream<T> {
return this.bind((o:Packet<T>) => ! o.fail ? pure(o)
: func === null ? pure(done<T>())
: func(o.error).map0(next, false)
, false, "sync");
}
/** Replaces every value with the constant `e`. */
fill<U>(e:U):Stream<U> { return this.map(_ => e); }
/**
 * Drops a value when `lens` of it equals (per `eq`) `lens` of the most
 * recently emitted value. `undefined` serves as the "nothing emitted yet"
 * marker.
 */
distinct<U>(lens:(e:T) => U= <X>(e:X) => e, eq:(a:U, b:U) => boolean= (a, b) => a === b):Stream<T> {
var last:T = undefined;
return this.flatMap(e =>
last !== undefined && eq(lens(e), lens(last))
? zero<T>()
: pure<T>(last = e));
}
/** Keeps only the values for which `func` returns true. */
filter(func:(e:T) => boolean):Stream<T> {
return this.flatMap((e:T) => func(e) ? pure<T>(e): zero<T>(), "sync");
}
// ISSUE tsc-0.9.2 crashes
/**
 * Partitions the stream by `keyof`. The first time a key is seen, a pair
 * {_1: key, _2: group stream} is emitted; every value is then yielded to its
 * group's stream. A terminal packet (done or fail) is forwarded to every
 * group, after which the result completes.
 */
groupBy(keyof: (e:T) => string):Stream<{_1:string; _2:Stream<T>}> {
var map:{ [key:string]: Source<T> } = {};
return this.bind((o:Packet<T>):Stream<Packet<{_1:string; _2:Stream<T>}>> => {
if (o.done) {
for (var e in map) {
map[e].yield(o);
}
return pure(done<{_1:string; _2:Stream<T>}>());
}
var key = keyof(o.value);
var out = map[key];
try {
if (out !== undefined)
return zero<Packet<{_1:string; _2:Stream<T>}>>();
return pure(
next<{_1:string; _2:Stream<T>}>({
_1: key,
// hold() buffers the group's packets until a consumer opens it.
_2: (map[key] = out = new Source<T>()).hold()
}));
} finally {
// Runs after `out` has been assigned above, for new and known keys alike.
out.yield(o);
}
}, false, "sync");
}
/** Turns a normal completion into a failure carrying `e`; other packets pass through. */
fatal(e:Error=new Error()):Stream<T> {
return this.map0(o => (!o.done || o.fail) ? o: fail<T>(e), false);
}
/** Runs `func` after the returned stream has been closed (see Closer). */
onClose(func:() => void):Stream<T> {
return new Closer<T>(this, func);
}
/**
 * Left fold: accumulates values with `func` starting from `z` and emits the
 * single result when this stream completes. A failure is passed on as a
 * failure.
 */
fold<U>(func:(acc:U, e:T) => U, z:U):Stream<U> {
return this.bind((o:Packet<T>) => {
if (o.fail) return Stream.fail<Packet<U>>(o.error);
if (o.done) return Stream.list<Packet<U>>(
[ next(next<U>(z))
, next(done<U>()) ]);
z = func(z, o.value);
return zero<Packet<U>>();
}, false, "sync");
}
/** Emits values while `func` returns true; completes at the first false. */
takeWhile(func:(e:T) => boolean):Stream<T> {
return this.halfMap((e:T) => func(e) ? next<T>(e): done<T>());
}
/**
 * n > 0: passes the first n packets through and then completes.
 * n === 0: an empty stream whose close() also closes this stream.
 * n < 0: the last |n| values, emitted once this stream has completed.
 */
take(n:number):Stream<T> {
if (n === 0) {
return zero<T>().onClose(() => this.close()); // TODO: abort
}
if (n > 0) {
return this.bind<T>((o:Packet<T>) =>
// While the quota is not used up the inner list simply ends; on the
// last permitted packet it also emits a done packet downstream.
new List([next(o)].concat(--n > 0 ? done<Packet<T>>(): next(done<T>()))), false, "sync");
}
return this.fold((acc:T[], e:T) => {
// Keep a sliding window of at most |n| values.
if (acc.push(e) > -n) {
acc.shift();
}
return acc;
}, []).flatMap<T>(e => Stream.forEach.apply(null, e));
}
/** Drops values until `func` first returns true; that value and all later ones pass. */
skipUntil(func:(e:T) => boolean):Stream<T> {
var b = false;
return this.filter(e => b || (b = func(e)));
}
/**
 * n >= 0: drops the first n values.
 * n < 0: collects all values and, on completion, emits `slice(n)` of them.
 * NOTE(review): for negative n that is the trailing |n| values, the same
 * elements take(n) yields — confirm this is intended.
 */
skip(n:number):Stream<T> {
if (n >= 0) {
return this.skipUntil(e => --n < 0);
}
return this.fold((acc:T[], e:T) => (acc.push(e), acc), [])
.flatMap<T>(a => Stream
.forEach.apply(null, a.slice(n)), "skip");
}
// ISSUE tsc-0.9.2 crashes
/**
 * Merges three tagged inputs: this stream (tag 1), an internal channel
 * (tag 2) and `that` (tag 3). Values of this stream pass only while `lock`
 * is true. When `that` emits a stream while `lock` is true, `lock` is
 * cleared and that stream is opened: its value and failure packets are
 * forwarded through the internal channel, and its normal completion sets
 * `lock` again.
 */
behind<T>(that:Stream<Stream<T>>):Stream<T> {
var chan = new Source<T>();
var lock = true;
return (<Stream<any>> this).tag(1)
.mix(chan.tag(2))
.mix(that.tag(3))
.flatMap((e:{_1:any; _2:number}) => {
switch (e._2) {
// Deliberate fall-through into case 2 when the lock is held.
case 1: if (!lock) break;
case 2: return pure(e._1);
case 3:
if (lock) {
lock = false;
(<Stream<T>> e._1).open((o:Packet<T>) => {
if (o.fail || !o.done) {
chan.yield(o);
}
else lock = true;
});
}
}
return zero();
}, "sync");
}
/** Calls `f` for every value and passes the value on unchanged. */
each(f:(e:T) => void):Stream<T> { return this.map(e => (f(e), e)) }
/** Re-emits each value through a one-shot tick of `time` milliseconds. */
delay(time:number):Stream<T> {
return this.flatMap(e => Stream.tick(time, () => e).take(1));
}
/** Pairs every value with the constant `tag` as {_1: value, _2: tag}. */
tag<U>(tag:U):Stream<{_1:T; _2:U}> { return this.map(e => ({_1: e, _2: tag})); }
}
/**
 * Write side of a stream: accepts packets and exposes the stream that
 * delivers them.
 */
export interface Channel<T> {
    /** The stream on which the packets written to this channel can be read. */
    stream():Stream<T>;
    /** Writes a value. */
    push(value:T):void;
    /** Signals normal completion. */
    done():void;
    /** Signals failure with `error`. */
    fail(error:Error):void;
    /** Writes a raw packet; `interrupt` is optional. */
    yield(o:Packet<T>, interrupt?:boolean):void;
}
// Module-local shorthands for the two most used Stream factories.
var pure = Stream.pure,
    zero = Stream.zero;
/**
 * FIFO task queue drained from a single zero-delay timer.
 *
 * `_timer` holds the pending timer handle, or NaN when no drain is scheduled
 * or running. Tasks queued while a drain is in progress are executed by that
 * same drain.
 */
class LinearScheduler {
    private _tasks:Array<() => void> = [];
    private _timer = NaN;

    /** Queues `func`, arming the timer if no drain is pending or running. */
    schedule(func:() => void):void {
        if (isNaN(this._timer)) {
            this._timer = setTimeout(() => this.eventLoop(), 0);
        }
        this._tasks.push(func);
    }

    /**
     * Runs queued tasks in insertion order until the queue is empty.
     *
     * If a task throws, the exception still propagates to the timer callback,
     * but the scheduler is left usable: the timer marker is reset and a new
     * drain is armed for the tasks that have not run yet. Without this, one
     * throwing task would leave `_timer` set forever and strand every task
     * queued before or after it.
     */
    private eventLoop():void {
        try {
            while (this._tasks.length > 0) {
                this._tasks.shift()();
            }
        }
        finally {
            this._timer = NaN;
            // Non-empty only when a task threw part-way through the drain.
            if (this._tasks.length > 0) {
                this._timer = setTimeout(() => this.eventLoop(), 0);
            }
        }
    }
}
/**
 * A stream that is also its own channel: packets written with yield() are
 * delivered asynchronously, through the shared scheduler, to the callback
 * registered by open().
 *
 * `_func` is the current consumer (null while closed). `_disp` is a counter
 * bumped by open(), close() and interrupting yields; a scheduled delivery is
 * dropped when the counter has changed since it was queued.
 */
class Source<T> extends Stream<T> implements Channel<T> {
    static scheduler = new LinearScheduler();
    private _func:(o:Packet<T>) => void = null;
    private _disp = 0;

    /** A Source is its own stream. */
    stream():Stream<T> {
        return <Stream<T>> this;
    }

    /** True while no consumer is registered. */
    get closed():boolean {
        return this._func === null;
    }

    /**
     * Registers the consumer. Throws "stream in use." when one is already
     * registered and "exhausted" when the counter is -1.
     * A falsy `func` is replaced by a no-op consumer.
     */
    open(func:(o:Packet<T>) => void):void {
        if (this._func !== null) {
            throw new Error("stream in use.");
        }
        if (this._disp === -1) {
            throw new Error("exhausted");
        }
        this._func = func ? func : function (o:Packet<T>):void {};
        this._disp++;
    }

    /**
     * Queues `o` for delivery. With `interrupt`, the counter is bumped first,
     * which invalidates every delivery queued earlier.
     * After a terminal packet has been delivered the source closes itself,
     * unless the consumer already closed it.
     */
    yield(o:Packet<T>, interrupt:boolean=false):void {
        if (this._disp === -1) {
            throw new Error("exhausted");
        }
        if (interrupt) {
            this._disp++;
        }
        var ticket = this._disp;
        Source.scheduler.schedule(() => {
            var consumer = this._func;
            // Skip when closed, or when the source was reopened/closed/interrupted
            // after this delivery was queued.
            if (consumer === null || this._disp !== ticket) {
                return;
            }
            consumer(o);
            if (o.done && !this.closed) {
                this.close();
            }
        });
    }

    /** Queues a value packet. */
    push(value:T):void {
        this.yield(next(value));
    }

    /** Queues an optional final value followed by a done packet. */
    done(value:T=undefined):void {
        if (value !== undefined) {
            this.yield(next(value));
        }
        this.yield(done<T>());
    }

    /** Queues a fail packet carrying `error`. */
    fail(error:Error):void {
        this.yield(fail<T>(error));
    }

    /** Unregisters the consumer; throws "closed" when there is none. */
    close():void {
        if (this.closed) {
            throw new Error("closed");
        }
        this._func = null;
        this._disp++;
    }

    /** Queues an interrupting "aborted" failure. */
    abort():void {
        this.yield(fail<T>(new Error("aborted")), true);
    }
}
/**
 * A Source with lifecycle hooks: `_open` runs after the stream has been
 * opened, `_close` after it has been closed and `_abort` (optional) after it
 * has been aborted.
 *
 * The hooks are invoked as methods of this object, so a hook written as a
 * plain `function` sees the Source2 instance as `this`.
 */
class Source2<T> extends Source<T> {
    constructor(private _open:() => void
              , private _close:() => void
              , private _abort:() => void=null)
    {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        this._open();
    }

    close():void {
        super.close();
        this._close();
    }

    abort():void {
        super.abort();
        // The abort hook is optional (it defaults to null and callers in this
        // module omit it); calling it unconditionally threw a TypeError.
        if (this._abort) {
            this._abort();
        }
    }
}
/**
 * Channel decorator that hands out a fresh branch on every stream() call.
 *
 * Writes are delegated to the wrapped channel. Reads go through `_tail`:
 * each stream() call forks the tail in two, keeps one half as the new tail
 * and returns the other.
 */
class Autofork<T> implements Channel<T> {
    private _tail:Stream<T>;

    constructor(private _chan:Channel<T>) {
        this._tail = _chan.stream();
    }

    stream():Stream<T> {
        var branches = this._tail.fork(2);
        var handedOut = branches[1];
        this._tail = branches[0];
        return handedOut;
    }

    yield(o:Packet<T>, interrupt:boolean=false):void {
        this._chan.yield(o, interrupt);
    }

    push(value:T):void {
        this._chan.push(value);
    }

    done():void {
        this._chan.done();
    }

    fail(error:Error):void {
        this._chan.fail(error);
    }
}
/** The empty stream: queues a done packet as soon as it is opened. */
class Zero<T> extends Source<T> {
    constructor() {
        super();
    }

    open(func:(o:Packet<T>) => void) {
        super.open(func);
        this.yield(done<T>());
    }
}
/** Stream that queues a fixed array of packets, in order, when opened. */
class List<T> extends Source<T> {
    constructor(private _list:Packet<T>[]) {
        super();
    }

    open(func:(o:Packet<T>) => void) {
        super.open(func);
        // Wrapped so that forEach's extra arguments never reach yield().
        var emit = (packet:Packet<T>) => this.yield(packet);
        this._list.forEach(emit);
    }
}
/**
 * Pull-style stream: `_gen` is asked for the next packet after the previous
 * one has been delivered, until it returns a terminal packet or the stream
 * is closed.
 */
class Generator<T> extends Source<T> {
    constructor(private _gen:() => Packet<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void) {
        var pump = () => this.yield(this._gen());
        super.open(packet => {
            func(packet);
            if (packet.done || this.closed) {
                return;
            }
            pump();
        });
        // Prime the loop with the first packet.
        pump();
    }
}
/** Stream that forwards open(), close() and abort() to an underlying stream. */
class Wrapper<T> extends Stream<T> {
    constructor(public _base:Stream<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        this._base.open(func);
    }

    close():void {
        this._base.close();
    }

    abort():void {
        this._base.abort();
    }
}
/**
 * Base for streams derived from another stream. Opening the filter opens
 * the base stream and routes each of its packets to trap(), which
 * subclasses override to decide what to emit.
 *
 * `_open` is 1 from the moment the base stream is opened until it delivers
 * a terminal packet or the filter is closed; it guards against closing or
 * aborting a base stream that has already finished.
 */
class Filter<T,U> extends Source<U> {
    private _open = 0;

    constructor(private _base:Stream<T>) {
        super();
    }

    open(func:(o:Packet<U>) => void):void {
        super.open(func);
        this._open = 1;
        this._base.open(packet => {
            this.trap(packet);
            if (packet.done) {
                this._open = 0;
            }
        });
    }

    abort():void {
        if (!this._open) {
            return;
        }
        this._base.abort();
    }

    close():void {
        if (this._open) {
            this._base.close();
        }
        this._open = 0;
        super.close();
    }

    /** Hook receiving every packet of the base stream; does nothing here. */
    trap(o:Packet<T>):void {}
}
/** Filter that re-emits every packet of the base stream unchanged. */
class Identity<T> extends Filter<T,T> {
    constructor(base:Stream<T>) {
        super(base);
    }

    trap(o:Packet<T>):void {
        this.yield(o);
    }
}
/** Identity stream that runs `_action` each time it has been closed. */
class Closer<T> extends Identity<T> {
    constructor(base:Stream<T>, private _action:() => void) {
        super(base);
    }

    close():void {
        super.close();
        this._action();
    }
}
/**
 * Implementation of Stream.bind: every packet of the base stream is mapped
 * by `_call` to an inner stream of packets, and the packets carried by that
 * inner stream are emitted downstream.
 *
 * `_mode` decides what happens to a base packet that arrives while an inner
 * stream is still running:
 *   "sync" - it is queued and processed once the inner stream has finished;
 *   "skip" - it is dropped;
 *   "stop" - the running inner stream is closed and the packet is processed.
 * With `_auto`, fail and done packets of the base stream are forwarded
 * directly instead of being handed to `_call`.
 */
class Binder<T,U> extends Filter<T,U> {
// The inner stream currently being consumed, or null when idle.
private _next:Stream<Packet<U>> = null;
// Pending base packets; only allocated in "sync" mode.
private _ring:Packet<T>[] = null;
constructor(base:Stream<T>
, private _call:(o:Packet<T>) => Stream<Packet<U>>
, private _auto:boolean
, private _mode:string)
{
super(base);
if (_mode == "sync") this._ring = [];
}
trap(o:Packet<T>):void {
// Failures bypass any running inner stream and the queue.
if (this._auto && o.fail) {
this.yield(<Packet<any>>o);
return;
}
if (this._next !== null) {
switch (this._mode) {
// Deliberate fall-through: after queueing, "sync" returns like "skip".
case "sync": this._ring.unshift(o);
case "skip":
return;
case "stop":
this._next.close();
this._next = null;
break;
}
}
if (this._auto && o.done) {
this.yield(<Packet<any>>o);
return;
}
(this._next = this._call(o)).open((o:Packet<Packet<U>>) => {
if (o.done) {
this._next = null;
if (o.fail) {
this.yield(<Packet<any>>o);
}
else if (this._ring !== null && this._ring.length > 0) {
// unshift() adds at the head and pop() takes from the tail, so the
// oldest queued packet is processed first.
this.trap(this._ring.pop())
}
}
// A value of the inner stream is itself the packet to emit downstream.
else this.yield(o.value);
});
}
// Closes the running inner stream (if any) before closing the filter itself.
close():void {
if (this._next) this.
_next.close();
super.close();
}
}
/**
 * View of a stream that changes only the default bind mode: bind() is
 * forwarded to the wrapped stream with `_mode` substituted whenever the
 * caller passes no mode.
 */
class BinderOption<T> extends Wrapper<T> {
    constructor(base:Stream<T>, private _mode:string) {
        super(base);
    }

    bind<U>(func:(o:Packet<T>) => Stream<Packet<U>>, auto:boolean, mode:string=null):Stream<U> {
        var effective = mode || this._mode;
        return this._base.bind(func, auto, effective);
    }
}
/**
 * Shares one base stream among `n` ForkSplit branches.
 *
 * The base stream is opened when the first branch is opened and closed when
 * the count of open branches drops back to zero. Every base packet is
 * yielded to all branches. `_base` is set to null once the base stream has
 * delivered a terminal packet.
 */
class Splitter<T> {
    private _used = 0;
    public _list:ForkSplit<T>[] = [];

    constructor(private _base:Stream<T>, n:number) {
        while (this._list.length < n) {
            this._list.push(new ForkSplit<T>(this));
        }
    }

    /** Called by a branch when it is opened. */
    inc():void {
        this._used++;
        if (this._used !== 1) {
            return;
        }
        this._base.open((packet:Packet<T>):void => {
            if (packet.done) {
                this._base = null;
            }
            this._list.forEach((branch:ForkSplit<T>) => branch.yield(packet));
        });
    }

    /** Called by a branch when it is closed. */
    dec():void {
        if (this._base === null) {
            return;
        }
        this._used--;
        if (this._used === 0) {
            this._base.close();
        }
    }

    /** Aborts the base stream unless it has already finished. */
    abort():void {
        if (this._base === null) {
            return;
        }
        this._base.abort();
    }
}
/**
 * One branch of a Splitter. Opening and closing the branch is reported to
 * the parent, and abort() is delegated to it.
 */
class ForkSplit<T> extends Source<T> {
    constructor(private _parent:Splitter<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        this._parent.inc();
    }

    close():void {
        super.close();
        this._parent.dec();
    }

    abort():void {
        this._parent.abort();
    }
}
// TODO reopenable
/**
 * Merges two streams: values from either input are emitted as they arrive.
 * The merged stream completes once both inputs have completed, and fails as
 * soon as either input fails (the other input is then closed).
 *
 * `_a` / `_b` are set to null once the corresponding input has finished, so
 * that close() and abort() only touch inputs that are still running.
 */
class Mix<T> extends Source<T> {
    constructor(private _a:Stream<T>, private _b:Stream<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        this._a.open(this.block(0));
        this._b.open(this.block(1));
    }

    abort():void {
        // An input is null after it has finished; abort() used to dereference
        // it unconditionally, unlike close() below.
        if (this._a !== null) this._a.abort();
        if (this._b !== null) this._b.abort();
    }

    close():void {
        if (this._a !== null) this._a.close();
        if (this._b !== null) this._b.close();
        super.close();
    }

    /** Builds the packet handler for input `i` (0 = `_a`, 1 = `_b`). */
    private block(i:number):(o:Packet<T>) => void {
        return o => {
            // trace("[debug] Mix#block("+ i +"): "+ o);
            if (!o.done) {
                this.yield(o);
                return;
            }
            if (!o.fail) {
                // Normal completion: forget this input and complete only when
                // the other one is gone as well.
                if (i === 0
                        ? (this._a = null, this._b === null)
                        : (this._b = null, this._a === null)) {
                    this.yield(done<T>());
                }
                return;
            }
            // Failure: close the other input if it is still running, then
            // pass the failure on.
            var c = i === 0 ? this._b: this._a;
            if (c) c.close();
            this._a = null;
            this._b = null;
            this.yield(o);
        };
    }
}
// TODO reopenable
/**
 * Combines several streams into a stream of arrays holding the latest value
 * of each input. Nothing is emitted until every input has produced at least
 * one value; after that, every new value emits a fresh copy of the array.
 *
 * `_close` and `_empty` are bit masks with one bit per input: a set bit in
 * `_close` means that input has not delivered a terminal packet yet, a set
 * bit in `_empty` means it has not delivered a value yet.
 */
class Disjunction extends Source<any[]> {
    private _close:number;
    private _empty:number;

    constructor(private _list:Stream<any>[]) {
        super();
    }

    open(func:(o:Packet<any>) => void):void {
        super.open(func);
        var latest:any[] = [];
        var allBits = (1 << this._list.length) - 1;
        this._close = allBits;
        this._empty = allBits;

        var makeHandler = (index:number) => {
            var bit = 1 << index;
            return (o:Packet<any>) => {
                if (o.done) {
                    this._close &= ~bit;
                    if (o.fail) {
                        // A failure of any input is passed straight on.
                        this.yield(o);
                    }
                    else if (this._close === 0) {
                        // Complete only after every input has completed.
                        this.yield(done<any[]>());
                    }
                    return;
                }
                latest[index] = o.value;
                this._empty &= ~bit;
                if (this._empty === 0) {
                    this.yield(next(latest.slice()));
                }
            };
        };

        for (var k = 0; k < this._list.length; ++k) {
            this._list[k].open(makeHandler(k));
            latest.push(null);
        }
    }

    abort():void {
        this._list.forEach(s => s.abort());
        super.abort();
    }

    close():void {
        // Close only the inputs that have not finished on their own.
        this._list.forEach((input, index) => {
            if ((this._close & (1 << index)) !== 0) {
                input.close();
            }
        });
        this._close = 0;
        super.close();
    }
}
// TODO reopenable
/**
 * Combines several streams element-wise: the k-th emitted array holds the
 * k-th value of every input. Values are queued per input until all inputs
 * have one available. The first terminal packet from any input closes the
 * remaining inputs and is passed on.
 *
 * Two defects of the previous version are fixed here:
 *  - tuples are assembled from the oldest queued value of each input
 *    (shift) rather than the newest (pop), so queued values keep their order;
 *  - inputs are closed at most once. Previously the terminal handler closed
 *    the other inputs and the subsequent automatic close() closed every
 *    input again, which makes a Source throw "closed".
 */
class Conjunction extends Source<any[]> {
    // _live[i] is true while input i has been opened here and has neither
    // finished nor been closed.
    private _live:boolean[] = [];

    constructor(private _list:Stream<any>[]) { super(); }

    open(func:(o:Packet<any>) => void):void {
        super.open(func);
        // g[k] counts the inputs that currently have more than k queued values.
        var g:number[] = [0];
        // a[i] is the FIFO queue of values received from input i.
        var a: any[][] = [ ];
        for (var i = 0; i < this._list.length; ++i) {
            this._list[i].open(((i:number) =>
                (o:Packet<any>) => {
                    if (o.done) {
                        // The finishing input is not closed from here (as
                        // before); only the inputs still running are.
                        this._live[i] = false;
                        this.closeInputs();
                        this.yield(o);
                        return;
                    }
                    var n = a[i].push(o.value) - 1;
                    if (g.length <= n) {
                        g.push(0);
                    }
                    ++g[n];
                    // A first-in-queue value that completes the front row
                    // releases one tuple.
                    if (n === 0 && g[0] === a.length) {
                        g.shift();
                        this.yield(next(a.reduce((acc, q) => (acc.push(q.shift()), acc), [])));
                    }
                })(i));
            this._live[i] = true;
            a.push([]);
        }
    }

    abort():void {
        this._list.forEach(s => s.abort());
        super.abort();
    }

    close():void {
        this.closeInputs();
        super.close();
    }

    /** Closes every input that is still marked live, exactly once. */
    private closeInputs():void {
        this._list.forEach((s, j) => {
            if (this._live[j]) {
                this._live[j] = false;
                s.close();
            }
        });
    }
}
/**
 * Opens the base stream immediately (in the constructor) and stores its
 * packets until a consumer opens the buffer; the stored packets are then
 * replayed and later packets are passed straight through.
 *
 * `_tmp` is the backlog while nobody is listening and null afterwards.
 * `_base` is set to null once the base stream has delivered a terminal
 * packet.
 */
class Buffer<T> extends Source<T> {
    private _tmp:Packet<T>[] = [];

    constructor(private _base:Stream<T>, private _limit:number) {
        super();
        _base.open((packet:Packet<T>):void => {
            var backlog = this._tmp;
            if (backlog === null) {
                this.yield(packet);
            }
            else {
                backlog.push(packet);
                // NOTE(review): when the backlog exceeds the limit, the packet
                // just added (the newest) is the one discarded — confirm that
                // keeping the oldest packets is intended.
                if (backlog.length > this._limit) {
                    backlog.pop();
                }
            }
            if (packet.done) {
                this._base = null;
            }
        });
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        var replay = (packet:Packet<T>) => this.yield(packet);
        this._tmp.forEach(replay);
        this._tmp = null;
    }

    abort():void {
        if (this._base) {
            this._base.abort();
        }
        super.abort();
    }

    close():void {
        if (this._base) {
            this._base.close();
        }
        super.close();
    }
}
/**
 * Wrapper that shields the underlying stream from its consumers: close()
 * and abort() are swallowed instead of being forwarded.
 */
class NoHup<T> extends Wrapper<T> {
    constructor(base:Stream<T>) {
        super(base);
    }

    /** Intentionally does nothing. */
    close():void {}

    /** Intentionally does nothing. */
    abort():void {}
}
// A bare Source exported as a Stream; nothing in this module yields packets to it.
export var Never:Stream<any> = new Source<any>();
}
<html>
<head>
<title>stream.ts</title>
<script src="stream.js"></script>
<script>
// Shorthand for the Stream class exported by the sf module.
var S = sf.Stream;
// Page entry point (body onload): looks up the canvas and runs the enabled
// demos. The remaining demos are kept commented out.
function start() {
    var canvas = document.getElementById("cv");
    var context = canvas.getContext("2d");
    testWithCanvas1(canvas, context);
    // testWithCanvas2(canvas, context);
    // testWithCanvas3(canvas, context);
    //testMix();
    testStep();
    //testDisj();
    //testPlus();
    //testGroupBy();
    testDistinct();
}
// Free-hand drawing, variant 1: each mousedown starts a stroke made of the
// mousedown event followed by mousemove events; the stroke ends (as a
// recovered failure, represented by null) on mouseout or mouseup.
function testWithCanvas1(cv, cx) {
    // Draws one line segment between the positions of two mouse events.
    function drawSegment(from, to) {
        cx.beginPath();
        cx.moveTo(from.offsetX, from.offsetY);
        cx.lineTo(to.offsetX, to.offsetY);
        cx.stroke();
    }

    var strokes = S.on(cv, "mousedown").inner1st.flatMap(function (down) {
        var head = S.pure(down);
        var moves = S.on(cv, "mousemove")
            .mix(S.failOn(cv, "mouseout"))
            .mix(S.failOn(cv, "mouseup" ));
        return head.plus(moves).recover(function () {
            return S.pure(null);
        });
    });

    // The accumulator is the previous event of the current stroke, or null
    // between strokes.
    strokes.fold(function (prev, cur) {
        if (cur === null) {
            return null;
        }
        if (prev !== null) {
            drawSegment(prev, cur);
        }
        return cur;
    }, null).next();
}
// Free-hand drawing, variant 2: pairs every mousemove with a "button held"
// flag (true after mousedown, false after mouseout or mouseup) and draws a
// segment from the previous position while the flag is set.
function testWithCanvas2(cv, cx) {
    var moves = S.on(cv, "mousemove");
    var pressed = S.on(cv, "mousedown").fill(true);
    var released = S.on(cv, "mouseout").mix(S.on(cv, "mouseup" )).fill(false);

    moves.disj(pressed.mix(released)).fold(function (prev, pair) {
        var current = pair[0];
        var held = pair[1];
        if (held && prev !== null) {
            cx.beginPath();
            cx.moveTo(prev.offsetX, prev.offsetY);
            cx.lineTo(current.offsetX, current.offsetY);
            cx.stroke();
        }
        return current;
    }, null).next();
}
// Free-hand drawing, variant 3: the same effect written with plain event
// callbacks and shared mutable state, for comparison with the stream-based
// variants.
// NOTE(review): relies on a global `$` with jQuery-style event helpers; this
// page does not load such a script — confirm before enabling the demo.
function testWithCanvas3(cv, cx) {
    var drawn = false;
    var coord = null;

    function stopDrawing() {
        drawn = false;
    }

    $(cv).mousedown(function (e) {
        drawn = true;
        coord = e;
    });
    $(cv).mousemove(function (b) {
        if (!drawn) {
            return;
        }
        var a = coord;
        cx.beginPath();
        cx.moveTo(a.offsetX, a.offsetY);
        cx.lineTo(b.offsetX, b.offsetY);
        cx.stroke();
        coord = b;
    });
    $(cv).mouseout(stopDrawing);
    $(cv).mouseup (stopDrawing);
}
// Demo of filter/map/mix/delay: values pushed to two channels are merged,
// delayed by one second each and logged. Items of chan1 whose 7th character
// is "B" are filtered out; the remaining chan1 items get a "+" suffix.
function testMix() {
    var chan1 = sf.Stream.channel();
    var chan2 = sf.Stream.channel();

    var withoutB = chan1.filter(function (text) {
        return text.charAt(6) !== "B";
    });
    var suffixed = withoutB.map(function (text) {
        return text + "+";
    });
    suffixed.mix(chan2).delay(1000).next(function (text) {
        console.log(text);
    });

    chan1.push("chan1-A");
    chan2.push("chan2-A");
    chan1.push("chan1-B");
    chan2.push("chan2-B");
    chan1.push("chan1-C");

    // Finish both channels a little later.
    setTimeout(function () {
        chan2.push("chan2-C");
        chan2.done();
        chan1.done();
    }, 100);
}
// Demo of Stream.step: logs "step:<n>" for each number produced by
// step(1, 10, 1).
function testStep() {
    var labelled = S.step(1, 10, 1).map(function (value) {
        return "step:" + value;
    });
    labelled.next(function (line) {
        console.log(line);
    });
}
// Demo of disj: two 800 ms tickers, each counting up by two from a different
// start, combined into pairs that are logged.
function testDisj() {
    var counter1 = 1;
    var counter2 = 2;
    var s1 = S.tick(800, function () {
        counter1 += 2;
        return "A :: "+ counter1;
    });
    var s2 = S.tick(800, function () {
        counter2 += 2;
        return "B :: "+ counter2;
    });
    var combined = s1.disj(s2);
    combined.next(function (pair) {
        console.log(pair);
    });
}
// Demo of plus: the first three ticks of one ticker followed by the first
// three ticks of another, logged as they arrive.
function testPlus() {
    var counter1 = 1;
    var counter2 = 2;
    var s1 = S.tick(800, function () {
        counter1 += 2;
        return "A :: "+ counter1;
    }).take(3);
    var s2 = S.tick(800, function () {
        counter2 += 2;
        return "B :: "+ counter2;
    }).take(3);
    var sequence = s1.plus(s2);
    sequence.next(function (line) {
        console.log(line);
    });
}
// Demo of groupBy: groups delayed strings by their first character, numbers
// the groups in order of appearance and logs every member with its group
// number.
function testGroupBy() {
    var source = S.forEach(
        "a:foo",
        "b:bar",
        "b:baz",
        "a:(^_^;)",
        "c:(-_-;)").delay(400);

    var groups = source.groupBy(function (line) {
        return line.charAt(0);
    });

    // The fold accumulator is the number assigned to the next new group.
    groups.fold(function (index, group) {
        console.log("group: "+ group._1 + " as #" + index);
        group._2.next(function (line) {
            console.log("#"+ index + " --> " + line);
        });
        return index + 1;
    }, 1).next();
}
// Demo of distinct: logs the delayed strings, dropping one whenever its
// first character equals that of the previously logged string.
function testDistinct() {
    var source = sf.Stream.forEach(
        "a:foo",
        "b:bar",
        "b:baz",
        "a:(^_^;)",
        "c:(-_-;)").delay(300);

    var firstChar = function (line) {
        return line.charAt(0);
    };

    source.distinct(firstChar).next(function (line) {
        console.log(line);
    });
}
</script>
</head>
<body onload="start()">
<h1>stream.ts</h1>
<div>
<a href="https://gist.github.com/hisui/6261547">gist</a>
<div style="user-select:none;">
<canvas id="cv" width="300" height="300" style="background:#eeeeee;"></canvas>
</div>
</div>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment