Skip to content

Instantly share code, notes, and snippets.

@mausch
Last active December 17, 2015 07:09
Show Gist options
  • Save mausch/5570451 to your computer and use it in GitHub Desktop.
Save mausch/5570451 to your computer and use it in GitHub Desktop.
Clojure reducers in F#
// http://clojure.com/blog/2012/05/08/reducers-a-library-and-model-for-collection-processing.html
// original by Nick Palladinos: http://fssnip.net/ip
#r "FSharp.PowerPack.Parallel.Seq.dll"
/// Clojure-style reducers: a collection is represented by "how to fold it",
/// so transformations (map / filter / collect) compose without building
/// intermediate collections, and the fold itself may run sequentially or in parallel.
module Reducer =

    open System
    open System.Text
    open System.Collections.Generic
    open System.Linq

    /// Swaps the first two arguments of a curried function.
    let inline flip f a b = f b a

    /// A zero element plus a binary combine operation for values of type 'a.
    /// Parallel executors use Zero() to seed each partial fold and Combine to
    /// merge the partial results.
    [<AbstractClass>]
    type Monoid<'a>() =
        /// Produces the seed value for a fold.
        abstract Zero : unit -> 'a
        /// Merges two partial results (left operand first).
        abstract Combine : 'a -> 'a -> 'a
        /// Folds a sequence of values with Combine, starting from Zero().
        member x.Concat l = Seq.fold x.Combine (x.Zero()) l

    /// Monoid over any type supporting (+) and a generic zero.
    let inline sumMonoid() =
        { new Monoid<_>() with
            member x.Zero() = LanguagePrimitives.GenericZero
            member x.Combine a b = a + b }

    /// StringBuilder accumulator: Combine appends the right builder to the left one.
    /// Mutates (and returns) its left operand, so it is not really a monoid.
    let stringBuilderMonoid =
        { new Monoid<StringBuilder>() with
            member x.Zero() = StringBuilder()
            member x.Combine a b = a.Append b }

    /// List<_> accumulator: Combine appends the right list to the left one.
    /// Mutates (and returns) its left operand, so it is not really a monoid.
    let inline listMonoid() =
        { new Monoid<List<_>>() with
            member x.Zero() = List()
            member x.Combine a b = a.AddRange b; a }

    /// Dictionary-of-lists accumulator used by groupBy. Combine merges `right`
    /// into `left`; every touched key ends up holding a single element, the
    /// result of `aggregateF (key, elements)`.
    /// Mutates (and returns) its left operand, so it is not really a monoid.
    let inline dictionaryListMonoid aggregateF =
        { new Monoid<Dictionary<_,List<_>>>() with
            member x.Zero() = Dictionary()
            member x.Combine left right =
                for KeyValue (key, incoming) in right do
                    // One TryGetValue instead of ContainsKey + repeated indexer lookups.
                    match left.TryGetValue key with
                    | true, existing ->
                        existing.AddRange(incoming)
                        // Aggregate before clearing: aggregateF reads the list being collapsed.
                        let merged = aggregateF (key, existing)
                        existing.Clear()
                        existing.Add(merged)
                    | false, _ ->
                        left.[key] <- List<_>([| aggregateF (key, incoming) |])
                left }

    /// Dictionary-of-counters accumulator used by countBy. Combine adds the
    /// counts of `right` into `left`.
    /// Mutates (and returns) its left operand, so it is not really a monoid.
    let inline dictionaryIntMonoid() =
        { new Monoid<Dictionary<_,int>>() with
            member x.Zero() = Dictionary()
            member x.Combine left right =
                for KeyValue (key, count) in right do
                    left.[key] <-
                        match left.TryGetValue key with
                        | true, current -> current + count
                        | false, _ -> count
                left }

    /// Folds one element of type 'T into an accumulator of type 'R.
    type ReduceFunc<'T, 'R> = 'T -> 'R -> 'R

    /// Merges two accumulators of type 'R.
    type CombineFunc<'R> = 'R -> 'R -> 'R

    /// A reducible collection of 'T: given a reducing function and a monoid for
    /// the accumulator, produces the folded result.
    type Reducer<'T> =
        abstract Apply<'R> : ReduceFunc<'T, 'R> * Monoid<'R> -> 'R

    // executor functions

    /// Sequential executor: a single left-to-right fold over `values`,
    /// seeded with the monoid's Zero(). Combine is never called.
    let inline toSeqReducer (values : seq<'T>) : Reducer<'T> =
        { new Reducer<'T> with
            member self.Apply<'R>(rf, monoid) : 'R =
                Seq.fold (flip rf) (monoid.Zero()) values }

    /// Parallel divide-and-conquer executor over an array.
    /// The index range is halved recursively (both halves run through
    /// Async.Parallel) until a range spans at most `seqReduceCount` indices;
    /// such a range is folded sequentially from Zero(), and partial results are
    /// merged with Combine, left half first.
    /// Values of `seqReduceCount` below 1 are treated as 1.
    let inline toParallelReducer (seqReduceCount : int) (values : 'T []) : Reducer<'T> =
        { new Reducer<'T> with
            member self.Apply<'R>(rf, monoid) : 'R =
                // With a threshold below 1 a range with e - s = 1 is split into
                // (s, s) and (s, e) - i.e. itself - and the recursion never ends.
                let threshold = max 1 seqReduceCount
                // A range (s, e) covers indices s+1..e, except that the leftmost
                // range (s = 0) also includes index 0. Splitting at m therefore
                // yields (s, m) and (m, e) with no index visited twice.
                let rec reduceCombine s e =
                    async {
                        if e - s <= threshold then
                            let first = if s > 0 then s + 1 else s
                            return Seq.fold (fun acc i -> rf values.[i] acc) (monoid.Zero()) (seq { first .. e })
                        else
                            // Overflow-safe midpoint; s < m < e because e - s >= 2 here.
                            let m = s + (e - s) / 2
                            let! result = Async.Parallel [| reduceCombine s m; reduceCombine m e |]
                            return monoid.Combine result.[0] result.[1]
                    }
                reduceCombine 0 (values.Length - 1) |> Async.RunSynchronously }

    /// Parallel executor delegating to PLINQ's ParallelEnumerable.Aggregate:
    /// Zero is the seed factory, the reducing function updates an accumulator,
    /// and Combine merges accumulators.
    let inline toParallelLinqReducer (values : seq<'T>) : Reducer<'T> =
        { new Reducer<'T> with
            member self.Apply<'R>(rf, monoid) : 'R =
                ParallelEnumerable.Aggregate(
                    values.AsParallel(),
                    Func<'R>(monoid.Zero),
                    Func<'R, 'T, 'R>(fun acc v -> rf v acc),
                    Func<'R, 'R, 'R>(monoid.Combine),
                    Func<'R, 'R>(id)) }

    // transform functions

    /// Maps every element to an inner reducer and folds all inner elements
    /// into the same accumulator (flat-map).
    let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> =
        { new Reducer<'B> with
            member self.Apply<'R> (rf, monoid) =
                // Not a lawful monoid: its Zero is the accumulator built so far,
                // so the inner fold continues from `r` instead of starting fresh.
                // NOTE(review): an inner reducer that calls Zero() more than once
                // (e.g. a parallel executor) would receive the same `r` each time -
                // inner reducers are presumably expected to be sequential.
                let nmonoid r =
                    { new Monoid<_>() with
                        member m.Zero() = r
                        member m.Combine a b = monoid.Combine a b }
                input.Apply<'R>((fun a r -> (f a).Apply(rf, nmonoid r)), monoid) }

    /// Applies `f` to every element before it reaches the reducing function.
    let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> =
        { new Reducer<'B> with
            member self.Apply<'R> (rf, monoid) =
                input.Apply<'R>((fun a r -> rf (f a) r), monoid) }

    /// Passes only elements satisfying `p` to the reducing function; for the
    /// others the accumulator is returned unchanged.
    let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> =
        { new Reducer<'A> with
            member self.Apply<'R> (rf, monoid) =
                input.Apply<'R>((fun a r -> if p a then rf a r else r), monoid) }

    // reduce functions

    /// Runs `reducer` with the given reducing function and accumulator monoid.
    let inline reduce (reducef : 'T -> 'R -> 'R) monoid (reducer : Reducer<'T>) : 'R =
        reducer.Apply(reducef, monoid)

    /// Sum of all elements.
    let inline sum (reducer : Reducer<int>) : int =
        reduce (+) (sumMonoid()) reducer

    /// Number of elements.
    let inline length (reducer : Reducer<'T>) : int =
        reduce (fun _ r -> r + 1) (sumMonoid()) reducer

    /// Concatenation of all strings, accumulated in StringBuilders.
    let inline concat (reducer : Reducer<string>) : string =
        let result =
            reduce (fun (v : string) (builder : StringBuilder) -> builder.Append(v))
                   stringBuilderMonoid
                   reducer
        result.ToString()

    /// All elements collected into an array.
    let inline toArray (reducer : Reducer<'T>) : 'T [] =
        let result =
            reduce (fun v (list : List<'T>) -> list.Add(v); list)
                   (listMonoid())
                   reducer
        result.ToArray()

    /// Groups elements by `selectorF`, converts each with `transformF`, and
    /// keeps one running value per key by calling `aggregateF (key, elements)`
    /// each time an element joins an existing group, when partial dictionaries
    /// are merged, and once more per key when the result is enumerated.
    /// The returned sequence is lazy.
    let inline groupBy (selectorF : 'T -> 'Key)
                       (transformF : 'T -> 'Elem)
                       (aggregateF : 'Key * seq<'Elem> -> 'Elem)
                       (reducer : Reducer<'T>) : seq<'Key * 'Elem> =
        // Adapter with an explicit upcast, so the List<_>-based accumulator code
        // does not depend on implicit List -> seq coercion of a function argument.
        let aggregateList (key : 'Key, items : List<'Elem>) : 'Elem =
            aggregateF (key, items :> seq<'Elem>)
        let reduceF (v : 'T) (r : Dictionary<'Key, List<'Elem>>) =
            let key = selectorF v
            let elem = transformF v
            match r.TryGetValue key with
            | true, bucket ->
                // Collapse the bucket back to a single aggregated element.
                bucket.Add(elem)
                let result = aggregateList (key, bucket)
                bucket.Clear()
                bucket.Add(result)
            | false, _ ->
                r.Add(key, List<_>([| elem |]))
            r
        let result = reduce reduceF (dictionaryListMonoid aggregateList) reducer
        result |> Seq.map (fun keyValue -> (keyValue.Key, aggregateList (keyValue.Key, keyValue.Value)))

    /// Counts the elements falling under each key produced by `selectorF`.
    let inline countBy (selectorF : 'T -> 'Key) (reducer : Reducer<'T>) : seq<'Key * int> =
        let reduceF (v : 'T) (r : Dictionary<'Key, int>) =
            let key = selectorF v
            r.[key] <-
                match r.TryGetValue key with
                | true, seen -> seen + 1
                | false, _ -> 1
            r
        let result = reduce reduceF (dictionaryIntMonoid()) reducer
        result |> Seq.map (fun keyValue -> keyValue.Key, keyValue.Value)
// Example: word count.
// Reads all lines of "largefile.txt" and folds them with the parallel executor
// (index ranges spanning at most 10 are folded sequentially). Each line is split
// on single spaces, every word contributes a 1, and the ones are summed per word.
let lines = System.IO.File.ReadAllLines("largefile.txt")

Reducer.toParallelReducer 10 lines
|> Reducer.collect (fun (line : string) -> Reducer.toSeqReducer (line.Split(' ')))
|> Reducer.groupBy id (fun _ -> 1) (fun (_, ones) -> Seq.sum ones)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment