Last active
December 17, 2015 07:09
-
-
Save mausch/5570451 to your computer and use it in GitHub Desktop.
Clojure reducers in F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Clojure-style reducers in F#. Background: http://clojure.com/blog/2012/05/08/reducers-a-library-and-model-for-collection-processing.html
// Original by Nick Palladinos: http://fssnip.net/ip
#r "FSharp.PowerPack.Parallel.Seq.dll" | |
module Reducer = | |
open System | |
open System.Text | |
open System.Collections.Generic | |
open System.Linq | |
/// Applies the two-argument function `f` with its arguments swapped:
/// `flip f a b` evaluates `f b a`.
let inline flip f a b = a |> f b
/// Abstract description of how accumulators of type 'a are created and merged.
[<AbstractClass>]
type Monoid<'a>() =
    /// Produces the starting accumulator.
    abstract Zero: unit -> 'a
    /// Merges two accumulators into one.
    abstract Combine: 'a -> 'a -> 'a
    /// Folds every item of the sequence with `Combine`, starting from `Zero()`.
    member this.Concat(items) =
        items |> Seq.fold (fun acc item -> this.Combine acc item) (this.Zero())
/// Additive monoid: `Zero()` is the generic zero of the element type and
/// `Combine` is `(+)`. Kept `inline` so the element type and its operators
/// are resolved at each call site.
let inline sumMonoid() =
    { new Monoid<_>() with
        member self.Zero() = LanguagePrimitives.GenericZero
        member self.Combine left right = left + right }
// mutates things, not really a monoid
/// Accumulates text in a StringBuilder. `Zero()` allocates a fresh builder;
/// `Combine a b` appends `b` onto `a` in place and yields the result of that
/// `Append` call, so the left operand is modified.
let stringBuilderMonoid =
    { new Monoid<StringBuilder>() with
        member self.Zero() = new StringBuilder()
        member self.Combine target addition = target.Append(addition) }
// mutates things, not really a monoid
/// Accumulates items in a `List<_>`. `Zero()` allocates a fresh list;
/// `Combine` appends every item of the right list to the left list in place
/// and returns the left list.
let inline listMonoid() =
    { new Monoid<List<_>>() with
        member self.Zero() = new List<_>()
        member self.Combine target source =
            target.AddRange(source)
            target }
// mutates things, not really a monoid
/// Monoid over `Dictionary<key, List<value>>` used to merge partial groupings.
/// `Combine left right` folds every entry of `right` into `left` (in place) and
/// returns `left`:
///   * key already in `left`: the right-hand values are appended to the left
///     bucket, the bucket is collapsed with `aggregateF (key, bucket)` and then
///     replaced by that single aggregated value;
///   * key missing from `left`: a new one-element bucket holding
///     `aggregateF (key, rightValues)` is stored.
let inline dictionaryListMonoid aggregateF =
    { new Monoid<Dictionary<_,List<_>>>() with
        member self.Zero() = new Dictionary<_,_>()
        member self.Combine left right =
            for entry in right do
                match left.TryGetValue(entry.Key) with
                | true, bucket ->
                    bucket.AddRange(entry.Value)
                    let collapsed = aggregateF (entry.Key, bucket)
                    bucket.Clear()
                    bucket.Add(collapsed)
                | _ ->
                    left.[entry.Key] <- List<_>([| aggregateF (entry.Key, entry.Value) |])
            left }
// mutates things, not really a monoid
/// Monoid over `Dictionary<key, int>` used to merge partial counts.
/// `Combine left right` adds each right-hand count to the matching left-hand
/// entry (or copies it when the key is new), mutating and returning `left`.
let inline dictionaryIntMonoid() =
    { new Monoid<Dictionary<_,int>>() with
        member self.Zero() = new Dictionary<_,_>()
        member self.Combine left right =
            for entry in right do
                let merged =
                    match left.TryGetValue(entry.Key) with
                    | true, current -> current + entry.Value
                    | _ -> entry.Value
                left.[entry.Key] <- merged
            left }
/// Step function: folds one element of type 'T into an accumulator of type 'R.
type ReduceFunc<'T, 'R> = 'T -> 'R -> 'R
/// Merge function: joins two accumulators of type 'R.
type CombineFunc<'R> = 'R -> 'R -> 'R
/// A reducible collection of 'T. `Apply` runs the given step function over the
/// elements, using the monoid for the starting accumulator and (where the
/// implementation needs it) for merging partial accumulators.
type Reducer<'T> =
    interface
        abstract Apply<'R> : ReduceFunc<'T, 'R> * Monoid<'R> -> 'R
    end
// executor functions
/// Wraps a sequence as a sequential reducer: `Apply` folds the elements in
/// enumeration order starting from `monoid.Zero()`. `monoid.Combine` is never
/// called by this executor.
let inline toSeqReducer (values : seq<'T>) : Reducer<'T> =
    { new Reducer<'T> with
        member self.Apply<'R>(rf, monoid) : 'R =
            values |> Seq.fold (fun acc item -> rf item acc) (monoid.Zero()) }
/// Wraps an array as a divide-and-conquer parallel reducer.
///
/// `Apply` recursively halves the index range; a range holding at most
/// `seqReduceCount` elements is folded sequentially from a fresh
/// `monoid.Zero()`, and the results of the two halves are joined with
/// `monoid.Combine` (left half first, so element order is preserved).
///
/// `seqReduceCount` values below 1 are treated as 1. Previously such a
/// threshold could never be satisfied by a two-index range, and the recursion
/// kept re-issuing the same range and never terminated.
/// An empty array yields `monoid.Zero()`.
let inline toParallelReducer (seqReduceCount : int) (values : 'T []) : Reducer<'T> =
    { new Reducer<'T> with
        member self.Apply<'R>(rf, monoid) : 'R =
            // Guarantee that every split makes progress.
            let chunkSize = max 1 seqReduceCount
            // Sequential fold over the half-open index range [lo, hi).
            let foldRange lo hi =
                let mutable acc = monoid.Zero()
                for i in lo .. hi - 1 do
                    acc <- rf values.[i] acc
                acc
            // Half-open ranges make the two halves disjoint by construction,
            // so no per-branch boundary adjustment is needed.
            let rec reduceCombine lo hi =
                async {
                    if hi - lo <= chunkSize then
                        return foldRange lo hi
                    else
                        // lo + (hi - lo) / 2 cannot overflow, unlike (lo + hi) / 2.
                        let mid = lo + (hi - lo) / 2
                        let! halves = Async.Parallel [| reduceCombine lo mid; reduceCombine mid hi |]
                        return monoid.Combine halves.[0] halves.[1]
                }
            reduceCombine 0 values.Length |> Async.RunSynchronously }
/// Wraps a sequence as a reducer executed through PLINQ's
/// `ParallelEnumerable.Aggregate`: `monoid.Zero` is the seed factory, the step
/// function updates an accumulator, `monoid.Combine` merges accumulators, and
/// the final accumulator is returned unchanged.
let inline toParallelLinqReducer (values : seq<'T>) : Reducer<'T> =
    { new Reducer<'T> with
        member self.Apply<'R>(rf, monoid) : 'R =
            let seedFactory = Func<'R>(monoid.Zero)
            let accumulate = Func<'R, 'T, 'R>(fun acc item -> rf item acc)
            let merge = Func<'R, 'R, 'R>(monoid.Combine)
            let selectResult = Func<'R, 'R>(id)
            ParallelEnumerable.Aggregate(values.AsParallel(), seedFactory, accumulate, merge, selectResult) }
// transform functions
/// Flat-map: expands every element of `input` into an inner reducer via `f`
/// and feeds all inner elements to the downstream step function.
///
/// The inner reducer is run with a derived monoid whose `Zero()` returns the
/// accumulator built so far, so inner elements are folded straight into it.
/// That derived object is not a lawful monoid: `Zero()` hands out the same
/// accumulator on every call. NOTE(review): this looks safe only for inner
/// reducers that call `Zero()` once (such as a sequential fold) - confirm
/// before using a parallel reducer as the result of `f`.
let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> =
    { new Reducer<'B> with
        member self.Apply<'R> (rf, monoid) =
            // Monoid that starts from an existing accumulator instead of a fresh zero.
            let seededWith seed =
                { new Monoid<_>() with
                    member m.Zero() = seed
                    member m.Combine a b = monoid.Combine a b }
            let step outer acc = (f outer).Apply(rf, seededWith acc)
            input.Apply<'R>(step, monoid) }
/// Transforms every element of `input` with `f` before it reaches the
/// downstream step function.
let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> =
    { new Reducer<'B> with
        member self.Apply<'R> (rf, monoid) =
            let mappedStep item acc = rf (f item) acc
            input.Apply<'R>(mappedStep, monoid) }
/// Passes only the elements satisfying `p` to the downstream step function;
/// rejected elements leave the accumulator untouched.
let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> =
    { new Reducer<'A> with
        member self.Apply<'R> (rf, monoid) =
            let guardedStep item acc =
                match p item with
                | true -> rf item acc
                | false -> acc
            input.Apply<'R>(guardedStep, monoid) }
// reduce functions
/// Runs `reducer` with the step function `reducef` and the given monoid and
/// returns the final accumulator.
let inline reduce (reducef : 'T -> 'R -> 'R) monoid (reducer : Reducer<'T>) : 'R =
    reducer.Apply<'R>(reducef, monoid)
/// Adds up all integers produced by `reducer`.
let inline sum (reducer : Reducer<int>) : int =
    let add (value : int) (total : int) = value + total
    reduce add (sumMonoid()) reducer
/// Counts the elements produced by `reducer`: each element adds 1 to the
/// accumulator and partial counts are merged by addition.
let inline length (reducer : Reducer<'T>) : int =
    let countOne (_ : 'T) (count : int) = count + 1
    reduce countOne (sumMonoid()) reducer
/// Concatenates all strings produced by `reducer` by appending them to a
/// StringBuilder and returning its text.
let inline concat (reducer : Reducer<string>) : string =
    let appendTo (text : string) (builder : StringBuilder) = builder.Append(text)
    let builder = reduce appendTo stringBuilderMonoid reducer
    builder.ToString()
/// Collects every element produced by `reducer` into a `List<'T>` and returns
/// its contents as an array.
let inline toArray (reducer : Reducer<'T>) : 'T [] =
    let push (item : 'T) (buffer : List<'T>) =
        buffer.Add(item)
        buffer
    (reduce push (listMonoid()) reducer).ToArray()
/// Groups the elements of `reducer` by `selectorF`, maps each element with
/// `transformF`, and collapses every group with `aggregateF`.
///
/// Groups are aggregated eagerly: each time an element joins an existing
/// bucket, the bucket is replaced by the single value `aggregateF (key, bucket)`.
/// `aggregateF` is applied once more per key when the returned sequence is
/// enumerated (the result is a lazy view over the final dictionary).
let inline groupBy (selectorF : 'T -> 'Key)
                   (transformF : 'T -> 'Elem)
                   (aggregateF : 'Key * seq<'Elem> -> 'Elem)
                   (reducer : Reducer<'T>) : seq<'Key * 'Elem> =
    // Step function: file one element under its key, keeping buckets collapsed.
    let inline reduceF (v : 'T) (r : Dictionary<'Key, List<'Elem>>) =
        let key = selectorF v
        let elem = transformF v
        match r.TryGetValue(key) with
        | true, bucket ->
            bucket.Add(elem)
            let collapsed = aggregateF (key, bucket)
            bucket.Clear()
            bucket.Add(collapsed)
        | _ ->
            r.Add(key, List<_>([| elem |]))
        r
    let groups = reduce reduceF (dictionaryListMonoid aggregateF) reducer
    seq { for entry in groups -> (entry.Key, aggregateF (entry.Key, entry.Value)) }
/// Counts how many elements of `reducer` map to each key produced by
/// `selectorF`; returns a lazy sequence of (key, count) pairs read from the
/// final dictionary.
let inline countBy (selectorF : 'T -> 'Key) (reducer : Reducer<'T>) : seq<'Key * int> =
    // Step function: bump the counter stored under the element's key.
    let inline reduceF (v : 'T) (r : Dictionary<'Key, int>) =
        let key = selectorF v
        let updated =
            match r.TryGetValue(key) with
            | true, seen -> seen + 1
            | _ -> 1
        r.[key] <- updated
        r
    let counts = reduce reduceF (dictionaryIntMonoid()) reducer
    seq { for entry in counts -> (entry.Key, entry.Value) }
// Example - wordcount
// Reads every line of "largefile.txt", splits each line on single spaces,
// maps every word to 1 and groups identical words, summing the ones per word.
let lines = System.IO.File.ReadAllLines("largefile.txt")
lines
|> Reducer.toParallelReducer 10
|> Reducer.collect (fun line -> Reducer.toSeqReducer (line.Split(' ')))
|> Reducer.groupBy (fun word -> word) (fun _ -> 1) (fun (_, ones) -> ones |> Seq.sum)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment