Skip to content

Instantly share code, notes, and snippets.

@hodzanassredin
Last active August 29, 2015 14:09
Show Gist options
  • Save hodzanassredin/a666d3f66fdbfe89cb9e to your computer and use it in GitHub Desktop.
Save hodzanassredin/a666d3f66fdbfe89cb9e to your computer and use it in GitHub Desktop.
reducers vs seq vs stream vs array
namespace test
module ConcreteSeq =
    // Drop-in replacements for a few Seq functions. Each one checks the runtime
    // type of its input and, for arrays and F# lists, calls the specialised
    // Array/List implementation instead of the generic enumerator-based Seq one.

    /// Applies [f] to every element, dispatching on the concrete collection type.
    /// Shadowing the iter function is probably ok.
    let inline iter f (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> Array.iter f arr
        | :? list<'a> as lst -> List.iter f lst
        | _ -> Seq.iter f seq

    /// Maps [f] over the elements, dispatching on the concrete collection type.
    /// The result element type 'b may differ from the input element type 'a.
    /// Though you probably shouldn't shadow the map function!!
    let inline map (f : 'a -> 'b) (seq : seq<'a>) : seq<'b> =
        match seq with
        | :? array<'a> as arr -> Array.map f arr :> seq<'b>
        | :? list<'a> as lst -> List.map f lst :> seq<'b>
        | _ -> Seq.map f seq

    /// Keeps the elements satisfying [f]. Arrays yield a new array and lists a new
    /// list (both exposed as seq); any other input yields a lazy Seq.filter.
    let inline filter f (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> Array.filter f arr :> seq<'a>
        | :? list<'a> as lst -> List.filter f lst :> seq<'a>
        | _ -> Seq.filter f seq

    /// Sums the elements using the Array/List/Seq sum matching the input type.
    let inline sum (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> Array.sum arr
        | :? list<'a> as lst -> List.sum lst
        | _ -> Seq.sum seq

    /// Converts to an array. If the input already is an array, that same
    /// instance is returned without copying, so mutating the result also
    /// mutates the input.
    let toArray (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> arr
        | _ -> Seq.toArray seq

    /// Converts to an F# list, returning the input itself when it already is one.
    let toList (seq : seq<'a>) =
        match seq with
        | :? list<'a> as lst -> lst
        | _ -> Seq.toList seq
namespace test
module Reducer =
    open System
    open System.Text
    open System.Collections.Generic
    open System.Linq
    open System.Threading.Tasks

    /// Folds one element into an accumulator (accumulator first, element second).
    type ReduceFunc<'T,'R> = 'R -> 'T -> 'R
    /// Merges two partial accumulators produced by independent chunks of work.
    type CombineFunc<'R> = 'R -> 'R -> 'R

    /// A reducible source: drives a reduce function over its elements.
    /// The (unit -> 'R) factory creates a fresh starting accumulator; executors
    /// that split work call it once per chunk and merge chunks with the combine function.
    type Reducer<'T> =
        abstract Apply<'R> : ReduceFunc<'T, 'R> * CombineFunc<'R> * (unit -> 'R) -> 'R

    // ---- executor functions ----

    /// Sequential reducer over any seq. The combine function is never used.
    let inline toSeqReducer (values : seq<'T>) : Reducer<'T> =
        {
            new Reducer<'T> with
                member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R =
                    let mutable r = init ()
                    for value in values do
                        r <- rf r value
                    r
        }

    /// Sequential reducer over an array (Array.fold). The combine function is never used.
    let inline toArrReducer (values : array<'T>) : Reducer<'T> =
        {
            new Reducer<'T> with
                member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R =
                    Array.fold rf (init ()) values
        }

    /// Number of chunks toParallelTaskReducer splits its input into.
    let totalWorkers = Environment.ProcessorCount

    /// Parallel reducer over an array. The array is cut into totalWorkers
    /// contiguous chunks, and each chunk is folded (via Parallel.For) into its
    /// OWN accumulator obtained from init (). The per-chunk results are then
    /// merged left-to-right with the combine function, so element order is
    /// preserved for order-sensitive combines.
    /// init may run concurrently and must return a fresh identity value for
    /// the combine function on every call.
    let inline toParallelTaskReducer (source : array<'T>) : Reducer<'T> =
        {
            new Reducer<'T> with
                member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, cf : CombineFunc<'R>, init : unit -> 'R) : 'R =
                    if source.Length = 0 then init ()
                    else
                        let results : 'R[] = Array.create totalWorkers Unchecked.defaultof<'R>
                        // int64 arithmetic: Length * p can overflow int32 for large
                        // arrays on machines with many cores.
                        let len = int64 source.Length
                        let workers = int64 totalWorkers
                        let job (p : int) =
                            // A private accumulator per worker: sharing a single init()
                            // value would race on mutable accumulators (StringBuilder,
                            // List, Dictionary) and make cf combine an object with itself.
                            let mutable acc : 'R = init ()
                            let s = int (len * int64 p / workers)
                            let e = int (len * int64 (p + 1) / workers)
                            for i in s .. e - 1 do
                                acc <- rf acc source.[i]
                            results.[p] <- acc
                        Parallel.For(0, totalWorkers, job) |> ignore
                        Array.reduce cf results
        }

    // ---- transform functions ----

    /// Replaces each element with all elements of the reducer returned by f.
    /// The inner reducer is seeded with the current accumulator through
    /// (fun () -> r). This threads the accumulator correctly for sequential
    /// inner reducers. NOTE(review): a parallel inner reducer would call that
    /// seed once per worker, so it is presumably not meant to be used here.
    let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> =
        {
            new Reducer<'B> with
                member self.Apply<'R> (rf, cf, init) =
                    input.Apply<'R>( (fun r a -> (f a).Apply(rf, cf, fun () -> r)), cf, init)
        }

    /// Applies f to each element before it reaches the downstream reduce function.
    let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> =
        {
            new Reducer<'B> with
                member self.Apply<'R> (rf, cf, init) =
                    input.Apply<'R>( (fun r a -> rf r (f a)), cf, init)
        }

    /// Passes on only the elements satisfying p; others leave the accumulator unchanged.
    let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> =
        {
            new Reducer<'A> with
                member self.Apply<'R> (rf, cf, init) =
                    input.Apply<'R>( (fun r a -> if p a then rf r a else r), cf, init)
        }

    // ---- reduce functions ----

    /// Runs the reducer with the given reduce, combine and accumulator-factory functions.
    let inline reduce (reducef : 'R ->'T -> 'R) (combineF : 'R -> 'R -> 'R) (init : (unit -> 'R)) (reducer : Reducer<'T>) : 'R =
        reducer.Apply( reducef, combineF, init)

    /// Sum of an int64 reducer.
    let inline sum (reducer : Reducer<int64>) : int64 =
        reduce (+) (+) (fun () -> 0L) reducer

    /// Number of elements. The reduce function receives the running count
    /// first and the (ignored) element second.
    let inline length (reducer : Reducer<'T>) : int =
        reduce (fun count _ -> count + 1) (+) (fun () -> 0) reducer

    /// Concatenates all strings. Each chunk uses its own StringBuilder, and the
    /// chunks are appended in order.
    let inline concat (reducer : Reducer<string>) : string =
        let result =
            reduce (fun (builder : StringBuilder) (v : string) -> builder.Append(v))
                   (fun (left : StringBuilder) (right : StringBuilder) -> left.Append(right))
                   (fun () -> new StringBuilder())
                   reducer
        result.ToString()

    /// Collects all elements into an array, in source order.
    let inline toArray (reducer : Reducer<'T>) : 'T [] =
        let result =
            reduce (fun (list : List<'T>) v -> list.Add(v); list)
                   (fun (left : List<'T>) (right : List<'T>) -> left.AddRange(right); left)
                   (fun () -> new List<'T>())
                   reducer
        result.ToArray()

    /// Groups elements by selectorF, transforms them with transformF and
    /// aggregates each group with aggregateF.
    /// After every insertion or merge, a group's bucket is collapsed to the
    /// single value aggregateF (key, bucket). The final result re-applies
    /// aggregateF to that one-element bucket. aggregateF must therefore behave
    /// like an associative fold whose value on a singleton is that element
    /// (e.g. sum, max).
    /// The returned seq is lazy: aggregateF runs again each time it is enumerated.
    let inline groupBy (selectorF : 'T -> 'Key)
                       (transformF : 'T -> 'Elem)
                       (aggregateF : 'Key * seq<'Elem> -> 'Elem)
                       (reducer : Reducer<'T>) : seq<'Key * 'Elem> =
        // Replaces the contents of bucket with its single aggregated value.
        let collapse (key : 'Key) (bucket : List<'Elem>) =
            let aggregated = aggregateF (key, bucket :> seq<'Elem>)
            bucket.Clear()
            bucket.Add(aggregated)
        // Accumulator first, element second, as ReduceFunc requires.
        let reduceF (r : Dictionary<'Key, List<'Elem>>) (v : 'T) =
            let key = selectorF v
            let elem = transformF v
            match r.TryGetValue(key) with
            | true, bucket ->
                bucket.Add(elem)
                collapse key bucket
            | false, _ ->
                r.Add(key, new List<'Elem>([| elem |]))
            r
        let combineF (left : Dictionary<'Key, List<'Elem>>) (right : Dictionary<'Key, List<'Elem>>) =
            for kv in right do
                match left.TryGetValue(kv.Key) with
                | true, bucket ->
                    bucket.AddRange(kv.Value)
                    collapse kv.Key bucket
                | false, _ ->
                    left.[kv.Key] <- new List<'Elem>([| aggregateF (kv.Key, kv.Value :> seq<'Elem>) |])
            left
        let result =
            reduce reduceF combineF
                   (fun () -> new Dictionary<'Key, List<'Elem>>())
                   reducer
        result |> Seq.map (fun kv -> (kv.Key, aggregateF (kv.Key, kv.Value :> seq<'Elem>)))

    /// Counts elements per key computed by selectorF. The result is a lazy view
    /// over the final dictionary; group order is not specified here.
    let inline countBy (selectorF : 'T -> 'Key) (reducer : Reducer<'T>) : seq<'Key * int> =
        // Accumulator first, element second, as ReduceFunc requires.
        let reduceF (r : Dictionary<'Key, int>) (v : 'T) =
            let key = selectorF v
            match r.TryGetValue(key) with
            | true, n -> r.[key] <- n + 1
            | false, _ -> r.[key] <- 1
            r
        let combineF (left : Dictionary<'Key, int>) (right : Dictionary<'Key, int>) =
            for kv in right do
                match left.TryGetValue(kv.Key) with
                | true, n -> left.[kv.Key] <- n + kv.Value
                | false, _ -> left.[kv.Key] <- kv.Value
            left
        let result =
            reduce reduceF combineF
                   (fun () -> new Dictionary<'Key, int>())
                   reducer
        result |> Seq.map (fun kv -> (kv.Key, kv.Value))
//results (elapsed seconds for: keep even x -> x + 1 -> sum, over 1L..50,000,000L):
//stream 2.290740 sec, result = 625000050000000
//seq 3.187981 sec, result = 625000050000000
//concrete_seq 1.288102 sec, result = 625000050000000
//array 1.261778 sec, result = 625000050000000
//reducer 2.592996 sec, result = 625000050000000
//par Stream 1.171090 sec, result = 625000050000000
//par Seq 1.176698 sec, result = 625000050000000
//par array 0.732016 sec, result = 625000050000000
//par Reducer 1.800725 sec, result = 625000050000000
open Nessos.Streams
open test
open Microsoft.FSharp.Collections
/// Benchmarks one collection library on the pipeline
/// "keep even values -> add 1 -> sum" over data. finit converts the input
/// array into the library's collection type; filter/fmap/fsum are that
/// library's operations. Prints the label, elapsed wall-clock seconds and result.
let check name (data:int64[]) finit filter fmap fsum =
    let isEven x = x % 2L = 0L
    let increment x = x + 1L
    let timer = System.Diagnostics.Stopwatch.StartNew()
    let res = fsum (fmap increment (filter isEven (finit data)))
    timer.Stop()
    printfn "%-10s\t %f sec, \t result = %d" name timer.Elapsed.TotalSeconds res
[<EntryPoint>]
let main argv =
    // Input: the int64 values 1L .. 50,000,000L, in order.
    let data = Array.init 50000000 (fun i -> int64 (i + 1))
    // Every benchmark runs over the same input array.
    let run name finit filter fmap fsum = check name data finit filter fmap fsum

    // Sequential implementations
    run "stream" Stream.ofArray Stream.filter Stream.map Stream.sum
    run "seq" id Seq.filter Seq.map Seq.sum
    run "concrete_seq" id ConcreteSeq.filter ConcreteSeq.map ConcreteSeq.sum
    run "array" id Array.filter Array.map Array.sum
    run "reducer" Reducer.toArrReducer Reducer.filter Reducer.map Reducer.sum

    // Parallel implementations
    run "par Stream" ParStream.ofArray ParStream.filter ParStream.map ParStream.sum
    run "par Seq" Seq.ofArray PSeq.filter PSeq.map PSeq.sum
    run "par array" id PSeq.filter PSeq.map PSeq.sum
    run "par Reducer" Reducer.toParallelTaskReducer Reducer.filter Reducer.map Reducer.sum

    // Wait for Enter so the console output stays visible.
    System.Console.ReadLine() |> ignore
    0 // return an integer exit code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment