Last active
August 21, 2020 21:20
-
-
Save hisui/6261547 to your computer and use it in GitHub Desktop.
Functional Reactive Programming (FRP) in TypeScript.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// stream.ts - A TypeScript module for FRP. | |
// - https://gist.github.com/hisui/6261547 | |
// - tsc stream.ts -t es5 --sourcemap --noImplicitAny | |
module sf { | |
/**
 * One message travelling through a stream: an ordinary value, a normal
 * end-of-stream marker, or a failure (which also ends the stream).
 *
 * `_flags` is a bit set: bit 0 marks termination, bit 1 marks failure.
 */
export class Packet<T> {

    constructor(private _flags:number, private _value:any=void 0) {}

    /** Builds a value packet carrying `e` (no flag bits set). */
    static next<A>(e:A):Packet<A> {
        var packet = new Packet<A>(0, e);
        return packet;
    }

    /** Builds the normal end-of-stream packet (termination bit only, no payload). */
    static done<A>():Packet<A> {
        var packet = new Packet<A>(1);
        return packet;
    }

    /** Builds a failure packet carrying `e` (termination and failure bits). */
    static fail<A>(e:Error):Packet<A> {
        var packet = new Packet<A>(3, e);
        return packet;
    }

    /** True when this packet terminates the stream, normally or by failure. */
    get done():boolean {
        var terminated = this._flags & 1;
        return terminated !== 0;
    }

    /** True when this packet reports a failure. */
    get fail():boolean {
        var failed = this._flags & 2;
        return failed !== 0;
    }

    /** The payload of a value packet; `undefined` for any terminating packet. */
    get value():T {
        if (this.done) {
            return void 0;
        }
        return this._value;
    }

    /** The error of a failure packet; `undefined` for every other packet. */
    get error():Error {
        if (!this.fail) {
            return void 0;
        }
        return this._value;
    }
}
// Module-local shorthands for the three Packet factory functions.
var done = Packet.done; | |
var fail = Packet.fail; | |
var next = Packet.next; | |
// https://developer.mozilla.org/en-US/docs/Web/API/EventTarget | |
/**
 * The part of the DOM EventTarget contract this module relies on.
 * `useCapture` may be left out when registering or removing a listener.
 */
export interface EventTarget {
    addEventListener(type:string, listener:any, useCapture?:boolean):void;
    removeEventListener(type:string, listener:any, useCapture?:boolean):void;
    dispatchEvent(event:any):boolean;
}
/**
 * Builds a stream driven by events of `type` on `target`.
 *
 * The listener is registered when the stream is opened and removed when it is
 * closed. Every event is passed to `f` together with the stream's channel, so
 * `f` decides what (if anything) gets emitted.
 */
function on<A>(target:EventTarget, type:string, useCapture:boolean, f:(chan:Channel<A>, e:Event) => void):Stream<A> {
    var source:Source2<A>;
    var handler = (e:Event):void => {
        f(source, e);
    };
    var attach = ():void => {
        target.addEventListener(type, handler, useCapture);
    };
    var detach = ():void => {
        target.removeEventListener(type, handler, useCapture);
    };
    source = new Source2<A>(attach, detach);
    return source;
}
/**
 * Base class of every stream: a source of Packets that starts delivering once
 * `open` is called with a receiver. The base implementations of `open`,
 * `close` and `abort` do nothing; concrete subclasses supply the behaviour.
 * All combinators below are built on `bind`.
 */
export class Stream<T> {

    /** Creates a writable channel. With `autofork`, the channel is wrapped in an Autofork. */
    static channel<A>(autofork:boolean=false):Channel<A> {
        return autofork ? new Autofork<A>(new Source<A>()): new Source<A>();
    }

    /** Flattens a stream of streams, binding in "sync" mode. */
    static flatten<A>(s:Stream<Stream<A>>):Stream<A> {
        return s.flatMap(e => e, "sync");
    }

    /** A stream that emits each argument in order and then completes. */
    static forEach<A>(... args:A[]):Stream<A> {
        return new List<A>(args.map(next).concat(done<A>()));
    }

    /** A stream fed by repeatedly calling `gen` until it returns a terminating packet. */
    static from<A>(gen:() => Packet<A>):Stream<A> {
        return new Generator<A>(gen);
    }

    /** A stream that completes without emitting any value. */
    static zero<A>():Stream<A> {
        return new Zero<A>();
    }

    /** A stream that fails with `e` without emitting any value. */
    static fail<A>(e:Error):Stream<A> {
        return new List<A>([fail<A>(e)]);
    }

    /** A stream that emits `value` once and then completes. */
    static pure<A>(value:A):Stream<A> {
        return Stream.forEach(value);
    }

    /** A stream that replays the given raw packets verbatim. */
    static list<A>(a:Packet<A>[]):Stream<A> { return new List<A>(a); }

    /**
     * Counts from `a` towards `b` in steps of `n`, excluding `b` itself.
     * @throws Error when `n <= 0` or when |a - b| is not a multiple of `n`.
     */
    static step(a:number, b:number, n:number=1):Stream<number> {
        if (n <= 0) {
            throw new Error("'n' must be greater than 0.");
        }
        if (Math.abs(a - b) % n) {
            throw new Error("Precondition failed: |a - b| mod n == 0");
        }
        if (a > b) n = -n;
        return new Generator<number>(() => {
            // The `finally` advances the counter after the packet for the
            // current value has been built.
            try {
                return a !== b ? next(a): done<number>();
            }
            finally { a += n }
        });
    }

    /**
     * Emits `gen()` every `time` milliseconds while the stream is open.
     * The interval starts on open and is cleared on close.
     */
    static tick<A>(time:number, gen:() => A = (():A => null)):Stream<A> {
        var timer = NaN;
        return new Source2<A>(function () {
            var self:Source2<A> = this;
            if (!isNaN(timer)) {
                throw new Error("[bug] !isNaN(timer)");
            }
            timer = setInterval(() => self.yield(next(gen())), time);
        }
        , () => (clearInterval(timer), timer = NaN));
    }

    /** Emits every `type` event dispatched on `target`. */
    static on(target:EventTarget, type:string, useCapture:boolean=false):Stream<Event> {
        return on<Event>(target, type, useCapture, (chan, e) => chan.push(e));
    }

    /** Completes as soon as a `type` event is dispatched on `target`. */
    static doneOn(target:EventTarget, type:string, useCapture:boolean=false):Stream<Event> {
        return on<Event>(target, type, useCapture, (chan, e) => chan.done());
    }

    /** Fails (with the event type as the error message) as soon as a `type` event is dispatched. */
    static failOn(target:EventTarget, type:string, useCapture:boolean=false):Stream<Event> {
        return on<Event>(target, type, useCapture, (chan, e) => chan.fail(new Error(e.type)));
    }

    /** Combines the given streams with a Disjunction. */
    static disj<A>(... args:Stream<A>[]):Stream<any[]> {
        return new Disjunction(args);
    }

    /** Combines the given streams with a Conjunction. */
    static conj<A>(... args:Stream<A>[]):Stream<any[]> {
        return new Conjunction(args);
    }

    /** Disjunction of this stream and `s`. */
    disj<U>(s:Stream<U>):Stream<any[]/*[T, U]*/> {
        return new Disjunction([this, s]);
    }

    /** Conjunction of this stream and `s`. */
    conj<U>(s:Stream<U>):Stream<any[]/*[T, U]*/> {
        return new Conjunction([this, s]);
    }

    /** Concatenation: this stream followed by `s`. */
    plus(s:Stream<T>):Stream<T> {
        return Stream.flatten(Stream.forEach(this, s));
    }

    /**
     * Opens the stream and calls `func` (if given) with every value.
     * When the stream fails and `fatal` is set, the error is rethrown from
     * inside the receiver.
     */
    next(func:(e:T) => void=null, fatal:boolean=true):void {
        this.open(e => {
            if (!e.done) func && func(e.value);
            else if (e.error && fatal) {
                throw e.error;
            }
        });
    }

    /** Starts delivery to `func`. No-op in the base class. */
    open(func: (f:Packet<T>) => void):void {}

    /** Stops delivery. No-op in the base class. */
    close():void {}

    /** Requests abnormal termination. No-op in the base class. */
    abort():void {}

    /** Applies `func` to this stream; lets free functions be used in a method chain. */
    $<U>(func:(s:Stream<T>) => Stream<U>):Stream<U> { return func(this) }

    /** Opens this stream and forwards every packet to `chan`. */
    connect(chan:Channel<T>):void {
        this.open(o => chan.yield(o));
    }

    /** Wraps this stream in an auto-forking channel. */
    channel():Channel<T> {
        return new Autofork(new Identity(this));
    }

    /** Merges this stream with `that`. */
    mix(that:Stream<T>):Stream<T> { return new Mix<T>(this, that) }

    /**
     * The primitive all combinators are built on: maps every packet to a
     * stream of packets via `func`. `mode` defaults to "sync".
     */
    bind<U>(func:(o:Packet<T>) => Stream<Packet<U>>, auto:boolean, mode:string=null):Stream<U> {
        return new Binder<T,U>(this, func, auto, mode || "sync");
    }

    /** Buffers this stream until it is opened, keeping at most `limit` packets. */
    hold(limit:number=Infinity):Stream<T> {
        return new Buffer<T>(this, limit);
    }

    /** Splits this stream into `n` branches that share a single subscription. */
    fork(n:number=2):Stream<T>[] { return new Splitter(this, n)._list; }

    /** Same stream, but `bind` defaults to "stop" mode. */
    get outer1st():Stream<T> { return new BinderOption(this, "stop") }

    /** Same stream, but `bind` defaults to "skip" mode. */
    get inner1st():Stream<T> { return new BinderOption(this, "skip") }

    /** Same stream, but `bind` defaults to "sync" mode. */
    get synchronize():Stream<T> { return new BinderOption(this, "sync") }

    /** Same stream, but `close` and `abort` do not reach the underlying stream. */
    get nohup():Stream<T> { return new NoHup<T>(this); }

    /** Maps packets one-to-one. */
    map0<U>(func:(o:Packet<T>) => Packet<U>, auto:boolean=true):Stream<U> {
        return this.bind(o => pure(func(o)), auto, "sync");
    }

    /** Maps values one-to-one. */
    map<U>(func:(e:T) => U):Stream<U> {
        return this.map0(o => next(func(o.value)));
    }

    /** Maps each value to a packet, so `func` can end or fail the stream. */
    halfMap<U>(func:(e:T) => Packet<U>):Stream<U> {
        return this.bind(o => pure(func(o.value)), true, "sync");
    }

    /** Maps each value to a stream and flattens the result. */
    flatMap<U>(func:(e:T) => Stream<U>, mode:string=null):Stream<U> {
        return this.bind(o => func(o.value).map0(next), true, mode);
    }

    /**
     * Handles a failure: without `func` the stream simply completes;
     * with `func` it continues with the stream `func` returns.
     */
    recover(func:(e:Error) => Stream<T>=null):Stream<T> {
        return this.bind((o:Packet<T>) => ! o.fail ? pure(o)
            : func === null ? pure(done<T>())
            : func(o.error).map0(next, false)
            , false, "sync");
    }

    /** Replaces every value with `e`. */
    fill<U>(e:U):Stream<U> { return this.map(_ => e); }

    /**
     * Drops a value when its `lens` projection equals (per `eq`) that of the
     * most recently emitted value.
     */
    distinct<U>(lens:(e:T) => U= <X>(e:X) => e, eq:(a:U, b:U) => boolean= (a, b) => a === b):Stream<T> {
        var last:T = undefined;
        return this.flatMap(e =>
            last !== undefined && eq(lens(e), lens(last))
                ? zero<T>()
                : pure<T>(last = e));
    }

    /** Keeps only the values for which `func` returns true. */
    filter(func:(e:T) => boolean):Stream<T> {
        return this.flatMap((e:T) => func(e) ? pure<T>(e): zero<T>(), "sync");
    }

    // ISSUE tsc-0.9.2 crashes
    /**
     * Partitions values by `keyof`. The result emits one `{_1: key, _2: stream}`
     * pair the first time each key is seen; `_2` is a held (buffered) stream of
     * the values with that key. The terminating packet is forwarded to every
     * group.
     */
    groupBy(keyof: (e:T) => string):Stream<{_1:string; _2:Stream<T>}> {
        var map:{ [key:string]: Source<T> } = {};
        return this.bind((o:Packet<T>):Stream<Packet<{_1:string; _2:Stream<T>}>> => {
            if (o.done) {
                for (var e in map) {
                    map[e].yield(o);
                }
                return pure(done<{_1:string; _2:Stream<T>}>());
            }
            var key = keyof(o.value);
            var out = map[key];
            try {
                if (out !== undefined)
                    return zero<Packet<{_1:string; _2:Stream<T>}>>();
                return pure(
                    next<{_1:string; _2:Stream<T>}>({
                        _1: key,
                        _2: (map[key] = out = new Source<T>()).hold()
                    }));
            } finally {
                // Runs on both paths: the value always goes to its group.
                out.yield(o);
            }
        }, false, "sync");
    }

    /** Turns normal completion into a failure with `e`; values and failures pass through. */
    fatal(e:Error=new Error()):Stream<T> {
        return this.map0(o => (!o.done || o.fail) ? o: fail<T>(e), false);
    }

    /** Runs `func` when the resulting stream is closed. */
    onClose(func:() => void):Stream<T> {
        return new Closer<T>(this, func);
    }

    /**
     * Left fold. Emits the final accumulator once this stream completes;
     * a failure is passed on instead.
     */
    fold<U>(func:(acc:U, e:T) => U, z:U):Stream<U> {
        return this.bind((o:Packet<T>) => {
            if (o.fail) return Stream.fail<Packet<U>>(o.error);
            if (o.done) return Stream.list<Packet<U>>(
                [ next(next<U>(z))
                , next(done<U>()) ]);
            z = func(z, o.value);
            return zero<Packet<U>>();
        }, false, "sync");
    }

    /** Passes values through until `func` returns false, then completes. */
    takeWhile(func:(e:T) => boolean):Stream<T> {
        return this.halfMap((e:T) => func(e) ? next<T>(e): done<T>());
    }

    /**
     * `n > 0`: the first `n` packets. `n < 0`: the last `-n` values, emitted
     * once this stream completes. `n === 0`: an empty stream.
     */
    take(n:number):Stream<T> {
        if (n === 0) {
            // NOTE(review): `this` has not been opened at this point; confirm that
            // closing it from onClose is what is wanted.
            return zero<T>().onClose(() => this.close()); // TODO: abort
        }
        if (n > 0) {
            return this.bind<T>((o:Packet<T>) =>
                new List([next(o)].concat(--n > 0 ? done<Packet<T>>(): next(done<T>()))), false, "sync");
        }
        return this.fold((acc:T[], e:T) => {
            // Sliding window of the most recent -n values.
            if (acc.push(e) > -n) {
                acc.shift();
            }
            return acc;
        }, []).flatMap<T>(e => Stream.forEach.apply(null, e));
    }

    /** Drops values until `func` returns true; that value and all later ones pass. */
    skipUntil(func:(e:T) => boolean):Stream<T> {
        var b = false;
        return this.filter(e => b || (b = func(e)));
    }

    /**
     * `n >= 0`: drops the first `n` values. `n < 0`: drops the last `-n`
     * values; the rest are emitted once this stream completes.
     */
    skip(n:number):Stream<T> {
        if (n >= 0) {
            return this.skipUntil(e => --n < 0);
        }
        // `n` is negative here, so slice(0, n) removes the trailing -n values.
        // (slice(n) would keep exactly those, which is what take(n) does.)
        return this.fold((acc:T[], e:T) => (acc.push(e), acc), [])
            .flatMap<T>(a => Stream
                .forEach.apply(null, a.slice(0, n)), "skip");
    }

    // ISSUE tsc-0.9.2 crashes
    /**
     * Merges this stream (tag 1), an internal channel (tag 2) and `that` (tag 3).
     * While `lock` is true, this stream's values pass through. When `that`
     * emits a stream, `lock` is cleared, that stream is opened and its values
     * and failures are relayed through the internal channel; its normal
     * completion sets `lock` again.
     */
    behind<T>(that:Stream<Stream<T>>):Stream<T> {
        var chan = new Source<T>();
        var lock = true;
        return (<Stream<any>> this).tag(1)
            .mix(chan.tag(2))
            .mix(that.tag(3))
            .flatMap((e:{_1:any; _2:number}) => {
                switch (e._2) {
                // Intentional fall-through: tag-1 values are emitted only while locked.
                case 1: if (!lock) break;
                case 2: return pure(e._1);
                case 3:
                    if (lock) {
                        lock = false;
                        (<Stream<T>> e._1).open((o:Packet<T>) => {
                            if (o.fail || !o.done) {
                                chan.yield(o);
                            }
                            else lock = true;
                        });
                    }
                }
                return zero();
            }, "sync");
    }

    /** Calls `f` for every value and passes the value on unchanged. */
    each(f:(e:T) => void):Stream<T> { return this.map(e => (f(e), e)) }

    /** Re-emits each value after one `time`-millisecond tick. */
    delay(time:number):Stream<T> {
        return this.flatMap(e => Stream.tick(time, () => e).take(1));
    }

    /** Pairs every value with `tag` as `{_1: value, _2: tag}`. */
    tag<U>(tag:U):Stream<{_1:T; _2:U}> { return this.map(e => ({_1: e, _2: tag})); }
}
/** The writable side of a stream: packets handed to a channel are delivered to its stream. */
export interface Channel<T> { | |
// Delivers a raw packet; Source.yield uses `interrupt` to supersede packets it has not yet delivered.
yield(o:Packet<T>, interrupt?:boolean):void; | |
// Delivers `value` as a value packet.
push(value:T):void; | |
// Delivers the normal end-of-stream packet.
done():void; | |
// Delivers a failure packet carrying `error`.
fail(error:Error):void; | |
// The readable stream fed by this channel.
stream():Stream<T>; | |
} | |
// Module-local shorthands for Stream.pure and Stream.zero.
var pure = Stream.pure; | |
var zero = Stream.zero; | |
/**
 * Runs queued tasks in FIFO order on a later turn of the host event loop.
 *
 * One zero-delay timer is armed when a task is scheduled while no drain is
 * pending. Tasks queued while the queue is being drained run in that same drain.
 */
class LinearScheduler {
    private _tasks:Array<() => void> = [];
    // NaN means "no drain is currently armed".
    private _timer = NaN;

    /** Queues `func`, arming the drain timer if none is pending. */
    schedule(func:() => void):void {
        if (isNaN(this._timer)) {
            this.arm();
        }
        this._tasks.push(func);
    }

    /** Arms a zero-delay timer that drains the queue. */
    private arm():void {
        this._timer = setTimeout(() => this.eventLoop(), 0);
    }

    /**
     * Drains the queue. If a task throws, the exception still propagates to
     * the host, but the scheduler stays usable: the timer state is reset and
     * a new drain is armed for the tasks left in the queue.
     */
    private eventLoop():void {
        try {
            while (this._tasks.length !== 0) {
                this._tasks.shift()();
            }
        }
        finally {
            this._timer = NaN;
            if (this._tasks.length !== 0) {
                // Only reachable when a task threw part-way through the drain.
                this.arm();
            }
        }
    }
}
/**
 * A stream that is its own channel: packets handed to `yield` are delivered
 * asynchronously, through the shared scheduler, to the single receiver
 * registered with `open`.
 */
class Source<T> extends Stream<T> implements Channel<T> {

    static scheduler = new LinearScheduler();

    // Current receiver; null while the source is not open.
    private _func:(o:Packet<T>) => void = null;

    // Generation counter: a scheduled delivery only happens if the counter
    // still has the value it had when the delivery was scheduled.
    private _disp = 0;

    stream():Stream<T> { return <Stream<T>> this; }

    /**
     * Registers the receiver (a do-nothing receiver when `func` is falsy).
     * @throws Error when the source is already open.
     */
    open(func:(o:Packet<T>) => void):void {
        if (this._func !== null) {
            throw new Error("stream in use.");
        }
        if (this._disp === -1) {
            throw new Error("exhausted");
        }
        var ignore = (o:Packet<T>) => {};
        this._func = func || ignore;
        this._disp ++;
    }

    /**
     * Schedules delivery of `o`. With `interrupt`, the generation counter is
     * bumped first, so deliveries scheduled earlier are dropped.
     * After a terminating packet is delivered the source closes itself.
     */
    yield(o:Packet<T>, interrupt:boolean=false):void {
        if (this._disp === -1) {
            throw new Error("exhausted");
        }
        if (interrupt) {
            ++this._disp;
        }
        var ticket = this._disp;
        Source.scheduler.schedule(() => {
            var receiver = this._func;
            if (receiver === null || this._disp !== ticket) {
                return;
            }
            receiver(o);
            if (o.done && !this.closed) {
                this.close();
            }
        });
    }

    /** Delivers `value` as a value packet. */
    push(value:T):void {
        var packet = next(value);
        this.yield(packet);
    }

    /** Delivers `value` first (unless it is undefined), then the end-of-stream packet. */
    done(value:T=undefined):void {
        if (value !== undefined) {
            this.yield(next(value));
        }
        this.yield(done<T>());
    }

    /** Delivers a failure packet carrying `error`. */
    fail(error:Error):void {
        var packet = fail<T>(error);
        this.yield(packet);
    }

    /**
     * Unregisters the receiver and invalidates pending deliveries.
     * @throws Error when the source is not open.
     */
    close():void {
        if (this.closed) {
            throw new Error("closed");
        }
        this._func = null;
        this._disp++;
    }

    /** Delivers an "aborted" failure that supersedes all pending deliveries. */
    abort():void {
        var aborted = fail<T>(new Error("aborted"));
        this.yield(aborted, true);
    }

    /** True while no receiver is registered. */
    get closed():boolean {
        return this._func === null;
    }
}
/**
 * A Source with lifecycle hooks: `_open` runs after the source is opened,
 * `_close` after it is closed, and the optional `_abort` after an abort.
 */
class Source2<T> extends Source<T> {

    constructor(private _open:() => void
        , private _close:() => void
        , private _abort:() => void=null)
    {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        this._open();
    }

    close():void {
        super.close();
        this._close();
    }

    abort():void {
        super.abort();
        // The abort hook is optional and defaults to null; calling it
        // unconditionally would throw a TypeError for every source built
        // without one.
        if (this._abort) {
            this._abort();
        }
    }
}
/**
 * A channel decorator whose `stream()` hands out a new branch on every call:
 * the current tail is forked in two, one half becomes the new tail and the
 * other half is returned. All writes go straight to the wrapped channel.
 */
class Autofork<T> implements Channel<T> {

    // The branch kept back so that it can be forked again later.
    private _tail:Stream<T>;

    constructor(private _chan:Channel<T>) {
        this._tail = _chan.stream();
    }

    yield(o:Packet<T>, interrupt:boolean=false):void {
        var target = this._chan;
        target.yield(o, interrupt);
    }

    push(value:T):void {
        var target = this._chan;
        target.push(value);
    }

    done():void {
        var target = this._chan;
        target.done();
    }

    fail(error:Error):void {
        var target = this._chan;
        target.fail(error);
    }

    stream():Stream<T> {
        var halves = this._tail.fork(2);
        var branch = halves[1];
        this._tail = halves[0];
        return branch;
    }
}
/** The empty stream: it yields the end-of-stream packet as soon as it is opened. */
class Zero<T> extends Source<T> {

    constructor() {
        super();
    }

    open(func:(o:Packet<T>) => void) {
        super.open(func);
        var terminator = done<T>();
        this.yield(terminator);
    }
}
/** A stream that, when opened, yields a fixed array of packets in order. */
class List<T> extends Source<T> {

    constructor(private _list:Packet<T>[]) {
        super();
    }

    open(func:(o:Packet<T>) => void) {
        super.open(func);
        var emit = (packet:Packet<T>):void => {
            this.yield(packet);
        };
        this._list.forEach(emit);
    }
}
/**
 * A pull-driven stream: one packet is requested from `_gen` when the stream
 * is opened, and another after each delivered packet, until a terminating
 * packet is delivered or the stream has been closed.
 */
class Generator<T> extends Source<T> {

    constructor(private _gen:() => Packet<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void) {
        var pump = (o:Packet<T>):void => {
            func(o);
            if (o.done || this.closed) {
                return;
            }
            this.yield(this._gen());
        };
        super.open(pump);
        this.yield(this._gen());
    }
}
/** A stream that forwards `open`, `abort` and `close` to an underlying stream. */
class Wrapper<T> extends Stream<T> {

    constructor(public _base:Stream<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        var inner = this._base;
        inner.open(func);
    }

    abort():void {
        var inner = this._base;
        inner.abort();
    }

    close():void {
        var inner = this._base;
        inner.close();
    }
}
/**
 * Base class for streams derived from another stream. Every upstream packet
 * is handed to `trap`, which subclasses override to decide what to emit.
 */
class Filter<T,U> extends Source<U> {

    // 1 while the upstream subscription is live, 0 otherwise.
    private _open = 0;

    constructor(private _base:Stream<T>) {
        super();
    }

    open(func:(o:Packet<U>) => void):void {
        super.open(func);
        this._open = 1;
        this._base.open(o => {
            this.trap(o);
            // A terminating packet means the upstream is finished.
            if (o.done) {
                this._open = 0;
            }
        });
    }

    /** Aborts the upstream, if it is still live. */
    abort():void {
        if (!this._open) {
            return;
        }
        this._base.abort();
    }

    /** Closes the upstream (if still live) and then this stream. */
    close():void {
        if (this._open) {
            this._base.close();
        }
        this._open = 0;
        super.close();
    }

    /** Hook invoked for every upstream packet; does nothing by default. */
    trap(o:Packet<T>):void {}
}
/** A Filter that re-emits every upstream packet unchanged. */
class Identity<T> extends Filter<T,T> {

    constructor(base:Stream<T>) {
        super(base);
    }

    trap(o:Packet<T>):void {
        var packet = o;
        this.yield(packet);
    }
}
/** An Identity stream that runs a callback right after it has been closed. */
class Closer<T> extends Identity<T> {

    constructor(base:Stream<T>, private _action:() => void) {
        super(base);
    }

    close():void {
        super.close();
        var callback = this._action;
        callback();
    }
}
/**
 * Implementation of Stream.bind: each upstream packet is mapped by `_call` to
 * an inner stream of packets, which is opened and whose packets are emitted.
 * `_mode` decides what happens when a packet arrives while an inner stream is
 * still active: "sync" queues it, "skip" drops it, "stop" closes the active
 * inner stream and proceeds with the new packet.
 */
class Binder<T,U> extends Filter<T,U> { | |
// The inner stream currently being relayed, or null when idle.
private _next:Stream<Packet<U>> = null; | |
// Queue of upstream packets waiting for the inner stream; only allocated in "sync" mode.
private _ring:Packet<T>[] = null; | |
constructor(base:Stream<T> | |
, private _call:(o:Packet<T>) => Stream<Packet<U>> | |
, private _auto:boolean | |
, private _mode:string) | |
{ | |
super(base); | |
if (_mode == "sync") this._ring = []; | |
} | |
trap(o:Packet<T>):void { | |
// With `_auto`, failures bypass `_call` and any active inner stream.
if (this._auto && o.fail) { | |
this.yield(<Packet<any>>o); | |
return; | |
} | |
if (this._next !== null) { | |
switch (this._mode) { | |
// "sync" enqueues at the front and then falls through to the "skip" return;
// together with pop() below this gives first-in, first-out order.
case "sync": this._ring.unshift(o); | |
case "skip": | |
return; | |
case "stop": | |
this._next.close(); | |
this._next = null; | |
break; | |
} | |
} | |
// With `_auto`, normal completion is forwarded without calling `_call`.
if (this._auto && o.done) { | |
this.yield(<Packet<any>>o); | |
return; | |
} | |
(this._next = this._call(o)).open((o:Packet<Packet<U>>) => { | |
if (o.done) { | |
this._next = null; | |
// A failing inner stream fails this stream; a completed one
// lets the next queued upstream packet be processed.
if (o.fail) { | |
this.yield(<Packet<any>>o); | |
} | |
else if (this._ring !== null && this._ring.length > 0) { | |
this.trap(this._ring.pop()) | |
} | |
} | |
// Inner values are themselves packets; they are emitted as this stream's packets.
else this.yield(o.value); | |
}); | |
} | |
// Closes the active inner stream (if any) before closing the upstream and this stream.
close():void { | |
if (this._next) this. | |
_next.close(); | |
super.close(); | |
} | |
} | |
/**
 * A wrapper that changes only the default binding mode: `bind` is delegated
 * to the wrapped stream, with `_mode` used whenever the caller passes none.
 */
class BinderOption<T> extends Wrapper<T> {

    constructor(base:Stream<T>, private _mode:string) {
        super(base);
    }

    bind<U>(func:(o:Packet<T>) => Stream<Packet<U>>, auto:boolean, mode:string=null):Stream<U> {
        var effectiveMode = mode || this._mode;
        return this._base.bind(func, auto, effectiveMode);
    }
}
/**
 * Shares one subscription to `_base` between `n` ForkSplit branches.
 * The base is opened when the first branch opens and closed when the last
 * open branch closes; every base packet is copied to all branches.
 */
class Splitter<T> {

    // Number of branches currently open.
    private _used = 0;

    public _list:ForkSplit<T>[] = [];

    constructor(private _base:Stream<T>, n:number) {
        var created = 0;
        while (created < n) {
            this._list.push(new ForkSplit<T>(this));
            created += 1;
        }
    }

    /** Called by a branch when it opens. */
    inc():void {
        this._used += 1;
        if (this._used !== 1) {
            return;
        }
        this._base.open((o:Packet<T>):void => {
            // A terminated base must not be closed or aborted again.
            if (o.done) {
                this._base = null;
            }
            this._list.forEach((branch:ForkSplit<T>) => {
                branch.yield(o);
            });
        });
    }

    /** Called by a branch when it closes. */
    dec():void {
        if (this._base === null) {
            return;
        }
        this._used -= 1;
        if (this._used === 0) {
            this._base.close();
        }
    }

    /** Aborts the base stream unless it has already terminated. */
    abort():void {
        if (this._base === null) {
            return;
        }
        this._base.abort();
    }
}
/** One branch of a Splitter; it reports its open/close transitions to the parent. */
class ForkSplit<T> extends Source<T> {

    constructor(private _parent:Splitter<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        var owner = this._parent;
        owner.inc();
    }

    close():void {
        super.close();
        var owner = this._parent;
        owner.dec();
    }

    /** Aborting a branch aborts the shared base stream via the parent. */
    abort():void {
        var owner = this._parent;
        owner.abort();
    }
}
// TODO reopenable | |
/**
 * Merges two streams: values from either side are emitted as they arrive.
 * The merged stream completes once both sides have completed, and fails as
 * soon as either side fails (the other side is then closed).
 *
 * `_a` / `_b` are set to null once the respective side has terminated.
 */
class Mix<T> extends Source<T> {

    constructor(private _a:Stream<T>, private _b:Stream<T>) {
        super();
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        this._a.open(this.block(0));
        this._b.open(this.block(1));
    }

    /** Aborts whichever sides are still live. */
    abort():void {
        // A side is nulled out once it has terminated (see `block`), so it
        // needs the same null check that `close` uses.
        if (this._a !== null) this._a.abort();
        if (this._b !== null) this._b.abort();
    }

    /** Closes whichever sides are still live, then this stream. */
    close():void {
        if (this._a !== null) this._a.close();
        if (this._b !== null) this._b.close();
        super.close();
    }

    /** Builds the receiver for side `i` (0 is `_a`, 1 is `_b`). */
    private block(i:number):(o:Packet<T>) => void {
        return o => {
            // trace("[debug] Mix#block("+ i +"): "+ o);
            if (!o.done) {
                this.yield(o);
                return;
            }
            if (!o.fail) {
                // Normal completion of one side: mark it finished and
                // complete the merged stream only if the other side is
                // finished as well.
                if (i === 0
                    ? (this._a = null, this._b === null)
                    : (this._b = null, this._a === null)) {
                    this.yield(done<T>());
                }
                return;
            }
            // Failure of one side: shut the other side down and pass the failure on.
            var c = i === 0 ? this._b: this._a;
            if (c) c.close();
            this._a = null;
            this._b = null;
            this.yield(o);
        };
    }
}
// TODO reopenable | |
/**
 * Combines several streams into a stream of "latest values" arrays.
 * Once every input has produced at least one value, each new value from any
 * input emits a copy of the array of most recent values. The result completes
 * when all inputs have completed and fails as soon as any input fails.
 *
 * `_close` and `_empty` are bit sets with one bit per input: a set bit in
 * `_close` means that input has not terminated yet, a set bit in `_empty`
 * means that input has not produced a value yet.
 */
class Disjunction extends Source<any[]> {

    private _close:number;
    private _empty:number;

    constructor(private _list:Stream<any>[]) {
        super();
    }

    open(func:(o:Packet<any>) => void):void {
        super.open(func);
        var latest:any[] = [];
        var allBits = (1 << this._list.length) - 1;
        this._close = allBits;
        this._empty = allBits;
        for (var i = 0; i < this._list.length; ++i) {
            this._list[i].open(this.watch(i, latest));
            latest.push(null);
        }
    }

    /** Builds the receiver for input number `i`, writing into the shared `latest` array. */
    private watch(i:number, latest:any[]):(o:Packet<any>) => void {
        var bit = 1 << i;
        return (o:Packet<any>) => {
            if (o.done) {
                this._close &= ~bit;
                if (o.fail) {
                    this.yield(o);
                }
                else if (this._close === 0) {
                    this.yield(done<any[]>());
                }
                return;
            }
            latest[i] = o.value;
            this._empty &= ~bit;
            if (this._empty === 0) {
                this.yield(next(latest.concat()));
            }
        };
    }

    abort():void {
        this._list.forEach(input => input.abort());
        super.abort();
    }

    /** Closes every input that has not terminated yet, then this stream. */
    close():void {
        this._list.forEach((input, i) => {
            if ((this._close & (1 << i)) !== 0) {
                input.close();
            }
        });
        this._close = 0;
        super.close();
    }
}
// TODO reopenable | |
/**
 * Zips several streams: the k-th emitted array holds the k-th value of every
 * input. Values from inputs that run ahead are queued per input. The result
 * terminates (normally or with the failure) as soon as any input terminates.
 */
class Conjunction extends Source<any[]> {

    // _live[j] is true while input j is open and has not terminated, so that
    // no input is ever closed twice.
    private _live:boolean[] = [];

    constructor(private _list:Stream<any>[]) { super(); }

    open(func:(o:Packet<any>) => void):void {
        super.open(func);
        // g[k] counts the inputs that currently have more than k queued values.
        var g:number[] = [0];
        // a[i] is the FIFO queue of values received from input i.
        var a: any[][] = [ ];
        for (var i = 0; i < this._list.length; ++i) {
            this._live[i] = true;
            this._list[i].open(((i:number) =>
                (o:Packet<any>) => {
                    if (o.done) {
                        // The terminating input is not closed here (the original
                        // skipped it as well); only mark it as no longer live.
                        this._live[i] = false;
                        this.closeLive();
                        this.yield(o);
                        return;
                    }
                    var n = a[i].push(o.value) - 1;
                    if (g.length <= n) {
                        g.push(0);
                    }
                    ++g[n];
                    if (n === 0 && g[0] === a.length) {
                        g.shift();
                        // Take the OLDEST queued value of every input (shift, not
                        // pop), so tuples stay aligned when one input runs ahead.
                        this.yield(next(a.reduce((acc, q) => (acc.push(q.shift()), acc), [])));
                    }
                })(i));
            a.push([]);
        }
    }

    abort():void {
        this._list.forEach(s => s.abort());
        super.abort();
    }

    /** Closes every input that is still live, then this stream. */
    close():void {
        this.closeLive();
        super.close();
    }

    /** Closes each still-live input exactly once. */
    private closeLive():void {
        this._list.forEach((s, j) => {
            if (this._live[j]) {
                this._live[j] = false;
                s.close();
            }
        });
    }
}
/**
 * Subscribes to the base stream immediately on construction and stores its
 * packets until this stream is opened. On open the stored packets are replayed
 * and later packets are relayed directly. When the store would exceed
 * `_limit`, the packet that was just added is discarded again.
 */
class Buffer<T> extends Source<T> {

    // Stored packets; null once this stream has been opened.
    private _tmp:Packet<T>[] = [];

    constructor(private _base:Stream<T>, private _limit:number) {
        super();
        _base.open((o:Packet<T>):void => {
            var stored = this._tmp;
            if (stored === null) {
                this.yield(o);
            }
            else {
                stored.push(o);
                if (stored.length > this._limit) {
                    stored.pop();
                }
            }
            // A terminated base must not be closed or aborted again.
            if (o.done) {
                this._base = null;
            }
        });
    }

    open(func:(o:Packet<T>) => void):void {
        super.open(func);
        this._tmp.forEach(packet => {
            this.yield(packet);
        });
        this._tmp = null;
    }

    abort():void {
        if (this._base) {
            this._base.abort();
        }
        super.abort();
    }

    close():void {
        if (this._base) {
            this._base.close();
        }
        super.close();
    }
}
/** A wrapper that shields the underlying stream: `close` and `abort` are not forwarded. */
class NoHup<T> extends Wrapper<T> {

    constructor(base:Stream<T>) {
        super(base);
    }

    close():void {
        // Deliberately empty: the wrapped stream stays open.
    }

    abort():void {
        // Deliberately empty: the wrapped stream is not aborted.
    }
}
// A shared plain Source exposed as a stream; nothing in this module yields to it.
export var Never:Stream<any> = new Source<any>(); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<html> | |
<head> | |
<title>stream.ts</title> | |
<script src="stream.js"></script> | |
<script> | |
// Shorthand for the Stream class of the `sf` module.
var S = sf.Stream; | |
// Page entry point (body onload): looks up the canvas and runs the enabled demos.
function start() {
    var canvas = document.getElementById("cv");
    var context = canvas.getContext("2d");

    // Drawing demos (only the first one is enabled).
    testWithCanvas1(canvas, context);
    // testWithCanvas2(cv, cx);
    // testWithCanvas3(cv, cx);

    // Console demos.
    //testMix();
    testStep();
    //testDisj();
    //testPlus();
    //testGroupBy();
    testDistinct();
}
// Free-hand drawing with flatMap: every mousedown starts a stroke made of the
// following mousemove events; mouseout / mouseup fail the stroke, which is
// recovered into a null marker that ends the current line.
function testWithCanvas1(cv, cx) {
    // Stream for one stroke: the mousedown event, then moves, then a null marker.
    function strokeFrom(down) {
        var head = S.pure(down);
        var moves = S.on(cv, "mousemove")
            .mix(S.failOn(cv, "mouseout"))
            .mix(S.failOn(cv, "mouseup" ));
        return head.plus(moves)
            .recover(function (e) { return S.pure(null) });
    }

    // Draws a segment from the previous event to the current one.
    function drawSegment(prev, cur) {
        if (cur === null) {
            return null;
        }
        if (prev !== null) {
            cx.beginPath();
            cx.moveTo(prev.offsetX, prev.offsetY);
            cx.lineTo(cur.offsetX, cur.offsetY);
            cx.stroke();
        }
        return cur;
    }

    S.on(cv, "mousedown").inner1st
        .flatMap(strokeFrom)
        .fold(drawSegment, null)
        .next();
}
// Free-hand drawing with disj: pairs the latest mousemove event with a
// "button is down" flag and draws a segment whenever the flag is set.
function testWithCanvas2(cv, cx) {
    var moves = S.on(cv, "mousemove");
    var pressed = S.on(cv, "mousedown").fill(true);
    var released = S.on(cv, "mouseout").mix(
        S.on(cv, "mouseup" )).fill(false);

    // `pair` is [latest mousemove event, button flag].
    function drawSegment(prev, pair) {
        var cur = pair[0];
        var isDown = pair[1];
        if (isDown && prev !== null) {
            cx.beginPath();
            cx.moveTo(prev.offsetX, prev.offsetY);
            cx.lineTo(cur.offsetX, cur.offsetY);
            cx.stroke();
        }
        return cur;
    }

    moves
        .disj(pressed.mix(released))
        .fold(drawSegment, null)
        .next();
}
// The same drawing demo written with plain event handlers and shared mutable
// state, for comparison. Uses a global `$` that this page does not load.
function testWithCanvas3(cv, cx) {
    var drawn = false;
    var coord = null;

    function onDown(e) {
        drawn = true;
        coord = e;
    }

    function onMove(b) {
        if (!drawn) {
            return;
        }
        var a = coord;
        cx.beginPath();
        cx.moveTo(a.offsetX, a.offsetY);
        cx.lineTo(b.offsetX, b.offsetY);
        cx.stroke();
        coord = b;
    }

    function onRelease() {
        drawn = false;
    }

    $(cv).mousedown(onDown);
    $(cv).mousemove(onMove);
    $(cv).mouseout(function () { onRelease(); });
    $(cv).mouseup (function () { onRelease(); });
}
// Demo of filter / map / mix / delay on two hand-fed channels.
function testMix() {
    var chan1 = sf.Stream.channel();
    var chan2 = sf.Stream.channel();

    // Drops strings whose seventh character is "B".
    function notB(e) {
        return e.charAt(6) !== "B";
    }

    function appendPlus(e) {
        return e + "+";
    }

    function print(e) {
        console.log(e);
    }

    chan1
        .filter(notB)
        .map(appendPlus)
        .mix(chan2)
        .delay(1000)
        .next(print);

    chan1.push("chan1-A");
    chan2.push("chan2-A");
    chan1.push("chan1-B");
    chan2.push("chan2-B");
    chan1.push("chan1-C");

    // Finish both channels a little later.
    setTimeout(function () {
        chan2.push("chan2-C");
        chan2.done();
        chan1.done();
    }, 100);
}
// Demo of Stream.step: logs "step:<value>" for each value counted from 1 towards 10.
function testStep() {
    var labelled = S.step(1, 10, 1).map(function (e) {
        return "step:" + e;
    });
    labelled.next(function (e) {
        console.log(e);
    });
}
// Demo of disj on two 800 ms tickers with independent counters.
function testDisj() {
    var counter1 = 1;
    var counter2 = 2;

    function tickA() {
        return "A :: "+ (counter1 +=2);
    }

    function tickB() {
        return "B :: "+ (counter2 +=2);
    }

    var s1 = S.tick(800, tickA);
    var s2 = S.tick(800, tickB);
    s1.disj(s2).next(function (e) {
        console.log(e);
    });
}
// Demo of plus: three ticks of the first ticker followed by three of the second.
function testPlus() {
    var counter1 = 1;
    var counter2 = 2;

    function tickA() {
        return "A :: "+ (counter1 +=2);
    }

    function tickB() {
        return "B :: "+ (counter2 +=2);
    }

    var s1 = S.tick(800, tickA).take(3);
    var s2 = S.tick(800, tickB).take(3);
    s1.plus(s2).next(function (e) {
        console.log(e);
    });
}
// Demo of groupBy: groups delayed strings by their first character and logs
// every group together with the values that arrive in it.
function testGroupBy() {
    var s = S.forEach(
        "a:foo",
        "b:bar",
        "b:baz",
        "a:(^_^;)",
        "c:(-_-;)").delay(400);

    function firstChar(e) {
        return e.charAt(0);
    }

    // `n` numbers the groups in order of first appearance.
    function announce(n, group) {
        console.log("group: "+ group._1 + " as #" + n);
        group._2.next(function (e) {
            console.log("#"+ n + " --> " + e);
        });
        return n + 1;
    }

    s.groupBy(firstChar)
        .fold(announce, 1)
        .next();
}
// Demo of distinct: logs a string only when its first character differs from
// that of the previously logged string.
function testDistinct() {
    var s = sf.Stream.forEach(
        "a:foo",
        "b:bar",
        "b:baz",
        "a:(^_^;)",
        "c:(-_-;)").delay(300);

    function firstChar(e) {
        return e.charAt(0);
    }

    function print(e) {
        console.log(e);
    }

    s.distinct(firstChar).next(print);
}
</script> | |
</head> | |
<body onload="start()"> | |
<h1>stream.ts</h1> | |
<div> | |
<a href="https://gist.github.com/hisui/6261547">gist</a> | |
<div style="user-select:none;"> | |
<canvas id="cv" width="300" height="300" style="background:#eeeeee;"></canvas> | |
</div> | |
</div> | |
</body> | |
</html> | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment