Last active
          August 29, 2015 14:09 
        
      - 
      
- 
        Save hodzanassredin/a666d3f66fdbfe89cb9e to your computer and use it in GitHub Desktop. 
    reducers vs seq vs stream vs array
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | namespace test | |
/// Sequence helpers that dispatch on the runtime type of the input:
/// arrays and F# lists are routed to the specialised Array/List functions,
/// anything else falls back to the generic Seq functions.
/// Note: for array/list inputs, `map` and `filter` build their result eagerly
/// (Array.map/List.map/...), whereas the Seq fallback stays lazy.
module ConcreteSeq =

    /// Applies `f` to every element of `seq`.
    /// Reusing the name `iter` is harmless: the semantics match Seq.iter.
    let inline iter f (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> Array.iter f arr
        | :? list<'a> as lst -> List.iter f lst
        | _ -> Seq.iter f seq

    /// Maps `f` over `seq`, returning the result typed as a sequence.
    /// `f` may change the element type ('a -> 'b).
    /// Caution: this shadows the familiar name `map`.
    let inline map (f : 'a -> 'b) (seq : seq<'a>) : seq<'b> =
        match seq with
        | :? array<'a> as arr -> Array.map f arr :> seq<'b>
        | :? list<'a> as lst -> List.map f lst :> seq<'b>
        | _ -> Seq.map f seq

    /// Keeps the elements of `seq` for which `f` returns true.
    let inline filter f (seq : seq<'a>) : seq<'a> =
        match seq with
        | :? array<'a> as arr -> Array.filter f arr :> seq<'a>
        | :? list<'a> as lst -> List.filter f lst :> seq<'a>
        | _ -> Seq.filter f seq

    /// Sums the elements of `seq` (requires `+` and `Zero` on 'a).
    let inline sum (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> Array.sum arr
        | :? list<'a> as lst -> List.sum lst
        | _ -> Seq.sum seq

    /// Converts `seq` to an array. If `seq` already is an array, that same
    /// instance is returned (no copy), so mutating the result mutates the input.
    let toArray (seq : seq<'a>) =
        match seq with
        | :? array<'a> as arr -> arr
        | _ -> Seq.toArray seq

    /// Converts `seq` to an F# list. If `seq` already is a list, it is returned as-is.
    let toList (seq : seq<'a>) =
        match seq with
        | :? list<'a> as lst -> lst
        | _ -> Seq.toList seq
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | namespace test | |
/// Reducers: a source of values described only by how it folds itself.
/// A Reducer<'T> is given a reduce function (accumulator -> element -> accumulator),
/// a combine function (merges two partial accumulators) and a seed factory.
/// Executors decide how the fold runs (sequentially or partitioned in parallel);
/// transforms (map/filter/collect) wrap the reduce function without materialising data.
module Reducer =
    open System
    open System.Text
    open System.Collections.Generic
    open System.Linq
    open System.Threading.Tasks

    /// Folds one element into an accumulator: accumulator first, element second.
    type ReduceFunc<'T,'R> = 'R -> 'T -> 'R
    /// Merges two partial accumulators (left result first).
    type CombineFunc<'R> = 'R -> 'R -> 'R

    /// A foldable source of 'T values.
    type Reducer<'T> =
        abstract Apply<'R> : ReduceFunc<'T, 'R> * CombineFunc<'R> * (unit -> 'R) -> 'R

    // ---------------------------------------------------------------- executors

    /// Folds `values` sequentially with the reduce function; the combine function is unused.
    let inline toSeqReducer (values : seq<'T>) : Reducer<'T> =
        {
            new Reducer<'T> with
                member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R =
                    let mutable r = init ()
                    for value in values do
                        r <- rf r value
                    r
        }

    /// Folds `values` sequentially with Array.fold; the combine function is unused.
    let inline toArrReducer (values : array<'T>) : Reducer<'T> =
        {
            new Reducer<'T> with
                member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, _, init : unit -> 'R) : 'R =
                    Array.fold rf (init ()) values
        }

    /// Upper bound on the number of partitions used by toParallelTaskReducer.
    let totalWorkers = Environment.ProcessorCount

    /// Folds `source` in parallel. The array is split into contiguous ranges,
    /// each range is folded by its own worker (Parallel.For), and the partial
    /// results are merged left-to-right with the combine function, preserving
    /// source order for order-sensitive combines such as concatenation.
    /// One seed is created per partition, so `init ()` should be an identity
    /// for the combine function; otherwise its value is counted once per partition.
    /// An empty source yields `init ()`.
    let inline toParallelTaskReducer (source : array<'T>) : Reducer<'T> =
        {
            new Reducer<'T> with
                member self.Apply<'R>(rf : ReduceFunc<'T, 'R>, cf : CombineFunc<'R>, init : unit -> 'R) : 'R =
                    if source.Length = 0 then
                        init ()
                    else
                        // Never create more partitions than elements, so no partition is empty.
                        let workers = min totalWorkers source.Length
                        let results : 'R[] = Array.zeroCreate workers
                        let job (p : int) =
                            // A fresh seed per worker: sharing one mutable accumulator
                            // (List, StringBuilder, Dictionary) across threads would race,
                            // and Array.reduce would then combine that object with itself.
                            let mutable acc = init ()
                            let first = source.Length * p / workers
                            let last = source.Length * (p + 1) / workers
                            for i in first .. last - 1 do
                                acc <- rf acc source.[i]
                            results.[p] <- acc
                        Parallel.For(0, workers, job) |> ignore
                        Array.reduce cf results
        }

    // ---------------------------------------------------------------- transforms

    /// Replaces every element `a` by the elements of the reducer `f a`;
    /// each inner reducer is seeded with the current outer accumulator.
    let inline collect (f : 'A -> Reducer<'B>) (input : Reducer<'A>) : Reducer<'B> =
        {
            new Reducer<'B> with
                member self.Apply<'R> (rf, cf, init) =
                    input.Apply<'R>( (fun r a -> (f a).Apply(rf, cf, fun () -> r)), cf, init)
        }

    /// Applies `f` to every element before it reaches the reduce function.
    let inline map (f : 'A -> 'B) (input : Reducer<'A>) : Reducer<'B> =
        {
            new Reducer<'B> with
                member self.Apply<'R> (rf, cf, init) =
                    input.Apply<'R>( (fun r a -> rf r (f a)), cf, init)
        }

    /// Passes on only the elements for which `p` returns true.
    let inline filter (p : 'A -> bool) (input : Reducer<'A>) : Reducer<'A> =
        {
            new Reducer<'A> with
                member self.Apply<'R> (rf, cf, init) =
                    input.Apply<'R>( (fun r a -> if p a then rf r a else r), cf, init)
        }

    // ---------------------------------------------------------------- reductions

    /// Runs `reducer` with the given reduce, combine and seed functions.
    let inline reduce (reducef : 'R ->'T -> 'R) (combineF : 'R -> 'R -> 'R) (init : (unit -> 'R)) (reducer : Reducer<'T>) : 'R =
        reducer.Apply( reducef, combineF, init)

    /// Sums an int64 reducer.
    let inline sum (reducer : Reducer<int64>) : int64 =
        reduce (+) (+) (fun () -> 0L) reducer

    /// Counts the elements produced by `reducer`.
    let inline length (reducer : Reducer<'T>) : int =
        // Accumulator first, element second (ReduceFunc order); the element is ignored.
        reduce (fun count _ -> count + 1) (+) (fun () -> 0) reducer

    /// Concatenates all strings produced by `reducer`, in order.
    let inline concat (reducer : Reducer<string>) : string =
        let result =
            reduce (fun (builder : StringBuilder) (v : string) -> builder.Append(v))
                   (fun (left : StringBuilder) (right : StringBuilder) -> left.Append(right))
                   (fun () -> new StringBuilder())
                   reducer
        result.ToString()

    /// Collects all elements produced by `reducer` into an array, in order.
    let inline toArray (reducer : Reducer<'T>) : 'T [] =
        let result =
            reduce (fun (list : List<'T>) v ->
                        list.Add(v)
                        list)
                   (fun (left : List<'T>) (right : List<'T>) ->
                        left.AddRange(right)
                        left)
                   (fun () -> new List<'T>())
                   reducer
        result.ToArray()

    /// Groups elements by `selectorF`, maps each to `transformF`, and collapses each
    /// group into a single element with `aggregateF`. aggregateF is re-applied to
    /// one-element buffers when partitions are merged and again for the final output,
    /// so it should return the element unchanged for a one-element input.
    let inline groupBy (selectorF : 'T -> 'Key)
                       (transformF : 'T -> 'Elem)
                       (aggregateF : 'Key * seq<'Elem> -> 'Elem)
                       (reducer : Reducer<'T>) : seq<'Key * 'Elem> =
        // Accumulator first, element second, matching ReduceFunc<'T, 'R>.
        let inline reduceF (groups : Dictionary<'Key, List<'Elem>>) (v : 'T) =
            let key = selectorF v
            let elem = transformF v
            match groups.TryGetValue(key) with
            | true, bucket ->
                bucket.Add(elem)
                let folded = aggregateF (key, bucket :> seq<'Elem>)
                bucket.Clear()
                bucket.Add(folded)
            | _ ->
                groups.Add(key, new List<'Elem>([| elem |]))
            groups
        let inline combineF (left : Dictionary<'Key, List<'Elem>>) (right : Dictionary<'Key, List<'Elem>>) =
            for KeyValue(key, rightBucket) in right do
                match left.TryGetValue(key) with
                | true, leftBucket ->
                    leftBucket.AddRange(rightBucket)
                    let folded = aggregateF (key, leftBucket :> seq<'Elem>)
                    leftBucket.Clear()
                    leftBucket.Add(folded)
                | _ ->
                    left.[key] <- new List<'Elem>([| aggregateF (key, rightBucket :> seq<'Elem>) |])
            left
        let result =
            reduce reduceF combineF
                   (fun () -> new Dictionary<'Key, List<'Elem>>())
                   reducer
        result |> Seq.map (fun (KeyValue(key, bucket)) -> key, aggregateF (key, bucket :> seq<'Elem>))

    /// Counts how many elements fall under each key produced by `selectorF`.
    let inline countBy (selectorF : 'T -> 'Key) (reducer : Reducer<'T>) : seq<'Key * int> =
        // Accumulator first, element second, matching ReduceFunc<'T, 'R>.
        let inline reduceF (counts : Dictionary<'Key, int>) (v : 'T) =
            let key = selectorF v
            match counts.TryGetValue(key) with
            | true, n -> counts.[key] <- n + 1
            | _ -> counts.[key] <- 1
            counts
        let inline combineF (left : Dictionary<'Key, int>) (right : Dictionary<'Key, int>) =
            for KeyValue(key, n) in right do
                match left.TryGetValue(key) with
                | true, m -> left.[key] <- m + n
                | _ -> left.[key] <- n
            left
        let result =
            reduce reduceF combineF
                   (fun () -> new Dictionary<'Key, int>())
                   reducer
        result |> Seq.map (fun (KeyValue(key, n)) -> key, n)
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | //results: | |
| //stream 2.290740 sec, result = 625000050000000 | |
| //seq 3.187981 sec, result = 625000050000000 | |
| //concrete_seq 1.288102 sec, result = 625000050000000 | |
| //array 1.261778 sec, result = 625000050000000 | |
| //reducer 2.592996 sec, result = 625000050000000 | |
| //par Stream 1.171090 sec, result = 625000050000000 | |
| //par Seq 1.176698 sec, result = 625000050000000 | |
| //par array 0.732016 sec, result = 625000050000000 | |
| //par Reducer 1.800725 sec, result = 625000050000000 | |
| open Nessos.Streams | |
| open test | |
| open Microsoft.FSharp.Collections | |
/// Times a single run of one pipeline over `data`: `finit` adapts the input array,
/// `filter` keeps the even values, `fmap` adds one to each, and `fsum` folds the
/// result. Prints the label, the elapsed seconds and the folded value on one line.
/// Each pipeline is measured on a single run with no warm-up pass.
let check name (data:int64[]) finit filter fmap fsum =
    let timer = System.Diagnostics.Stopwatch.StartNew()
    let source = finit data
    let evens = filter (fun x -> x % 2L = 0L) source
    let shifted = fmap (fun x -> x + 1L) evens
    let total = fsum shifted
    timer.Stop()
    printfn "%-10s\t %f sec, \t result = %d" name timer.Elapsed.TotalSeconds total
/// Benchmark entry point. Builds the int64 values 1..50,000,000 once, then times
/// the same filter/map/sum pipeline with each library: the sequential runs first,
/// then the parallel runs, one printed line per run. Waits for a line on stdin
/// before returning exit code 0.
[<EntryPoint>]
let main argv =
    let data = Array.init 50000000 (fun i -> int64 (i + 1))

    // Sequential
    let sequentialRuns : (unit -> unit) list =
        [ (fun () -> check "stream" data Stream.ofArray Stream.filter Stream.map Stream.sum)
          (fun () -> check "seq" data id Seq.filter Seq.map Seq.sum)
          (fun () -> check "concrete_seq" data id ConcreteSeq.filter ConcreteSeq.map ConcreteSeq.sum)
          (fun () -> check "array" data id Array.filter Array.map Array.sum)
          (fun () -> check "reducer" data Reducer.toArrReducer Reducer.filter Reducer.map Reducer.sum) ]

    // Parallel. "par Seq" passes the array through Seq.ofArray, while "par array"
    // hands it to PSeq unchanged (id); otherwise the two runs are identical.
    let parallelRuns : (unit -> unit) list =
        [ (fun () -> check "par Stream" data ParStream.ofArray ParStream.filter ParStream.map ParStream.sum)
          (fun () -> check "par Seq" data Seq.ofArray PSeq.filter PSeq.map PSeq.sum)
          (fun () -> check "par array" data id PSeq.filter PSeq.map PSeq.sum)
          (fun () -> check "par Reducer" data Reducer.toParallelTaskReducer Reducer.filter Reducer.map Reducer.sum) ]

    for run in sequentialRuns @ parallelRuns do
        run ()

    ignore (System.Console.ReadLine())
    0 // return an integer exit code
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment