Last active
February 18, 2018 14:55
-
-
Save ncthbrt/73a424291e3f4ee9d856d13d2ed07835 to your computer and use it in GitHub Desktop.
Playing around with stream ideas
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Abstract stream type.
 *   'a          - type of the values carried by the stream
 *   'e          - type of the errors the stream can carry
 *   'sourceType - phantom marker for the kind of source; the signatures in
 *                 this file instantiate it with Cold.t or Hot.t
 */
type stream('a, 'e, 'sourceType);
/*
 * A cold stream is one in which flow control is handled by the consumer.
 * The consumer invokes a function to get the next value.
 */
module Cold: {
  /* Abstract marker used as the 'sourceType argument of a cold stream. */
  type t;

  /*
   * Outcome of asking the source for its next item: a value, an error, or
   * Complete.
   * NOTE(review): Complete presumably signals that the source is exhausted -
   * confirm against the implementation.
   */
  type result('a, 'e) =
    | Ok('a)
    | Error('e)
    | Complete;

  /*
   * Builds a cold stream from a pull function of type unit => result('a, 'e).
   */
  let make: (unit => result('a, 'e)) => stream('a, 'e, t);
};
/*
 * A hot stream is one in which flow control is handled by the producer.
 * The producer produces events of its own volition and it is up to the
 * consumer to consume them expediently.
 */
module Hot: {
  /* Abstract marker used as the 'sourceType argument of a hot stream. */
  type t;

  /* A function that receives one item and returns unit. */
  type callback('t) = 't => unit;

  /*
   * Builds a hot stream from three labelled arguments. Each one is itself a
   * function that takes a callback and returns unit:
   *   ~next     - receives a callback('a) for values
   *   ~error    - receives a callback('e) for errors
   *   ~complete - receives a callback(unit) for completion
   * NOTE(review): presumably each argument registers the callback it is given
   * with the underlying producer - confirm against the implementation.
   */
  let make:
    (
      ~next: callback('a) => unit,
      ~error: callback('e) => unit,
      ~complete: callback(unit) => unit
    ) =>
    stream('a, 'e, t);
};
/*
 * Turns a Repromise.t('a, 'e) into a hot stream with the same value and
 * error types.
 * NOTE(review): presumably emits the settled value (or error) once and then
 * completes - confirm against the implementation.
 */
let fromPromise: Repromise.t('a, 'e) => stream('a, 'e, Hot.t);
/*
 * Builds a cold stream from a single value. The error type 'e is left
 * unconstrained by the argument.
 */
let single: 'a => stream('a, 'e, Cold.t);
/*
 * Takes a function from 'a to a stream of 'b and a stream of 'a, and returns
 * a stream of 'b. The inner streams, the input and the result all share the
 * same error type and source type.
 * NOTE(review): the order in which inner streams are flattened is not
 * visible from the signature - confirm against the implementation.
 */
let flatMap:
  ('a => stream('b, 'e, 'sourceType), stream('a, 'e, 'sourceType)) =>
  stream('b, 'e, 'sourceType);
/*
 * Takes a function 'a => 'b and a stream of 'a, and returns a stream of 'b
 * with the same error type and source type.
 */
let map:
  ('a => 'b, stream('a, 'e, 'sourceType)) => stream('b, 'e, 'sourceType);
/*
 * Takes a predicate 'a => bool and a stream, and returns a stream of the
 * same type.
 * NOTE(review): presumably only values for which the predicate returns true
 * are kept - confirm against the implementation.
 */
let filter:
  ('a => bool, stream('a, 'e, 'sourceType)) => stream('a, 'e, 'sourceType);
/*
 * Takes a step function ('acc, 'a) => 'acc, an initial accumulator and a
 * stream of 'a, and returns a stream of 'acc with the same error type and
 * source type.
 * NOTE(review): presumably emits the running accumulator after each input
 * value (a "scan") - confirm against the implementation.
 */
let aggregate:
  (('acc, 'a) => 'acc, 'acc, stream('a, 'e, 'sourceType)) =>
  stream('acc, 'e, 'sourceType);
/*
 * Takes a step function ('acc, 'a) => 'acc, an initial accumulator and a
 * stream of 'a, and returns a Repromise.t('acc, 'e) rather than a stream.
 * NOTE(review): presumably the promise settles with the final accumulator
 * once the stream completes - confirm against the implementation.
 */
let reduce:
  (('acc, 'a) => 'acc, 'acc, stream('a, 'e, 'sourceType)) =>
  Repromise.t('acc, 'e);
/*
 * Takes two streams with identical value, error and source types and
 * returns one stream of that same type.
 * NOTE(review): the interleaving order of the two inputs is not visible from
 * the signature - confirm against the implementation.
 */
let merge:
  (stream('a, 'e, 'sourceType), stream('a, 'e, 'sourceType)) =>
  stream('a, 'e, 'sourceType);
/*
 * Takes a stream of 'a and a stream of 'b that share an error type and
 * source type, and returns a stream of ('a, 'b) tuples.
 * NOTE(review): behaviour when one input ends before the other is not
 * visible from the signature - confirm against the implementation.
 */
let zip:
  (stream('a, 'e, 'sourceType), stream('b, 'e, 'sourceType)) =>
  stream(('a, 'b), 'e, 'sourceType);
/*
 * Converts a hot stream into a cold stream with the same value and error
 * types. ~size is an optional int.
 * NOTE(review): ~size presumably bounds the number of buffered values; its
 * default and the overflow behaviour are not visible here - confirm against
 * the implementation.
 */
let buffer: (~size: int=?, stream('a, 'e, Hot.t)) => stream('a, 'e, Cold.t);
/*
 * Converts a cold stream into a hot stream with the same value and error
 * types.
 */
let unbuffer: stream('a, 'e, Cold.t) => stream('a, 'e, Hot.t);
/*
 * Takes a function over the stream's values and a stream, and returns a
 * Repromise.t(unit, 'e). The function's result type 'ignored is
 * unconstrained.
 * NOTE(review): presumably the function is run for every value and the
 * promise settles when the stream completes or errors - confirm against the
 * implementation.
 */
let observe:
  ('a => 'ignored, stream('a, 'e, 'sourceType)) => Repromise.t(unit, 'e);
/*
 * Takes a stream and returns a Repromise.t(unit, 'e); no values are exposed
 * to the caller.
 * NOTE(review): presumably consumes the stream to completion - confirm
 * against the implementation.
 */
let drain: stream('a, 'e, 'sourceType) => Repromise.t(unit, 'e);
/*
 * Takes a handler 'e => 'a and a stream, and returns a stream of the same
 * value and source type whose error type 'e2 is not tied to the input's
 * error type 'e.
 * NOTE(review): presumably each error is replaced by the handler's value -
 * confirm against the implementation.
 */
let catch:
  ('e => 'a, stream('a, 'e, 'sourceType)) => stream('a, 'e2, 'sourceType);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment