Skip to content

Instantly share code, notes, and snippets.

@palladin
Created May 7, 2013 19:01
Show Gist options
  • Save palladin/5535178 to your computer and use it in GitHub Desktop.
Save palladin/5535178 to your computer and use it in GitHub Desktop.
Reducers
// http://clojure.com/blog/2012/05/08/reducers-a-library-and-model-for-collection-processing.html
module Reducer =
open System
open System.Text
open System.Collections.Generic
type ReduceFunc<'T,'R> =
abstract Invoke : 'T * 'R -> 'R
type CombineFunc<'R> =
abstract Invoke : 'R * 'R -> 'R
type Reducer<'T> =
abstract Apply<'R> : ReduceFunc<'T, 'R> * CombineFunc<'R> * (unit -> 'R) -> 'R
// helper functions
let inline toReduceFunc (f : 'T -> 'R -> 'R) =
{
new ReduceFunc<'T, 'R> with
member self.Invoke(value : 'T, acc : 'R) = f value acc
}
let inline toCombineFunc (f : 'R -> 'R -> 'R) =
{
new CombineFunc<'R> with
member self.Invoke(left : 'R, right : 'R) = f left right
}
// executor functions
let inline toSeqReducer (values : seq<'T>) : Reducer<'T> =
{
new Reducer<'T> with
member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R =
let mutable r = init ()
for value in values do
r <- rf.Invoke(value, r)
r
}
let inline toParallelReducer (seqReduceCount : int) (values : 'T []) : Reducer<'T> =
{
new Reducer<'T> with
member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, cf : CombineFunc<'R>, init : unit -> 'R) : 'R =
let rec reduceCombine s e =
async {
if e - s <= seqReduceCount then
let s' = if s > 0 then s + 1 else s
let result = { s' .. e } |> Seq.fold (fun r i -> rf.Invoke(values.[i], r)) (init ())
return result
else
let m = (s + e) / 2
let! result = Async.Parallel [| reduceCombine s m; reduceCombine m e |]
return cf.Invoke(result.[0], result.[1])
}
reduceCombine 0 (values.Length - 1) |> Async.RunSynchronously
}
// transform functions
let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> =
{
new Reducer<'B> with
member self.Apply<'R> (rf, cf, init) =
input.Apply<'R>(toReduceFunc (fun a r -> (f a).Apply(rf, cf, fun () -> r)), cf, init)
}
let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> =
{
new Reducer<'B> with
member self.Apply<'R> (rf, cf, init) =
input.Apply<'R>(toReduceFunc (fun a r -> rf.Invoke(f a, r)), cf, init)
}
let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> =
{
new Reducer<'A> with
member self.Apply<'R> (rf, cf, init) =
input.Apply<'R>(toReduceFunc (fun a r -> if p a then rf.Invoke(a, r) else r), cf, init)
}
// reduce functions
let inline reduce (reducef : 'T -> 'R -> 'R) (combineF : 'R -> 'R -> 'R) (init : (unit -> 'R)) (reducer : Reducer<'T>) : 'R =
reducer.Apply(toReduceFunc reducef, toCombineFunc combineF, init)
let inline sum (reducer : Reducer<int>) : int =
reduce (+) (+) (fun () -> 0) reducer
let inline length (reducer : Reducer<'T>) : int =
reduce (fun _ r -> r + 1) (+) (fun () -> 0) reducer
let inline concat (reducer : Reducer<string>) : string =
let result =
reduce (fun (v : string) (builder : StringBuilder) -> builder.Append(v))
(fun (left : StringBuilder) (right : StringBuilder) -> left.Append(right))
(fun () -> new StringBuilder())
reducer
result.ToString()
let inline toArray (reducer : Reducer<'T>) : 'T [] =
let result =
reduce (fun v (list : List<'T>) -> list.Add(v); list)
(fun (left : List<'T>) (right : List<'T>) -> left.AddRange(right); left)
(fun () -> new List<'T>())
reducer
result.ToArray()
let inline groupBy (selectorF : 'T -> 'Key)
(transformF : 'T -> 'Elem)
(aggregateF : 'Key * seq<'Elem> -> 'Elem)
(reducer : Reducer<'T>) : seq<'Key * 'Elem> =
let inline reduceF (v : 'T) (r : Dictionary<'Key, List<'Elem>>) =
let key = selectorF v
let elem = transformF v
if r.ContainsKey(key) then
r.[key].Add(elem)
else
r.Add(key, new List<_>([| elem |]))
r
let inline combineF (left : Dictionary<'Key, List<'Elem>>) (right : Dictionary<'Key, List<'Elem>>) =
for keyValue in left |> Seq.toArray do
if right.ContainsKey(keyValue.Key) then
left.[keyValue.Key].AddRange(right.[keyValue.Key])
left.[keyValue.Key] <- new List<_>([| (keyValue.Key, keyValue.Value) |> aggregateF |])
left
let result =
reduce reduceF combineF
(fun () -> new Dictionary<'Key, List<'Elem>>())
reducer
result |> Seq.map (fun keyValue -> (keyValue.Key, (keyValue.Key, keyValue.Value) |> aggregateF ))
// Example - wordcount
let lines = System.IO.File.ReadAllLines("largefile.txt")
lines
|> Reducer.toParallelReducer 10
|> Reducer.collect (fun line -> Reducer.toSeqReducer <| line.Split(' '))
|> Reducer.groupBy id (fun _ -> 1) (fun (_, items) -> Seq.sum items)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment