This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let race (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> = async { | |
return! | |
Async.FromContinuations <| fun (ok,err,cnc) -> | |
let state = ref 0 | |
let iv = new TaskCompletionSource<_>() | |
let ok a = | |
if (Interlocked.CompareExchange(state, 1, 0) = 0) then | |
ok (a, iv.Task |> Async.AwaitTask) | |
else | |
iv.SetResult a |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let cache (a:Async<'a>) : Async<'a> = | |
let tcs = TaskCompletionSource<'a>() | |
let state = ref 0 | |
async { | |
if (Interlocked.CompareExchange(state, 1, 0) = 0) then | |
Async.StartWithContinuations( | |
a, | |
tcs.SetResult, | |
tcs.SetException, | |
(fun _ -> tcs.SetCanceled())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let timeoutNone (timeoutMs:int) (a:Async<'a>) : Async<'a option> = async { | |
let! ct = Async.CancellationToken | |
let res = TaskCompletionSource<_>() | |
use cts = CancellationTokenSource.CreateLinkedTokenSource ct | |
res.Task.ContinueWith (fun _ -> cts.Cancel ()) |> ignore | |
use timer = new Timer((fun _ -> res.TrySetResult None |> ignore), null, timeoutMs, Timeout.Infinite) | |
Async.StartThreadPoolWithContinuations ( | |
a, | |
(fun a -> res.TrySetResult (Some a) |> ignore), | |
(fun e -> res.TrySetException e |> ignore), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let withCancellation (ct:CancellationToken) (a:Async<'a>) : Async<'a> = async { | |
let! ct2 = Async.CancellationToken | |
use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2) | |
let tcs = new TaskCompletionSource<'a>() | |
use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled() |> ignore) | |
let a = async { | |
try | |
let! a = a | |
tcs.TrySetResult a |> ignore | |
with ex -> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/fsprojects/FSharp.Control.AsyncSeq | |
// RetryPolicy from https://github.com/jet/kafunk/blob/master/src/kafunk/Utility/Faults.fs#L89 | |
/// Polls the computation `a` until one of its results satisfies `condition`.
/// Evaluations of `a` are interleaved with the delay stream obtained from the retry
/// policy `rp`. The first satisfying result is returned as `Some`; `None` is returned
/// when the interleaved sequence ends without producing one.
let pollUntil (rp:RetryPolicy) (condition:'a -> bool) (a:Async<'a>) : Async<'a option> =
  // Endless stream of evaluations of the input computation.
  let attempts = AsyncSeq.replicateInfiniteAsync a
  // Delays dictated by the retry policy.
  let delays = RetryPolicy.delayStream rp
  // Keep only results of `a` (Choice1Of2) that meet the condition; delay ticks are ignored.
  let accept = function
    | Choice1Of2 value when condition value -> Some value
    | _ -> None
  AsyncSeq.interleaveChoice attempts delays
  |> AsyncSeq.tryPick accept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Threading | |
/// Builds an IDisposable whose Dispose method invokes the supplied `dispose` callback.
let private disposable (dispose:unit -> unit) =
  { new IDisposable with
      member __.Dispose () =
        dispose () }
/// Creates an observable which triggers based on intervals specified by the input sequence. | |
let ofDelays (delays:TimeSpan seq) : IObservable<DateTimeOffset> = | |
{ new IObservable<_> with | |
member __.Subscribe obs = |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let ParallelThrottledIgnore (startOnCallingThread:bool) (parallelism:int) (xs:seq<Async<_>>) = async { | |
let! ct = Async.CancellationToken | |
let sm = new SemaphoreSlim(parallelism) | |
let count = ref 1 | |
let res = TaskCompletionSource<_>() | |
let tryWait () = | |
try sm.Wait () ; true | |
with _ -> false | |
let tryComplete () = | |
if Interlocked.Decrement count = 0 then |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A learner for function type 'a -> 'b. | |
type Learner<'p, 'a, 'b> = { | |
/// A function parameterized by 'p implementing 'a -> 'b. | |
i : 'p * 'a -> 'b | |
/// Updates the parameter 'p based on training pair ('a,'b). | |
u : 'p * 'a * 'b -> 'p | |
/// Requests an input 'a based on parameter 'p and training pair ('a,'b). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Placeholder execution context; it declares no members yet.
// TODO: scheduler, storage
type ExecContext () = class end
/// Abstract continuation over values of type 'a.
[<AbstractClass>]
type Cont<'a> () =
  /// Takes an execution context together with a value of type 'a and returns unit.
  abstract member RunCont : ExecContext * 'a -> unit
[<AbstractClass>] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Marvel | |
open System.Collections.Concurrent | |
open System.Collections.Generic | |
/// `Pid` is an alias for `int`.
type Pid = int

/// Record with a string field `op` and a field `sn` of type `SN`
/// (`SN` is declared outside the visible source).
type Op =
  { op : string
    sn : SN }
NewerOlder