Skip to content

Instantly share code, notes, and snippets.

@dustinlacewell-wk
Created December 14, 2017 11:02
Show Gist options
  • Save dustinlacewell-wk/59d7e0820874e560191f25930d7228fe to your computer and use it in GitHub Desktop.
Save dustinlacewell-wk/59d7e0820874e560191f25930d7228fe to your computer and use it in GitHub Desktop.
#!/usr/bin/env fsharpi
#I "../../.nuget/packages/hopac/0.3.21/lib/net45/"
#r "Hopac.Platform"
#r "Hopac.Core"
#r "Hopac"
open System.Collections.Generic
open Hopac
open Hopac.Infixes
open Hopac.Stream
/// Identity on its argument. Used infix, `f ^ x` therefore evaluates to `f x`,
/// which lets a trailing lambda be passed without wrapping it in parentheses
/// (see its use with `iterJob` in the client below).
let inline (^) f = f
/// Writes `message` followed by a newline to standard output.
let log (message: string) : unit =
    message |> printfn "%s"
/// A small request/reply server built on a Hopac channel. Clients send a
/// `Request` carrying a write-once `IVar`; the server fills that variable with
/// its answer.
module Server =
    /// One element of the feed handed out in reply to `Acquire`.
    type State =
        /// Placeholder served before the first data point.
        | Waiting
        /// A data point that is not the last one.
        | Available of int
        /// The last data point; served for every request once the feed
        /// has run past the `Available` items.
        | Finished of int

    /// Messages accepted on the request channel.
    type Request =
        /// Ask for the next `State`; the reply is written to the `IVar`.
        | Acquire of IVar<State>
        /// Ask whether the given int equals the sum of all data points;
        /// the reply is written to the `IVar`.
        | Validate of int * IVar<bool>

    /// Handle to a running server.
    type Server = {
        /// Channel on which requests are received.
        reqCh: Ch<Request>
        /// The complete (infinite) sequence of states the server serves.
        feed: seq<State>
        /// Number of `Acquire` requests answered so far.
        mutable index: int
    }

    /// Builds the request that asks server `s` for its next `State`.
    /// Note that the log line is printed when `acquire` is called, i.e. when
    /// the request is constructed, not when it is later run.
    let acquire s =
        log "calling acquire"
        s.reqCh *<-=>- Acquire

    /// Builds the request that asks server `s` whether `v` equals the sum of
    /// all data points.
    let validate s v =
        s.reqCh *<-=>- fun repCh -> Validate (v, repCh)

    /// Returns a job that starts the server loop and yields its handle.
    let create () =
        // all the data points
        let values = [1;2;3;4;5;6;7;8;9;]
        let lastValue = List.last values
        let total = List.sum values
        // a few Waitings at the start of the feed
        let waiting = Seq.replicate 5 Waiting
        // Available(int) for each data point except the last
        let items =
            values
            |> List.take (List.length values - 1)
            |> Seq.map Available
        // an infinite run of Finished(int) holding the last data point
        let finished = Seq.initInfinite (fun _ -> Finished lastValue)
        // waiting @ items @ finished
        let feed = Seq.append waiting (Seq.append items finished)
        let s = { feed = feed; reqCh = Ch (); index = 0 }
        // Walk the feed with a single enumerator instead of calling
        // `Seq.item` per request, which would re-enumerate the lazy sequence
        // from its head every time. The server loop never ends, so the
        // enumerator is intentionally never disposed.
        let cursor = feed.GetEnumerator ()
        let handle = function
            | Validate (v, repCh) ->
                repCh *<= (v = total)
            | Acquire repCh ->
                log "Server is sending value"
                // `feed` ends in an infinite tail, so MoveNext cannot fail.
                cursor.MoveNext () |> ignore
                s.index <- s.index + 1
                repCh *<= cursor.Current
        Job.foreverServer (s.reqCh >>= handle) >>-. s
/// The server instance shared by the rest of the script: runs the job
/// returned by `Server.create` and keeps the resulting handle.
let s = run (Server.create ())
/// Consumer side: pulls states from a server as a stream and prints the
/// `Available` ones until the first `Finished` is seen.
module Client =
    /// Client state. `total` is initialised to 0 and is not read or updated
    /// anywhere in this module.
    type Client = {
        server: Server.Server
        mutable total: int
    }

    /// Stream that issues a fresh `Acquire` request to `server` for every
    /// element. The `job { ... }` wrapper matters: it defers the call to
    /// `Server.acquire` (and its log line) to each pull instead of running
    /// it once up front.
    let fetchFrom (server: Server.Server) =
        indefinitely <| job { return! Server.acquire server }

    /// Same as `fetchFrom`, bound to the script-level server `s`.
    let fetch () = fetchFrom s

    /// True for `Finished`; prints the value it carries as a side effect.
    let finished = function
        | Server.State.Finished x ->
            printfn "saw finished %i" x
            true
        | _ -> false

    /// True for `Available`; prints the value it carries as a side effect.
    let available = function
        | Server.State.Available x ->
            printfn "saw available %i" x
            true
        | _ -> false

    /// Returns a job that prints every `Available` state from the client's
    /// server, ending once the `Finished` filter yields its first element.
    let client () =
        let c = { server = s; total = 0 }
        // Read through the client's own server handle rather than the global.
        let fetchStream = fetchFrom c.server
        let finishedStream = fetchStream |> filterFun finished |> take 1L
        let availableStream = fetchStream |> filterFun available
        // Only `Available` passes the filter above; the other two cases are
        // kept so the match stays exhaustive.
        let report state =
            printfn "State: %A" state
            match state with
            | Server.State.Waiting -> job { do printfn "Waiting %s" "" }
            | Server.State.Available x -> job { do printfn "Available %A" x }
            | Server.State.Finished x -> job { do printfn "Finished %A" x }
        availableStream
        |> takeUntil finishedStream
        |> iterJob report
// Script entry point: print a banner, then run the client job.
printfn "Starting client! %s" ""
run (Client.client ())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment