Last active
May 17, 2024 15:09
-
-
Save Horusiath/9c790691130150b524aaa9ab426ed982 to your computer and use it in GitHub Desktop.
Custom fibers implementation in F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// MIT License | |
/// | |
/// Copyright (c) 2024 Bartosz Sypytkowski | |
/// | |
/// Permission is hereby granted, free of charge, to any person obtaining a copy | |
/// of this software and associated documentation files (the "Software"), to deal | |
/// in the Software without restriction, including without limitation the rights | |
/// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
/// copies of the Software, and to permit persons to whom the Software is | |
/// furnished to do so, subject to the following conditions: | |
/// | |
/// The above copyright notice and this permission notice shall be included in all | |
/// copies or substantial portions of the Software. | |
/// | |
/// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
/// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
/// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
/// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
/// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
/// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
/// SOFTWARE. | |
open System | |
open System.Threading | |
type FiberResult<'a> = Result<'a, exn> option | |
[<Sealed;AllowNullLiteral>] | |
type Cancel(parent: Cancel) = | |
let mutable flag: int = 0 | |
let mutable children: Cancel list = [] | |
new() = Cancel(null) | |
/// Check if token was cancelled | |
member __.Cancelled = flag = 1 | |
/// Remove child token | |
member private __.RemoveChild(child) = | |
let rec loop child = | |
let children' = children | |
let nval = children' |> List.filter ((<>) child) | |
if not (obj.ReferenceEquals(children', Interlocked.CompareExchange(&children, nval, children'))) | |
then loop child | |
if not (List.isEmpty children) then loop child | |
/// Create a new child token and return it. | |
member this.AddChild () = | |
let rec loop child = | |
let children' = children | |
if (obj.ReferenceEquals(children', Interlocked.CompareExchange(&children, child::children', children'))) | |
then child | |
else loop child | |
loop (Cancel this) | |
/// Cancel a token | |
member this.Cancel() = | |
if Interlocked.Exchange(&flag, 1) = 0 then | |
for child in Interlocked.Exchange(&children, []) do child.Cancel() | |
if not (isNull parent) then parent.RemoveChild(this) | |
[<Interface>] | |
type IScheduler = | |
abstract Schedule: (unit -> unit) -> unit | |
abstract Delay: TimeSpan * (unit -> unit) -> unit | |
type Fiber<'a> = Fiber of (IScheduler * Cancel -> (FiberResult<'a> -> unit) -> unit) | |
[<RequireQualifiedAccess>] | |
module Fiber = | |
/// Wraps value into fiber. | |
let success r = Fiber <| fun (_, c) next -> if c.Cancelled then next None else next (Some (Ok r)) | |
/// Wraps exception into fiber. | |
let fail e = Fiber <| fun (_, c) next -> if c.Cancelled then next None else next (Some (Error e)) | |
/// Returns a cancelled fiber. | |
let cancelled<'a> = Fiber <| fun _ next -> next None | |
/// Returns a fiber, which will delay continuation execution after a given timeout. | |
let delay timeout = | |
Fiber <| fun (s, c) next -> | |
if c.Cancelled then next None | |
else s.Delay(timeout, fun () -> | |
if c.Cancelled | |
then next None | |
else next (Some (Ok ()))) | |
/// Maps result of Fiber execution to another value and returns new Fiber with mapped value. | |
let mapResult fn (Fiber call) = Fiber <| fun (s, c) next -> | |
if c.Cancelled then next None | |
else | |
try | |
call (s, c) (fun result -> | |
if c.Cancelled then next None | |
else next (Option.map fn result)) | |
with e -> next (Some (Error e)) | |
/// Maps successful result of Fiber execution to another value and returns new Fiber with mapped value. | |
let map fn fiber = mapResult (Result.map fn) fiber | |
/// Allows to recover from exception (if `fn` returns Ok) or recast it (if `fn` returns Error). | |
let catch fn fiber = mapResult (function Error e -> fn e | other -> other) fiber | |
let bind fn (Fiber call) = Fiber <| fun (s, c) next -> | |
if c.Cancelled then next None | |
else | |
try | |
call (s, c) (fun result -> | |
if c.Cancelled then next None | |
else match result with | |
| Some (Ok r) -> | |
let (Fiber call2) = fn r | |
call2 (s, c) next | |
| None -> next None | |
| Some (Error e) -> next (Some(Error e)) | |
) | |
with e -> next (Some(Error e)) | |
/// Starts both fibers running in parallel, returning the result from the winner | |
/// (the one which completed first) while cancelling the other. | |
let race (Fiber left) (Fiber right): Fiber<Choice<'a, 'b>> = | |
Fiber <| fun (s, c) next -> | |
if c.Cancelled then next None | |
else | |
let mutable flag = 0 | |
let child = c.AddChild() | |
let run fiber choice = | |
s.Schedule (fun () -> | |
fiber (s, child) (fun result -> | |
if Interlocked.Exchange(&flag, 1) = 0 then | |
child.Cancel() | |
if c.Cancelled then next None | |
else match result with | |
| None -> next None | |
| Some(Ok v) -> next (Some(Ok(choice v))) | |
| Some(Error e) -> next (Some(Error e)))) | |
run left Choice1Of2 | |
run right Choice2Of2 | |
let timeout (t: TimeSpan) fiber = | |
Fiber <| fun (s, c) next -> | |
let (Fiber call) = race (delay t) fiber | |
call (s, c) (fun result -> | |
if c.Cancelled then next None | |
else match result with | |
| None -> next None | |
| Some(Ok (Choice1Of2 _)) -> next None // timeout won | |
| Some(Ok (Choice2Of2 v)) -> next (Some(Ok v)) | |
| Some(Error e) -> next (Some(Error e)) | |
) | |
/// Executes a bunch of Fiber operations in parallel, returning an Fiber which may contain | |
/// a gathered set of results or (potential) failures that have happened during the execution. | |
let parallel fibs = | |
Fiber <| fun (s, c) next -> | |
if c.Cancelled then next None | |
else | |
let mutable remaining = Array.length fibs | |
let successes = Array.zeroCreate remaining | |
let childCancel = c.AddChild() | |
fibs |> Array.iteri (fun i (Fiber call) -> | |
s.Schedule (fun () -> | |
call (s, childCancel) (fun result -> | |
match result with | |
| Some (Ok success) -> | |
successes.[i] <- success | |
if c.Cancelled && Interlocked.Exchange(&remaining, -1) > 0 then | |
next None | |
elif Interlocked.Decrement(&remaining) = 0 then | |
if c.Cancelled then next None | |
else next (Some (Ok successes)) | |
| Some (Error fail) -> | |
if Interlocked.Exchange(&remaining, -1) > 0 then | |
childCancel.Cancel() | |
if c.Cancelled then next None | |
else next (Some (Error fail)) | |
| None -> | |
if Interlocked.Exchange(&remaining, -1) > 0 then | |
next None)) | |
) | |
/// Blocks current execution thread, executing given Fiber, and returning result of execution. | |
let blocking (s: IScheduler) (cancel: Cancel) (Fiber fn) = | |
use waiter = new ManualResetEventSlim(false) | |
let mutable res = None | |
s.Schedule(fun () -> fn (s, cancel) (fun result -> | |
if not cancel.Cancelled then | |
Interlocked.Exchange(&res, Some result) |> ignore | |
waiter.Set())) | |
waiter.Wait() | |
res.Value | |
/// Converts given Fiber into F# Async. | |
let toAsync s (Fiber call) = Async.FromContinuations <| fun (onSuccess, onError, onCancel) -> | |
call (s, Cancel()) <| fun result -> | |
match result with | |
| None -> onCancel (OperationCanceledException "") | |
| Some (Ok value) -> onSuccess value | |
| Some (Error e) -> onError e | |
[<RequireQualifiedAccess>] | |
module Scheduler = | |
/// Default environment, which is backed by .NET Thread pool. | |
let shared = | |
{ new IScheduler with | |
member __.Schedule fn = System.Threading.ThreadPool.QueueUserWorkItem(WaitCallback (ignore>>fn)) |> ignore | |
member __.Delay (timeout: TimeSpan, fn) = | |
let mutable t = Unchecked.defaultof<Timer> | |
let callback = fun _ -> | |
t.Dispose() | |
fn() | |
() | |
t <- new Timer(callback, null, int timeout.TotalMilliseconds, Timeout.Infinite) | |
} | |
type TestScheduler(now: DateTime) = | |
let mutable running = false | |
let mutable currentTime = now.Ticks | |
let mutable timeline = Map.empty | |
let schedule delay fn = | |
let at = currentTime + delay | |
timeline <- | |
match Map.tryFind at timeline with | |
| None -> Map.add at [fn] timeline | |
| Some fns -> Map.add at (fn::fns) timeline | |
let rec run () = | |
match Seq.tryHead timeline with | |
| None -> running <- false | |
| Some (KeyValue(time, bucket)) -> | |
timeline <- Map.remove time timeline | |
currentTime <- time | |
for fn in List.rev bucket do | |
fn () | |
run () | |
member __.UtcNow () = DateTime(currentTime) | |
interface IScheduler with | |
member this.Schedule fn = | |
schedule 0L fn | |
if not running then | |
running <- true | |
run () | |
member this.Delay (timeout: TimeSpan, fn) = schedule timeout.Ticks fn | |
let test(cancel, fiber) = | |
let s = TestScheduler(DateTime.UtcNow) | |
Fiber.blocking s cancel fiber | |
[<Struct>] | |
type FiberBuilder = | |
member inline __.Zero = Fiber.success (Unchecked.defaultof<_>) | |
member inline __.ReturnFrom fib = fib | |
member inline __.Return value = Fiber.success value | |
member inline __.Bind(fib, fn) = Fiber.bind fn fib | |
[<AutoOpen>] | |
module FiberBuilder = | |
let fib = FiberBuilder() | |
//--------------------- | |
// run some actual code | |
//--------------------- | |
let inline millis n = TimeSpan.FromMilliseconds (float n) | |
let program = fib { | |
let a = fib { | |
do! Fiber.delay (millis 2000) | |
return 3 | |
} | |
let! b = a |> Fiber.timeout (millis 3000) | |
return b } | |
[<EntryPoint>] | |
let main argv = | |
let cancel = Cancel () | |
let result = Scheduler.test(cancel, program) | |
printfn "Result: %A" result | |
0 // return an integer exit code |
Yes, this is by design, since a
got cancelled by Fiber.timeout
before it could have been executed. For the same reason the output is None
(since cancelled fiber didn't produce any value). Change the Fiber.delay
value in example to be smaller than Fiber.timeout
and you'll see the results.
🤯 Indeed I see now some output!
this code is subject of a license? can I used it anywhere?
It's MIT licensed - I've attached the license header.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey, thank you for answering.
There should be a lot of things I do not understand, yet.
What I'm seeing is that the result of the program if ran on FSI is None also if i put printfn statements within a and program they does not show on the console output.