-
-
Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
namespace Marvel.Hopac | |
open Hopac | |
open Hopac.Extra | |
open Hopac.Job.Infixes | |
open Hopac.Alt.Infixes | |
/// A single step of a `HopacSeq`: either an element together with the job
/// producing the remaining steps, or the end of the sequence.
type HopacSeqStep<'a> =
    | Emit of 'a * HopacSeq<'a>
    | Halt

/// Different representation of EagerSeq<'a>: a sequence is a job that,
/// when run, produces the next `HopacSeqStep`.
and HopacSeq<'a> = Job<HopacSeqStep<'a>>
module HopacSeq = | |
/// The empty sequence: a job whose result is `Halt`.
let empty<'a> : HopacSeq<'a> =
    Job.result Halt
/// Builds a sequence that emits the single element `a` and then halts.
let emitOne a : HopacSeq<'a> =
    Job.result (Emit (a, empty))
/// Turns a job into a one-element sequence whose only element is the
/// result of `j`.
let ofJob (j:Job<'a>) : HopacSeq<'a> =
    let single value = Emit (value, empty)
    j |>> single
/// Applies `f` to every element of `s`, producing a new sequence.
/// The tail is mapped only when the resulting tail job is itself run.
let rec map (f:'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
    let step = function
        | Emit (head, rest) -> Emit (f head, map f rest)
        | Halt -> Halt
    s |>> step
/// Like `map`, but the mapping function returns a job whose result
/// becomes the mapped element.
let rec mapJob (f:'a -> Job<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> =
    s >>= fun step ->
        match step with
        | Emit (head, rest) ->
            f head |>> fun mapped -> Emit (mapped, mapJob f rest)
        | Halt -> empty
/// Applies `f` to every element of `s` together with its zero-based index.
let mapi (f:int -> 'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
    let rec loop index (source:HopacSeq<'a>) : HopacSeq<'b> =
        source |>> fun step ->
            match step with
            | Halt -> Halt
            | Emit (item, rest) -> Emit (f index item, loop (index + 1) rest)
    loop 0 s
/// Pairs every element of `s` with its zero-based index.
let indexed (s:HopacSeq<'a>) : HopacSeq<int * 'a> =
    s |> mapi (fun index item -> (index, item))
/// Concatenates two sequences: all elements of `s1` followed by all
/// elements of `s2`.
let rec append (s1:HopacSeq<'a>) (s2:HopacSeq<'a>) : HopacSeq<'a> =
    s1 >>= fun step ->
        match step with
        | Emit (head, rest) -> Job.result (Emit (head, append rest s2))
        | Halt -> s2
/// Maps every element of `s` to a sequence with `f` and concatenates the
/// resulting sequences in order.
let rec collectJob (f:'a -> HopacSeq<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> =
    s >>= fun step ->
        match step with
        | Emit (head, rest) -> append (f head) (collectJob f rest)
        | Halt -> empty
/// Threads an accumulator through `s` and emits each intermediate
/// accumulator value. The initial value `b` itself is not emitted.
let rec scanJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
    s >>= fun step ->
        match step with
        | Emit (item, rest) ->
            f b item |>> fun next -> Emit (next, scanJob f next rest)
        | Halt -> empty
/// Folds the whole sequence into a single value, starting from `b` and
/// combining with the job-returning function `f`.
let rec foldJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : Job<'b> =
    s >>= fun step ->
        match step with
        | Emit (item, rest) ->
            f b item >>= fun next -> foldJob f next rest
        | Halt -> Job.result b
/// Runs the job `f a` for every element `a` of `s`, in order.
let rec iterJob (f:'a -> Job<unit>) (s:HopacSeq<'a>) : Job<unit> =
    s >>= fun step ->
        match step with
        | Halt -> Job.unit()
        | Emit (item, rest) -> f item >>. iterJob f rest
/// Calls the synchronous function `f` on every element of `s`, in order.
let rec iter (f:'a -> unit) (s:HopacSeq<'a>) : Job<unit> =
    s >>= fun step ->
        match step with
        | Halt -> Job.unit()
        | Emit (item, rest) ->
            f item
            iter f rest
/// Returns the first element of `s`, or `None` when `s` is empty.
/// Only the first step of the sequence is run.
let first (s:HopacSeq<'a>) : Job<'a option> =
    let head = function
        | Emit (item, _) -> Some item
        | Halt -> None
    s |>> head
/// Returns the last element of `s`, or `None` when `s` is empty.
/// The whole sequence is consumed.
let last (s:HopacSeq<'a>) : Job<'a option> =
    let rec scan latest (rest:HopacSeq<'a>) : Job<'a option> =
        rest >>= fun step ->
            match step with
            | Emit (item, tail) -> scan (Some item) tail
            | Halt -> Job.result latest
    scan None s
/// Keeps only the elements of `s` for which the job `f a` yields `true`.
let rec filterJob (f:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
    s >>= fun step ->
        match step with
        | Halt -> Job.result Halt
        | Emit (item, rest) ->
            f item >>= fun keep ->
                match keep with
                | true -> Job.result (Emit (item, filterJob f rest))
                | false -> filterJob f rest
/// Applies the job-returning function `f` to every element of `s` and
/// emits the payload of each `Some` result, dropping `None` results.
let rec chooseJob (f:'a -> Job<'b option>) (s:HopacSeq<'a>) : HopacSeq<'b> =
    s >>= fun step ->
        match step with
        | Halt -> Job.result Halt
        | Emit (item, rest) ->
            f item >>= fun chosen ->
                match chosen with
                | None -> chooseJob f rest
                | Some picked -> Job.result (Emit (picked, chooseJob f rest))
/// Drops the first `count` elements of `s` and returns the rest.
/// If `s` has fewer than `count` elements the result is empty.
/// A zero or negative `count` returns `s` unchanged; the previous
/// `count = 0` test was never reached for negative counts, so the whole
/// source was drained (and an infinite source never terminated).
let rec skip (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> =
    if count <= 0 then s
    else
        s >>= function
            | Halt -> Job.result Halt
            | Emit (_, tail) -> skip (count - 1) tail
/// Drops leading elements of `s` for as long as the job `p a` yields
/// `true`, then returns the remainder starting with the first element
/// for which it yields `false`.
let rec skipWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
    s >>= fun step ->
        match step with
        | Halt -> empty
        | Emit (item, rest) ->
            p item >>= fun shouldSkip ->
                if shouldSkip then skipWhileJob p rest
                else Job.result step
/// Returns at most the first `count` elements of `s`.
/// A zero or negative `count` yields the empty sequence without running
/// `s`; the previous `count = 0` test was never reached for negative
/// counts, which returned the entire source instead.
let rec take (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> =
    if count <= 0 then empty
    else
        s |>> function
            | Halt -> Halt
            | Emit (a, tail) -> Emit (a, take (count - 1) tail)
/// Emits elements of `s` for as long as the job `p a` yields `true`,
/// and halts at the first element for which it yields `false`.
let rec takeWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
    s >>= fun step ->
        match step with
        | Halt -> empty
        | Emit (item, rest) ->
            p item |>> fun ok ->
                match ok with
                | true -> Emit (item, takeWhileJob p rest)
                | false -> Halt
/// Generates a sequence from a seed state: `f` is run on the current
/// state and either yields an element plus the next state, or `None`
/// to end the sequence.
let rec unfoldJob (f:'s -> Job<('a * 's) option>) (s:'s) : HopacSeq<'a> =
    f s |>> fun generated ->
        match generated with
        | None -> Halt
        | Some (item, next) -> Emit (item, unfoldJob f next)
/// Groups the elements of `s` into lists of `size` elements, preserving
/// order. The final list may be shorter when the source ends mid-chunk;
/// an empty source produces an empty sequence.
///
/// Raises `ArgumentException` when `size` is not positive. Previously a
/// `size` of 0 produced an endless stream of empty lists and a negative
/// `size` silently accumulated the whole source into one chunk.
let buffer (size:int) (s:HopacSeq<'a>) : HopacSeq<'a list> =
    if size <= 0 then invalidArg "size" "buffer size must be positive"
    // `count` mirrors the length of `acc` so that the chunk size can be
    // checked without walking the list on every step.
    let rec loop (count:int) (acc:'a list) (s:HopacSeq<'a>) =
        if count = size then
            Emit (List.rev acc, loop 0 [] s) |> Job.result
        else
            s >>= function
                | Halt ->
                    // Flush a partially filled chunk, if any.
                    if count > 0 then Emit (List.rev acc, empty) |> Job.result
                    else empty
                | Emit (a, tail) ->
                    loop (count + 1) (a :: acc) tail
    loop 0 [] s
/// Converts an ordinary `seq` into a `HopacSeq`.
///
/// The source is enumerated eagerly and completely each time the
/// resulting job is run, so it must be finite.
///
/// Changes from the previous version:
/// - The enumerator is obtained inside the job rather than when `ofSeq`
///   is called, so running the job more than once re-enumerates `s`
///   instead of reusing an already disposed enumerator.
/// - The steps are built with an iterative fold from the end instead of
///   a non-tail recursion whose stack depth grew with the sequence length.
let ofSeq (s:seq<'a>) : HopacSeq<'a> =
    Job.delay <| fun () ->
        // Array.ofSeq enumerates the source and disposes its enumerator.
        let items = Array.ofSeq s
        Array.foldBack (fun item rest -> Job.result (Emit (item, rest))) items empty
// What is the optimal way to perform a non-deterministic merge?
/// Reads a sequence out of an MVar: every `Some a` taken from `mv`
/// becomes an element and the first `None` ends the sequence.
let rec ofMVar (mv:MVar<'a option>) : HopacSeq<'a> =
    MVar.take mv |>> fun taken ->
        match taken with
        | None -> Halt
        | Some item -> Emit (item, ofMVar mv)
/// Starts a background job that writes every element of `s` into a fresh
/// MVar as `Some a`, followed by a final `None`, and returns that MVar.
///
/// NOTE(review): the pump fills the MVar again as soon as the next
/// element is available, without waiting for a consumer to take the
/// previous one. Confirm against the Hopac `MVar.fill` documentation
/// whether filling a full MVar is permitted.
let toMVar (s:HopacSeq<'a>) : Job<MVar<'a option>> =
    MVar.create() >>= fun mv ->
        let rec pump (source:HopacSeq<'a>) =
            source >>= fun step ->
                match step with
                | Emit (item, rest) -> MVar.fill mv (Some item) >>. pump rest
                | Halt -> MVar.fill mv None
        Job.start (pump s) >>% mv
/// Creates a fresh output MVar and starts two server jobs, one per
/// input, each repeatedly taking a value from its input MVar and
/// filling the output MVar with it. Returns the output MVar.
///
/// NOTE(review): every value is forwarded unchanged, so when `'a` is an
/// option used as an end marker, the first `None` from either input is
/// forwarded as well.
let private mergeMVar (mv1:MVar<'a>) (mv2:MVar<'a>) : Job<MVar<'a>> =
    MVar.create() >>= fun merged ->
        let forward source =
            Job.delay <| fun () ->
                MVar.take source >>= MVar.fill merged
        Job.foreverServer (forward mv1) >>. Job.foreverServer (forward mv2) >>% merged
/// Merges two sequences, emitting elements in the order in which the
/// inputs produce them. The merged sequence ends only after both inputs
/// have ended.
///
/// The previous MVar-based implementation forwarded the `None` end marker
/// of whichever input finished first, so the merged sequence stopped
/// early and dropped the remaining elements of the other input.
///
/// This version starts the next step of each input as a promise and
/// selects whichever becomes available first. The promise that was not
/// selected is kept and reused in the next round, so no element is
/// evaluated twice or lost.
let rec merge (a:HopacSeq<'a>) (b:HopacSeq<'a>) : HopacSeq<'a> =
    Promise.startAsAlt a >>= fun aP ->
    Promise.startAsAlt b >>= fun bP ->
    mergeAB aP bP

/// Selects between the two pending steps. When one side halts, the
/// result is whatever the other side's pending step produces.
and mergeAB aP bP =
    (aP >>=? function
        | Halt -> upcast bP
        | Emit (x, a) -> Job.result (Emit (x, mergeB a bP))) <|>
    (bP >>=? function
        | Halt -> upcast aP
        | Emit (x, b) -> Job.result (Emit (x, mergeA aP b)))

/// Starts the next step of the left input and keeps the pending right promise.
and mergeB a bP =
    Promise.startAsAlt a >>= fun aP -> mergeAB aP bP

/// Starts the next step of the right input and keeps the pending left promise.
and mergeA aP b =
    Promise.startAsAlt b >>= fun bP -> mergeAB aP bP
My only requirements are that the sequence is lazy and bounded. The use case is reading from a stream database such as Kafka or EventStore. The underlying API allows the reading of a slice of events of a specific size, from a specific offset.
The way I understand it, the sequence isn't memoized due to Thunk<'x>
and this is what seems to get in the way of using selective communication to do the merge - if one of the inputs is selected, the other one may still produce a value, which should be emitted directly to the output sequence as opposed to being ignored.
If I were to use an IVar
for each "step" of the sequence much like EagerSeq
, would that be better suited to merging using selective communication?
A way to get around the merge issue completely is to use a push-based model:
// observer: a callback that takes an optional value and returns a job
// NOTE(review): presumably `Some x` is a pushed value and `None` marks completion — confirm against the full source.
type RxObs<'a> = 'a option -> Job<unit>
// observable (subscription function): given an observer, returns a job
type Rx<'a> = RxObs<'a> -> Job<unit>
// Push-based merge: subscribes the same observer to both observables,
// combines the two subscription jobs with `<*>` and replaces the combined
// result with unit.
// NOTE(review): `<*>` is presumably Hopac's parallel pairing operator — confirm.
let merge (rx1:Rx<'a>) (rx2:Rx<'a>) : Rx<'a> =
fun (obs:RxObs<'a>) ->
rx1 obs <*> rx2 obs >>% ()
Full source: https://gist.github.com/eulerfx/2bcfd5e0ef67d4ae330e
Here is one kind of merge
:
// Merge that, once demanded, starts the first step of both inputs as
// promises and then selects between them.
let rec merge (a:HopacSeq<'a>) (b:HopacSeq<'a>) : HopacSeq<'a> =
Promise.startAsAlt a >>= fun aP ->
Promise.startAsAlt b >>= fun bP ->
mergeAB aP bP
// Chooses (`<|>`) between the two pending steps. When one side halts,
// the result is the other side's pending step; when one side emits, the
// emitted element is produced and the other side's promise is carried
// over to the next round.
and mergeAB aP bP =
(aP >>=? function
| Halt -> upcast bP
| Emit (x, a) -> Job.result (Emit (x, mergeB a bP))) <|>
(bP >>=? function
| Halt -> upcast aP
| Emit (x, b) -> Job.result (Emit (x, mergeA aP b)))
// Starts the next step of the left input, reusing the pending right promise.
and mergeB a bP =
Promise.startAsAlt a >>= fun aP -> mergeAB aP bP
// Starts the next step of the right input, reusing the pending left promise.
and mergeA aP b =
Promise.startAsAlt b >>= fun bP -> mergeAB aP bP
The idea here is that a merged seq, once demanded, starts evaluating both of the sequences in parallel as promises. The one that becomes ready first is then produced. In this version, the other promise is still kept and reused in the next merge so that the elements are evaluated only once.
Note that a Hopac job does not necessarily behave non-deterministically. Hopac jobs are run cooperatively and there is a large subset of Hopac that basically behaves deterministically.
Consider changing Promise.startAsAlt
to Promise.queueAsAlt
.
Regarding use of IVars. They would work. IVars can do everything that Promises can and more, but require more intricate programming. I think in this case Promises (I should have named them Futures originally) lead to simpler code—assuming the above merge
is basically what you were looking for?
Ahh ok, I was missing the Promise.startAsAlt
and Promise.queueAsAlt
, thanks.
What I meant by non-deterministically is that it's uncertain which of the two input sequences will produce a value first.
Also, just so I understand this properly, in mergeAB
when either aP
or bP
is selected via <|>
, the result of the other is memoized because we're using a Promise
?
Basically yes. I'd say that the other result is just not thrown away. While the merge is being run lazily, each started promise is kept around (not thrown away) until the promise becomes determined.
Thanks! I now have much clearer understanding of the similarities and differences between the Hopac/CML model and F# async.
No problem! Actually, thinking about this, I think I should reintroduce lazy promises, which make it convenient to memoize jobs. You could then easily write a memoize: HopacSeq<'x> -> HopacSeq<'x>
function. Looking at the old code for lazy promises, it seems like I had a performance bug (a totally unnecessary allocation) in the old implementation...
FYI, I reintroduced lazy promises to Hopac. I haven't yet released binaries. I will likely not do that in a couple of days. With lazy promises you could implement memoize
as follows:
// Memoizes a sequence by wrapping every step in a lazy promise
// (`Promise.Now.delay`) and recursively doing the same for the tail.
// NOTE(review): relies on the lazy promises the author describes as
// reintroduced but not yet released — confirm availability.
let rec memoize (xs: HopacSeq<'x>) =
(xs |>> function
| Halt -> Halt
| Emit (x, xs) -> Emit (x, memoize xs))
|> Promise.Now.delay :> HopacSeq<_>
Also, here is a bit more concise version of merge
:
// Chooses between "left produces first" and "right produces first".
let rec mergeEither aP bP =
mergeFirst aP bP <|> mergeFirst bP aP
// Handles the case where `aP` produces first: on Halt the result is the
// other promise; on Emit the element is produced and merging continues.
and mergeFirst aP bP =
(aP >>=? function
| Halt -> upcast bP
| Emit (x, a) ->
// The arguments are swapped on purpose: the stream that did not produce
// a value goes on the left of `<|>` in the next round.
Job.result (Emit (x, mergeEither bP (Promise.Now.delay a))))
// Entry point: wraps both inputs in lazy promises and merges them.
let merge (a: HopacSeq<'a>) (b: HopacSeq<'a>) : HopacSeq<'a> =
mergeEither (Promise.Now.delay a) (Promise.Now.delay b)
Look at the last line of the mergeFirst
function above. This version intentionally swaps the a
and b
streams so that the stream that didn't manage to produce a value is given "priority" (evaluated on the left hand side of <|>
) on the next round.
One more thing. I haven't looked carefully through all of the HopacSeq
combinators, but I happened to notice that the ofSeq
combinator
// Quoted from the gist for discussion. `loop()` is called directly while
// building each `Emit`, so the whole enumerator is walked in one go
// inside the `Job.using` scope.
let rec ofSeq (s:seq<'a>) : HopacSeq<'a> =
Job.using (s.GetEnumerator()) <| fun enum ->
let rec loop() =
if enum.MoveNext() then Emit (enum.Current, loop()) |> Job.result
else empty
loop()
doesn't quite work correctly. What happens is that the enum
is disposed once the first result is produced.
Actually your ofSeq
works correctly as it goes through the sequence eagerly. Sorry about the false alarm!
Here is a lazy version:
// Wrapper that owns a disposable value and disposes it either explicitly
// through IDisposable or, failing that, from the finalizer.
type Disposer<'x when 'x :> IDisposable> =
// The wrapped disposable value.
val Value: 'x
interface IDisposable with
// Explicit disposal: dispose the wrapped value and tell the GC that
// the finalizer no longer needs to run.
override this.Dispose () =
this.Value.Dispose ()
GC.SuppressFinalize this
// Finalizer: disposes the wrapped value.
override this.Finalize () =
this.Value.Dispose ()
// Constructs a Disposer around `value`.
new (value: 'x) = {Value = value}
// Lazy variant: the enumerator is obtained inside `Job.delay` and each
// step advances it inside `Job.thunk`; the enumerator is disposed when
// MoveNext returns false.
// NOTE: the author states immediately after this snippet that this
// version has a bug; the follow-up version differs in making sure the
// enumerator is used sequentially.
let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
let enum = new Disposer<_>(s.GetEnumerator ())
let rec loop () = Job.thunk <| fun () ->
if enum.Value.MoveNext () then
Emit (enum.Value.Current, loop ())
else
(enum :> IDisposable).Dispose ()
Halt
loop ()
Heh... In fact, my lazy version has a bug. Here is a correct version:
// Corrected lazy variant: each step is additionally wrapped with
// `Promise.Now.delayAsJob`, which per the author makes sure the
// enumerator is used sequentially. The enumerator is disposed when
// MoveNext returns false.
let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
let enum = new Disposer<_>(s.GetEnumerator ())
let rec loop () = Promise.Now.delayAsJob << Job.thunk <| fun () ->
if enum.Value.MoveNext () then
Emit (enum.Value.Current, loop ())
else
(enum :> IDisposable).Dispose ()
Halt
loop ()
The crucial difference is that it is now ensured that the enumerator is used sequentially.
At any rate, thanks for asking about these sequences! Inspired to make improvements to Hopac!
I think that PAC sequences are very tricky to get right, because there are so many semantic choices. Could you tell more about the way these sequences should behave? Should they be memoized, ephemeral, lazy, eager, bounded, unbounded, parallel, ...? It may also be possible to want to have different combinators behave differently (e.g. map lazy & merge eager or map eager & merge lazy or ...).
Note that a
Job<'x>
value is much like a Thunk<'x> = unit -> 'x
value. So, most of the combinators above basically produce a kind of lazy, ephemeral sequences. That may be exactly what is wanted, but I know I've personally gotten these things wrong many times.