Created
May 7, 2013 19:01
-
-
Save palladin/5535178 to your computer and use it in GitHub Desktop.
Reducers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// http://clojure.com/blog/2012/05/08/reducers-a-library-and-model-for-collection-processing.html | |
module Reducer = | |
open System | |
open System.Text | |
open System.Collections.Generic | |
type ReduceFunc<'T,'R> = | |
abstract Invoke : 'T * 'R -> 'R | |
type CombineFunc<'R> = | |
abstract Invoke : 'R * 'R -> 'R | |
type Reducer<'T> = | |
abstract Apply<'R> : ReduceFunc<'T, 'R> * CombineFunc<'R> * (unit -> 'R) -> 'R | |
// helper functions | |
let inline toReduceFunc (f : 'T -> 'R -> 'R) = | |
{ | |
new ReduceFunc<'T, 'R> with | |
member self.Invoke(value : 'T, acc : 'R) = f value acc | |
} | |
let inline toCombineFunc (f : 'R -> 'R -> 'R) = | |
{ | |
new CombineFunc<'R> with | |
member self.Invoke(left : 'R, right : 'R) = f left right | |
} | |
// executor functions | |
let inline toSeqReducer (values : seq<'T>) : Reducer<'T> = | |
{ | |
new Reducer<'T> with | |
member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R = | |
let mutable r = init () | |
for value in values do | |
r <- rf.Invoke(value, r) | |
r | |
} | |
let inline toParallelReducer (seqReduceCount : int) (values : 'T []) : Reducer<'T> = | |
{ | |
new Reducer<'T> with | |
member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, cf : CombineFunc<'R>, init : unit -> 'R) : 'R = | |
let rec reduceCombine s e = | |
async { | |
if e - s <= seqReduceCount then | |
let s' = if s > 0 then s + 1 else s | |
let result = { s' .. e } |> Seq.fold (fun r i -> rf.Invoke(values.[i], r)) (init ()) | |
return result | |
else | |
let m = (s + e) / 2 | |
let! result = Async.Parallel [| reduceCombine s m; reduceCombine m e |] | |
return cf.Invoke(result.[0], result.[1]) | |
} | |
reduceCombine 0 (values.Length - 1) |> Async.RunSynchronously | |
} | |
// transform functions | |
let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> = | |
{ | |
new Reducer<'B> with | |
member self.Apply<'R> (rf, cf, init) = | |
input.Apply<'R>(toReduceFunc (fun a r -> (f a).Apply(rf, cf, fun () -> r)), cf, init) | |
} | |
let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> = | |
{ | |
new Reducer<'B> with | |
member self.Apply<'R> (rf, cf, init) = | |
input.Apply<'R>(toReduceFunc (fun a r -> rf.Invoke(f a, r)), cf, init) | |
} | |
let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> = | |
{ | |
new Reducer<'A> with | |
member self.Apply<'R> (rf, cf, init) = | |
input.Apply<'R>(toReduceFunc (fun a r -> if p a then rf.Invoke(a, r) else r), cf, init) | |
} | |
// reduce functions | |
let inline reduce (reducef : 'T -> 'R -> 'R) (combineF : 'R -> 'R -> 'R) (init : (unit -> 'R)) (reducer : Reducer<'T>) : 'R = | |
reducer.Apply(toReduceFunc reducef, toCombineFunc combineF, init) | |
let inline sum (reducer : Reducer<int>) : int = | |
reduce (+) (+) (fun () -> 0) reducer | |
let inline length (reducer : Reducer<'T>) : int = | |
reduce (fun _ r -> r + 1) (+) (fun () -> 0) reducer | |
let inline concat (reducer : Reducer<string>) : string = | |
let result = | |
reduce (fun (v : string) (builder : StringBuilder) -> builder.Append(v)) | |
(fun (left : StringBuilder) (right : StringBuilder) -> left.Append(right)) | |
(fun () -> new StringBuilder()) | |
reducer | |
result.ToString() | |
let inline toArray (reducer : Reducer<'T>) : 'T [] = | |
let result = | |
reduce (fun v (list : List<'T>) -> list.Add(v); list) | |
(fun (left : List<'T>) (right : List<'T>) -> left.AddRange(right); left) | |
(fun () -> new List<'T>()) | |
reducer | |
result.ToArray() | |
let inline groupBy (selectorF : 'T -> 'Key) | |
(transformF : 'T -> 'Elem) | |
(aggregateF : 'Key * seq<'Elem> -> 'Elem) | |
(reducer : Reducer<'T>) : seq<'Key * 'Elem> = | |
let inline reduceF (v : 'T) (r : Dictionary<'Key, List<'Elem>>) = | |
let key = selectorF v | |
let elem = transformF v | |
if r.ContainsKey(key) then | |
r.[key].Add(elem) | |
else | |
r.Add(key, new List<_>([| elem |])) | |
r | |
let inline combineF (left : Dictionary<'Key, List<'Elem>>) (right : Dictionary<'Key, List<'Elem>>) = | |
for keyValue in left |> Seq.toArray do | |
if right.ContainsKey(keyValue.Key) then | |
left.[keyValue.Key].AddRange(right.[keyValue.Key]) | |
left.[keyValue.Key] <- new List<_>([| (keyValue.Key, keyValue.Value) |> aggregateF |]) | |
left | |
let result = | |
reduce reduceF combineF | |
(fun () -> new Dictionary<'Key, List<'Elem>>()) | |
reducer | |
result |> Seq.map (fun keyValue -> (keyValue.Key, (keyValue.Key, keyValue.Value) |> aggregateF )) | |
// Example - wordcount | |
let lines = System.IO.File.ReadAllLines("largefile.txt") | |
lines | |
|> Reducer.toParallelReducer 10 | |
|> Reducer.collect (fun line -> Reducer.toSeqReducer <| line.Split(' ')) | |
|> Reducer.groupBy id (fun _ -> 1) (fun (_, items) -> Seq.sum items) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment