Skip to content

Instantly share code, notes, and snippets.

@t0yv0
Created December 15, 2011 21:17
Show Gist options
  • Save t0yv0/1482929 to your computer and use it in GitHub Desktop.
Save t0yv0/1482929 to your computer and use it in GitHub Desktop.
/// A computation in continuation-passing style: a function that is handed a
/// continuation accepting a 'T and itself returns unit.
type Async<'T> = ('T -> unit) -> unit
/// Computation-expression builder for the continuation-passing `Async<'T>`
/// alias, plus static helpers that run such computations through
/// `Pooling.Pool.Spawn`.
///
/// The instance members (`Return`, `ReturnFrom`, `Bind`) are what the
/// `builder { ... }` syntax is translated to; the static members are the
/// entry points for starting computations.
[<Sealed>]
type Async() =

    /// Lifts a value: the resulting computation passes `x` straight to its
    /// continuation.
    member inline this.Return(x: 'T) : Async<'T> =
        fun f -> f x

    /// Supports `return!`: the given computation is used unchanged.
    member inline this.ReturnFrom(x: Async<'T>) = x

    /// Sequencing: runs `x`, applies `f` to its result, and runs the
    /// computation `f` returns with the outer continuation `k`.
    member inline this.Bind
        (x: Async<'T1>, f: 'T1 -> Async<'T2>) : Async<'T2> =
        fun k -> x (fun v -> f v k)

    /// Submits the computation to `Pooling.Pool.Spawn`; the unit result is
    /// discarded.
    static member inline Start(x: Async<unit>) =
        Pooling.Pool.Spawn(fun () -> x ignore)

    /// Submits the computation to `Pooling.Pool.Spawn` and blocks the calling
    /// thread until the computation invokes its continuation, then returns
    /// the value it was given. The wait has no timeout: if the continuation
    /// is never invoked, this call does not return.
    static member inline RunSynchronously(x: Async<'T>) : 'T =
        // A ref cell is required because the value is assigned from inside a
        // closure; `.Value` is used instead of the `:=` / `!` operators.
        let res = ref Unchecked.defaultof<_>
        let sem = new System.Threading.SemaphoreSlim(0)
        Pooling.Pool.Spawn(fun () ->
            x (fun v ->
                res.Value <- v
                ignore (sem.Release())))
        sem.Wait()
        res.Value

    /// Adapts a callback-style function taking (success, exception,
    /// cancellation) continuations. Only the success continuation is wired
    /// through; the exception and cancellation continuations are `ignore`,
    /// so anything reported through them is dropped.
    static member inline FromContinuations
        (f : ('T -> unit) *
             (exn -> unit) *
             (System.OperationCanceledException -> unit)
             -> unit) : Async<'T> =
        fun k -> f (k, ignore, ignore)
/// Instance of the `Async` builder class above; `async { ... }` expressions
/// that follow this binding are translated into calls on it.
let async = Async()
/// Channel that pairs each write with one read.
///
/// A read that finds no queued writer parks its continuation in `readers`;
/// a write that finds no queued reader parks its value and continuation in
/// `writers`. When the two sides meet, the party that was waiting is resumed
/// and the value is handed to the reader. Both queues are guarded by a
/// single lock taken on `readers`.
[<Sealed>]
type Channel<'T>() =
    let readers = Queue()
    let writers = Queue()

    /// Callback-style read: `ok` receives the value. If a writer is already
    /// queued, its continuation is passed to `spawn` and `ok` is invoked on
    /// the calling thread; otherwise `ok` is queued until a write arrives.
    member this.Read ok =
        let waitingWriter =
            lock readers (fun () ->
                if writers.Count > 0 then
                    Some (writers.Dequeue())
                else
                    readers.Enqueue ok
                    None)
        // Continuations are invoked only after the lock has been released.
        match waitingWriter with
        | Some (value, resumeWriter) ->
            spawn resumeWriter
            ok value
        | None -> ()

    /// Callback-style write: `ok` is invoked once a reader has taken `x`.
    /// If a reader is already queued, `ok` is passed to `spawn` and the
    /// reader's continuation is invoked on the calling thread; otherwise the
    /// pair `(x, ok)` is queued until a read arrives.
    member this.Write(x: 'T, ok) =
        let waitingReader =
            lock readers (fun () ->
                if readers.Count > 0 then
                    Some (readers.Dequeue())
                else
                    writers.Enqueue(x, ok)
                    None)
        // Continuations are invoked only after the lock has been released.
        match waitingReader with
        | Some deliver ->
            spawn ok
            deliver x
        | None -> ()

    /// Read as an `Async` computation, built on the callback-style `Read`.
    member inline this.Read() =
        Async.FromContinuations(fun (k, _, _) -> this.Read k)

    /// Write as an `Async` computation, built on the callback-style `Write`.
    member inline this.Write x =
        Async.FromContinuations(fun (k, _, _) -> this.Write(x, k))
/// Benchmark: a writer sends n, n-1, ..., 1 and finally 0 over a channel
/// while a reader sums the values until it receives 0. Prints n divided by
/// the elapsed seconds and returns the sum.
let test (n: int) =
    let channel = Channel()

    // Sends `i` and counts down; 0 is the last value sent.
    let rec writer (i: int) =
        async {
            match i with
            | 0 ->
                return! channel.Write 0
            | _ ->
                do! channel.Write i
                return! writer (i - 1)
        }

    // Accumulates received values until a 0 arrives.
    let rec reader sum =
        async {
            let! received = channel.Read()
            match received with
            | 0 -> return sum
            | _ -> return! reader (sum + received)
        }

    // The writer is started before the stopwatch, as in the original order.
    Async.Start(writer n)
    let timer = System.Diagnostics.Stopwatch.StartNew()
    let total = Async.RunSynchronously(reader 0)
    let hopsPerSecond = float n / timer.Elapsed.TotalSeconds
    stdout.WriteLine("Hops per second: {0}", hopsPerSecond)
    total
open System.Collections.Concurrent
open System.Threading.Tasks
/// Work queue served by one long-running task. The type has a private
/// constructor and a single static instance; `Spawn` is the only entry point.
type Pool private () =
    // Blocking wrapper over a ConcurrentBag: `Take` blocks while the bag is
    // empty. ConcurrentBag makes no ordering guarantee, so work items are not
    // promised to run in submission order.
    let queue = new BlockingCollection<_>(ConcurrentBag())

    // Worker loop. Each item runs inside its own try/with: an exception
    // escaping the loop would end the only worker task, after which queued
    // and future items would never run.
    let work () =
        while true do
            let job = queue.Take()
            try
                job ()
            with e ->
                eprintfn "Pool: unhandled exception in work item: %O" e

    let long = TaskCreationOptions.LongRunning
    // Bound only for its side effect: starting the worker when the pool is
    // constructed.
    let task = Task.Factory.StartNew(work, long)

    static let self = Pool ()

    member private this.Add f = queue.Add f

    /// Queues `f` for execution by the worker. An exception raised by `f` is
    /// written to stderr and does not stop the worker.
    static member Spawn(f: unit -> unit) = self.Add f
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment