Created
May 31, 2018 19:37
-
-
Save eulerfx/73aa3c2aa39cc2324e82a0539a84debc to your computer and use it in GitHub Desktop.
Sample of a polling reader for EventStore.ClientAPI in F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AsyncSeq from https://github.com/fsprojects/FSharp.Control.AsyncSeq
// RetryPolicy from https://github.com/jet/kafunk/blob/master/src/kafunk/Utility/Faults.fs#L89
/// Returns an Async computation which repeatedly evaluates the input computation until a result
/// satisfies the specified condition, with delays between attempts dictated by the specified retry policy.
///
/// rp        - retry policy; its delay stream (RetryPolicy.delayStream) is interleaved with the attempts.
/// condition - predicate applied to each result of `a`; the first result satisfying it is returned.
/// a         - the computation evaluated on each attempt.
///
/// Returns `Some result` for the first result satisfying `condition`, or `None` if the interleaved
/// sequence ends without one.
/// NOTE(review): when the sequence ends depends on how AsyncSeq.interleaveChoice treats an exhausted
/// delay stream - confirm against the AsyncSeq and RetryPolicy implementations.
let pollUntil (rp:RetryPolicy) (condition:'a -> bool) (a:Async<'a>) : Async<'a option> =
  (AsyncSeq.replicateInfiniteAsync a, RetryPolicy.delayStream rp)
  ||> AsyncSeq.interleaveChoice
  // Choice1Of2 carries the result of an attempt; everything else (delay-stream items, or results
  // that fail the condition) is skipped.
  |> AsyncSeq.tryPick (function Choice1Of2 x when condition x -> Some x | _ -> None)
// EventStore client (sample aliases used by the reader below)
/// Connection handle; aliased to a plain string in this sample.
type Conn = string
/// Position within the event stream; aliased to a 64-bit integer in this sample.
type Position = int64
/// A single event; aliased to a plain string in this sample.
type Event = string
/// Reads a batch of events from connection `c` at position `p`, returning the events together with
/// a position (callers use it as the position for the next read - TODO confirm once implemented).
/// Not implemented yet: applying it to both arguments raises an exception with the message "TODO".
let readBatch (c:Conn) (p:Position) : Async<Event[] * Position> = failwith "TODO"
/// Returns an asynchronous sequence of event batches read from connection `c`, starting at position `p`.
/// Each step polls `readBatch` at the current position (paced by retry policy `rp` via `pollUntil`)
/// until a non-empty batch arrives; that batch is emitted and the position returned alongside it
/// becomes the state for the next step. The sequence ends when `pollUntil` yields `None`.
let stream (c:Conn) (p:Position) (rp:RetryPolicy) : AsyncSeq<Event[]> =
  p |> AsyncSeq.unfoldAsync (readBatch c >> pollUntil rp (fun (es,_) -> es.Length > 0))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment