Skip to content

Instantly share code, notes, and snippets.

@t0yv0
Created August 7, 2011 03:40
Show Gist options
  • Save t0yv0/1130033 to your computer and use it in GitHub Desktop.
Save t0yv0/1130033 to your computer and use it in GitHub Desktop.
F# map-reduce with fully asynchronous arbitrary-order mappers and reducers.
let mapReduce (map : 'T1 -> Async<'T2>)
(reduce : 'T2 -> 'T2 -> Async<'T2>)
(input : seq<'T1>) : Async<'T2> =
let run (a: Async<'T>) (k: 'T -> unit) =
Async.StartWithContinuations(a, k, ignore, ignore)
Async.FromContinuations <| fun (ok, _, _) ->
let k = ref 0
let agent =
new MailboxProcessor<_>(fun chan ->
async {
for i in 2 .. k.Value do
let! x = chan.Receive()
let! y = chan.Receive()
return run (reduce x y) chan.Post
let! r = chan.Receive()
return ok r
})
k :=
(0, input)
||> Seq.fold (fun count x ->
run (map x) agent.Post
count + 1)
agent.Start()
@t0yv0
Copy link
Author

t0yv0 commented Aug 8, 2011

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment