Created
December 14, 2017 11:02
-
-
Save dustinlacewell-wk/59d7e0820874e560191f25930d7228fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env fsharpi | |
#I "../../.nuget/packages/hopac/0.3.21/lib/net45/" | |
#r "Hopac.Platform" | |
#r "Hopac.Core" | |
#r "Hopac" | |
open System.Collections.Generic | |
open Hopac | |
open Hopac.Infixes | |
open Hopac.Stream | |
/// Right-associative application operator: `f ^ x` evaluates to `f x`.
/// Defined as the identity on its first operand, so applying the result to
/// the right-hand operand is ordinary function application.
let inline (^) f = f
/// Writes the given string to standard output followed by a newline.
let log message = message |> printfn "%s"
module Server =

    /// What the server hands back for a single `Acquire` request.
    type State =
        | Waiting
        | Available of int
        | Finished of int

    /// Messages accepted on the server's request channel. Each one carries
    /// the write-once variable through which the reply is delivered.
    type Request =
        | Acquire of IVar<State>
        | Validate of int * IVar<bool>

    /// Server handle: the request channel, the complete sequence of states
    /// to serve, and the position of the next state to hand out.
    type Server = {
        reqCh: Ch<Request>
        feed: seq<State>
        mutable index: int
    }

    /// Logs the call, then builds the job that sends an `Acquire` request
    /// to `s` and yields the `State` the server replies with.
    let acquire (s: Server) =
        log "calling acquire"
        s.reqCh *<-=>- fun reply -> Acquire reply

    /// Builds the job that asks `s` whether `v` equals the sum of all of
    /// the server's data points.
    let validate (s: Server) v =
        s.reqCh *<-=>- fun reply -> Validate (v, reply)

    /// Builds a job that starts the server loop and returns its handle.
    /// The feed is: five `Waiting`, then `Available n` for every data point
    /// except the last, then `Finished last` repeated without end.
    let create () =
        // all the data points
        let values = [1;2;3;4;5;6;7;8;9;]
        // a few Waitings at the start of the feed
        let waiting = [1 .. 5] |> Seq.map (fun _ -> Waiting)
        // Available(int) for each data point except the last
        let items =
            values
            |> List.rev
            |> List.tail
            |> List.rev
            |> Seq.map Available
        // an infinite run of Finished(int) carrying the last data point
        let finished =
            Seq.initInfinite (fun _ -> State.Finished (List.last values))
        // waiting @ items @ finished
        let feed =
            seq {
                yield! waiting
                yield! items
                yield! finished
            }
        let s = { reqCh = Ch (); feed = feed; index = 0 }
        // Answers exactly one request; the server loop repeats this forever.
        let serve request =
            match request with
            | Acquire repCh ->
                log "Server is sending value"
                // Advance the cursor first, then reply with the state that
                // sat at the old position.
                let position = s.index
                s.index <- position + 1
                repCh *<= Seq.item position feed
            | Validate (v, repCh) ->
                repCh *<= (v = List.sum values)
        Job.foreverServer (s.reqCh >>= serve) >>-. s
/// The server instance used by the rest of the script: runs the creation
/// job (which also starts the server loop) and keeps the returned handle.
let s = run (Server.create ())
module Client =

    /// Client-side record: the server it refers to and a running total.
    /// NOTE(review): nothing in this script constructs or reads this type
    /// any more; it is kept so the module's public surface is unchanged.
    type Client = {
        server: Server.Server
        mutable total: int
    }

    /// An endless stream of server states. Each element is produced by
    /// sending one `Acquire` request to the script-level server `s`.
    /// Wrapping the call in `job { ... }` defers it, so the request (and its
    /// log line) happens every time an element is pulled, not once up front.
    let fetch () = indefinitely <| job { return! Server.acquire s }

    /// Predicate: true only for `Finished`; prints the value it saw.
    let finished = function
        | Server.State.Finished x ->
            printfn "saw finished %i" x
            true
        | _ -> false

    /// Predicate: true only for `Available`; prints the value it saw.
    let available = function
        | Server.State.Available x ->
            printfn "saw available %i" x
            true
        | _ -> false

    /// Builds the client job: prints every `Available` state fetched from
    /// the server and stops once the first `Finished` state has been seen.
    let client () =
        // Both filtered views below are derived from this one stream.
        let fetchStream = fetch ()
        // Only the first Finished element is needed, as the stop signal.
        let finishedStream = filterFun finished fetchStream |> take 1L
        let availableStream = filterFun available fetchStream
        availableStream
        |> takeUntil finishedStream
        |> iterJob ^ fun state ->
            printfn "State: %A" state
            // Only Available states pass the filter above; the other two
            // arms are there to keep the match exhaustive.
            match state with
            | Server.State.Waiting -> job { do printfn "Waiting %s" "" }
            | Server.State.Available value -> job { do printfn "Available %A" value }
            | Server.State.Finished value -> job { do printfn "Finished %A" value }
// Script entry point: announce start-up, then run the client job and wait
// for it to finish.
printfn "Starting client! %s" ""
run (Client.client ())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment