@mrange
Last active March 11, 2022 10:02
[F#/OCaml] Implementing a data streams library using a bunch of one-liners

A few years ago, when I read the presentation motivating the design behind Nessos Streams, I was struck by the beauty of simple push streams.

type PushStream<'T> = ('T -> bool) -> bool

LINQ (in .NET) is a pull stream, i.e. we pull values out of the stream by calling MoveNext and Current. One of the problems with pull streams is the constant "Are we done?" check at each level of the stream.

Push streams avoid this by letting the source push values to a receiver downstream. If the receiver has had enough it returns false, which propagates back to the source, which then stops iterating.

This results in a faster data streams library and also in elegant code. If there are drawbacks, they are that a push stream once started runs to completion and cannot be paused midway (which does simplify resource lifetime management) and that the push stream definition implies mutable state in the sinks.

Nessos Streams supports both pull and push to get the best of both worlds but it complicates the design.

How simple can a data streams library be?

For succinctness I am not going to annotate the types in the push streams library. I don't think this makes the code more readable, but it makes it shorter.

Let's try to understand what the push stream is:

type PushStream<'T> = ('T -> bool) -> bool

The definition says that a push stream accepts a receiver function. Once invoked with the receiver, the push stream is supposed to pass values to it until the push stream runs out of values or the receiver returns false, meaning it doesn't want any more values. The push stream then returns either true, meaning it ran to the end, or false, meaning it was interrupted by the receiver at some point.
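
To make the contract concrete, here is a minimal sketch with the types written out. The source oneTwoThree and the recording receiver are hypothetical names made up for this illustration; they are not part of the library.

```fsharp
// Explicitly typed version of the definition above
type PushStream<'T> = ('T -> bool) -> bool

// Hypothetical hand-written source that pushes 1, 2 and 3.
// && short-circuits, so nothing more is pushed once the receiver returns false.
let oneTwoThree : PushStream<int> =
  fun r -> r 1 && r 2 && r 3

// Receiver that records each value and asks to stop once it has seen 2
let seen = ResizeArray<int> ()
let ranToEnd = oneTwoThree (fun v -> seen.Add v; v < 2)

printfn "ran to end: %b, seen: %A" ranToEnd (List.ofSeq seen)
```

The receiver accepts 1, rejects further values after 2, and the source reports false because it never got to push 3.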

Push stream sources

Let's start with the empty push stream:

let empty = fun r -> true

empty accepts a receiver function r, but since it's empty there's nothing to do and it just returns true.

Now we can "simplify" the empty function by moving r to the left:

let empty r = true

This definition is equivalent but we saved a few characters.

The singleton push stream is also simple to implement:

let singleton v r = r v

The push stream consists of a single value v and we just invoke the receiver r with it returning the result.

Because reality sucks I am going to introduce two helper functions that push a value to a receiver and start a push stream, respectively.

let inline push   r v = r v
let inline start  s r = s r

Currently they look meaningless but they will allow us to address a problem with how F# and the .NET jitter interact at a later stage. Like I said, reality sucks.

singleton implemented using the functions above:

let inline singleton v r = push r v

We can implement other push stream sources as well

let rec ofList vs  r = match vs with [] -> true | h::t -> push r h && ofList t r
let rec range  b e r = if b <= e then push r b && range (b + 1) e r else true

ofList allows us to turn a 'T list into a push stream: myList |> ofList. range 0 10 is a push stream producing the numbers from 0 to 10 inclusive.
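
Since no sinks have been defined at this point, a source can be exercised by handing it a receiver directly. The sketch below assumes a hypothetical helper, collectWhile, written only for this illustration; it records every pushed value and keeps asking for more while a predicate holds.

```fsharp
let inline push r v = r v

let rec ofList vs  r = match vs with [] -> true | h::t -> push r h && ofList t r
let rec range  b e r = if b <= e then push r b && range (b + 1) e r else true

// Hypothetical helper for illustration: records pushed values and
// keeps the source going while `keepGoing v` returns true
let collectWhile keepGoing s =
  let acc = ResizeArray<_> ()
  let ranToEnd = s (fun v -> acc.Add v; keepGoing v)
  ranToEnd, List.ofSeq acc

// Runs to the end: (true, ["a"; "b"; "c"])
let r1 = ["a"; "b"; "c"] |> ofList |> collectWhile (fun _ -> true)

// Stops as soon as the receiver returns false: (false, [0; 1; 2; 3])
let r2 = range 0 1000000 |> collectWhile (fun v -> v < 3)
```

Note that the value 3 is still delivered in the second case; the receiver only gets to say "stop" after it has seen a value.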

Push stream pipes

Of course we need other functions to compose the data streams. append is a pipe that appends two push streams into one push stream and is simple to implement:

let inline append f s r = start f r && start s r

This says: start push stream f using receiver r, once that is done and the receiver wants more values start push stream s using receiver r.

We can now write:

let pushStream = append (range 0 10) (range 100 110)

map is also easy to implement:

let inline map m s r = start s <| fun v -> push r (m v)

Start push stream s with our receiver that given the value v maps it using m and pushes the mapped value to receiver r.

Other push stream pipes come easy as well:

let inline choose  c s r = start s <| fun v -> match c v with ValueSome v -> push r v | _ -> true
let inline collect c s r = start s <| fun v -> start (c v) r
let inline filter  f s r = start s <| fun v -> if f v then push r v else true

collect, aka flat map, looks surprisingly simple but actually works. c is a map function that given a value returns a push stream whose values should be appended to the resulting push stream. So we start push stream s with our receiver, which given the value v maps it using c to a push stream. This push stream we then start using receiver r.
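
A small sketch of collect in action may help. It repeats the definitions it needs so it can run on its own, and toListDemo is a hypothetical stand-in sink used only here.

```fsharp
let inline push  r v = r v
let inline start s r = s r

let rec range b e r = if b <= e then push r b && range (b + 1) e r else true
let inline collect c s r = start s <| fun v -> start (c v) r

// Hypothetical sink for illustration: gathers everything into a list
let toListDemo s =
  let acc = ResizeArray<_> ()
  s (fun v -> acc.Add v; true) |> ignore
  List.ofSeq acc

// For each n in 1..3, push the inner stream 1..n
let nested = range 1 3 |> collect (fun n -> range 1 n) |> toListDemo
// nested = [1; 1; 2; 1; 2; 3]

// An early stop propagates through collect: the inner stream returns false,
// and the outer receiver hands that false back to the outer source.
let pushed = ResizeArray<int> ()
let ranToEnd =
  (range 1 3 |> collect (fun n -> range 1 n)) (fun v -> pushed.Add v; pushed.Count < 4)
// ranToEnd = false, pushed contains 1, 1, 2, 1
```

The second half is the reason the one-liner works: the result of the inner start is exactly the boolean the outer source needs to decide whether to continue.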

Using these functions it's possible to start composing push streams:

let ps = range 0 50 |> map int64 |> filter (fun v -> (v &&& 1L) = 0L) |> map ((+) 1L)

Push stream sinks

So far everything has looked fine but unfortunately when we get into the sinks we realize that the simple definition we have implies mutability.

Let's start with the good one, isEmpty:

let inline isEmpty s = start s (fun _ -> false)

This tests whether a push stream is empty. We do it by starting the push stream s with a receiver that immediately returns false. The push stream s only returns true if it ran to the end, and with the receiver above the only way it can do that is if it is empty.

tryFirst is a slightly more complicated version of above:

// Discard the first argument
let inline (>>.)  _ v = v
let inline tryFirst s = let a = ref ValueNone in start s (fun v -> a := ValueSome v; false) >>. !a

The aggregate a is initialized to ref ValueNone. Our receiver stores any value in a as ValueSome _ and returns false to stop the source from pushing more values. We discard the boolean result and return !a, which is ValueNone if the push stream was empty and ValueSome _ otherwise.

fold lets us fold all values in a push stream.

let inline fold f z s = let a = ref z in start s (fun v -> a := f !a v; true) >>. !a
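
Newer F# compilers (F# 6 onward) discourage the ! and := operators on ref cells in favor of .Value. The following is a sketch of the same two sinks in that style, not a replacement for the one-liners: it gives up the single-line form and the >>. helper for explicit sequencing, and range is a local copy so the block runs on its own.

```fsharp
let inline start s r = s r

let rec range b e r = if b <= e then r b && range (b + 1) e r else true

// Same behavior as tryFirst above: keep the first value, then stop the source
let inline tryFirst s =
  let a = ref ValueNone
  start s (fun v -> a.Value <- ValueSome v; false) |> ignore
  a.Value

// Same behavior as fold above: update the aggregate and always ask for more
let inline fold f z s =
  let a = ref z
  start s (fun v -> a.Value <- f a.Value v; true) |> ignore
  a.Value

let first = tryFirst (range 5 9)     // ValueSome 5
let none  = tryFirst (range 1 0)     // ValueNone, the range is empty
let total = fold (+) 0 (range 1 4)   // 10
```

A ref cell is still needed because a closure cannot capture a let mutable local in F#.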

From this we can define derived sinks

let inline sum    s = fold (+) LanguagePrimitives.GenericZero s
let inline toList s = fold (fun s v -> v::s) [] s |> List.rev

We can use the sinks to evaluate our push streams:

let s = range 0 50 |> map int64 |> filter (fun v -> (v &&& 1L) = 0L) |> map ((+) 1L) |> sum
printfn "%A" s

Testing our push streams library

Functional testing using FsCheck

FsCheck, one of the best libraries in the whole .NET ecosystem, can be used to check that we have actually implemented a push streams library that behaves like List.

I will cheat a bit: I check a few fundamental properties of the push stream directly but use List as an oracle for the vast majority of the tests, as List is well implemented, well tested and used a lot.

The most important property to test is that ofList >> toList is an identity function. Most other tests rely on this:

static member ``ofList >> toList => identity``  (vs : int list) =
  let e = vs
  let a = e |> ofList |> toList
  e = a

We ask FsCheck to give us a list, any list, and for all lists ofList >> toList should produce a new list that is structurally equal to the original. FsCheck will generate lists of many different sizes and make sure we return true for all of them.

Once we know this property holds, the other tests using List as an oracle make sense:

static member ``map <=> List.map`` (v : int) (vs : int list) =
  let m = (+) v
  let e = vs |> List.map m
  let a = vs |> ofList |> map m |> toList
  e = a

So this says that we want our map to behave in the same way as List.map. We check that by first applying the map function using List.map. We then construct a data stream that should give the same result for push streams: ofList >> map m >> toList, that is first make the list into a push stream, then map the push stream and finally make it into a list again.

After that we just need to check that: e = a
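
The oracle idea does not depend on FsCheck itself. As a rough sketch, assuming no test library is at hand, the same property can be exercised with a fixed-seed System.Random. The generator randomList and the driver loop are hypothetical stand-ins; unlike FsCheck they do no shrinking and cover far fewer shapes of input.

```fsharp
let inline push  r v = r v
let inline start s r = s r

let rec ofList vs r = match vs with [] -> true | h::t -> push r h && ofList t r
let inline map m s r = start s <| fun v -> push r (m v)

let inline fold f z s =
  let a = ref z
  start s (fun v -> a.Value <- f a.Value v; true) |> ignore
  a.Value

let toList s = fold (fun acc v -> v::acc) [] s |> List.rev

// Hypothetical stand-in for FsCheck's generators: small random int lists
let rnd = System.Random (19)
let randomList () = List.init (rnd.Next (20)) (fun _ -> rnd.Next (-100, 100))

// Same shape as the FsCheck property: List.map is the oracle
let mapMatchesListMap (v : int) (vs : int list) =
  let m = (+) v
  let e = vs |> List.map m
  let a = vs |> ofList |> map m |> toList
  e = a

// Run the property against 100 random inputs
let allPassed =
  Seq.init 100 (fun _ -> mapMatchesListMap (rnd.Next (100)) (randomList ()))
  |> Seq.forall id
```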

Performance testing

Performance is an important property of a data streams library. This is where grandiose plans often hit reality hard and you have to start to "kill your darlings" (grim).

Remember that I mentioned above that reality sucks? In this particular case what sucks is that the .NET jitter doesn't generate performant code for virtual calls marked with .tail (the IL tail. call prefix) in functions that require stack frames (most functions do unless trivial). Push stream calls are tail calls, which ought to be a good thing, so to avoid having performance killed we need to stop F# from emitting .tail. We can do that through compiler options (--tailcalls-), but sometimes other parts of the program have tail calls that need .tail. A hacky but clever way to eliminate .tail locally was shown to me by manofstick in his brilliant seq replacement PR.

// Pushes a value to a receiver
let inline push   r v = if r v then true else false
// Starts a push stream
let inline start  s r = if s r then true else false

This seems useless but it does eliminate the .tail attribute in the current version of F#. In addition, F# is smart enough to eliminate the unnecessary test.

Enabling this trick was the main motivation for the push and start functions.

Test case

We construct a data stream that uses very cheap functions to make any overhead introduced by the push stream library as visible as possible. If the functions were expensive they would hide the overhead.

let test n =
  range           0 (n - 1)
  |> map          int64
  |> filter       (fun v -> (v &&& 1L) = 0L)
  |> map          ((+) 1L)
  |> sum

We implement the same data stream for Seq, Array, List, LINQ, Nessos.Streams and a baseline.

Performance baseline

We should compare the push stream performance to a baseline, the good old for loop:

let mutable sum = 0L
for i = 0 to (n - 1) do
  let i = int64 i
  if (i &&& 1L) = 0L then
    let i = i + 1L
    sum <- sum + i
sum

Running performance tests on .NET 4.7.2

I vary the size of n, but the results come out pretty similar:

Test run, total: 10000000, outer: 1000, inner: 10000
  Running test case 'baseline' ...
  ... it took 7 ms with (0, 0, 0) cc and produced: 25000000L
  Running test case 'pushstream' ...
  ... it took 36 ms with (0, 0, 0) cc and produced: 25000000L
  Running test case 'nessos' ...
  ... it took 65 ms with (0, 0, 0) cc and produced: 25000000L
  Running test case 'array' ...
  ... it took 55 ms with (64, 0, 0) cc and produced: 25000000L
  Running test case 'list ' ...
  ... it took 523 ms with (199, 85, 0) cc and produced: 25000000L
  Running test case 'linq' ...
  ... it took 201 ms with (0, 0, 0) cc and produced: 25000000L
  Running test case 'seq' ...
  ... it took 1684 ms with (661, 0, 0) cc and produced: 25000000L

The baseline crushes the competition as expected. Nessos Streams suffers a bit from not having an init function, requiring initInfinite id >> take n instead, which makes the data stream one stage longer for Nessos Streams.

baseline, pushstream and nessos don't trigger GC runs ((0, 0, 0) cc), whereas array, while doing well performance-wise, requires some generation 0 collections because of the constant allocation of arrays.

list, being a singly linked list, is slower than array as expected. seq is significantly slower than linq, which is perhaps a bit strange as both are built around IEnumerable<_>.

Note: if you run the performance tests on .NET Core 3.0 preview you need to turn off tiered compilation, which is now on by default. Tiered compilation by definition makes performance measurement harder, and I also found that it introduces a performance regression in this kind of code. From PowerShell it looks like this:

$env:COMPlus_TieredCompilation=0

Conclusion

I am fascinated by the simplicity of the push stream definition and the implied simplicity of many of its operations. As I had some time this Easter I wanted to share this simple idea with you, the reader. The performance of this simple definition is also surprisingly good.

If you find the mutability of the sinks hard to swallow, or you want to reclaim the ability to pause an iteration, I encourage you to take a look at transducers, commonly used in the Clojure ecosystem. A (too) simple definition for a transducer could look something like this:

// A transducer works by transforming the folder function rather than the data stream
type Transducer<'S, 'T, 'U> = TS of (('S -> 'U -> 'S) -> ('S -> 'T -> 'S))
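
To give a feel for the idea, here is a rough sketch that drops the TS wrapper and treats a transducer as a plain function from folder to folder. The names mapT, filterT and add are made up for this illustration, and the sketch ignores early termination entirely.

```fsharp
// A transducer turns a folder over 'U into a folder over 'T
let mapT    m folder = fun s v -> folder s (m v)
let filterT f folder = fun s v -> if f v then folder s v else s

// The final folder: plain summation
let add (s : int64) (v : int64) = s + v

// Same pipeline as the push stream test case. Each step wraps the folder below it,
// so the last transducer applied (mapT int64) is the first to see an input value.
let folder : int64 -> int -> int64 =
  add
  |> mapT    ((+) 1L)
  |> filterT (fun v -> (v &&& 1L) = 0L)
  |> mapT    (int64 : int -> int64)

// The state is threaded through the fold, so no mutable cell is needed
let result = List.fold folder 0L [0..9]
// result = 25L
```

Because the transformed folder is an ordinary function, it can be handed to List.fold, Array.fold or any other fold, which is what decouples the pipeline from the data source.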

[F#] Full source code

module MinimalisticPushStream =
  // Disable Tiered compilation using powershell: $env:COMPlus_TieredCompilation=0
  module Details =
#if ENABLE_TAIL_CALLS
    // Pushes a value to a receiver
    let inline push   r v = r v
    // Starts a push stream
    let inline start  s r = s r
#else
    // This pattern disables .tail attribute generation, thanks to manofstick
    //  for showing me this trick. Of course future versions of F# might be smart
    //  enough to generate .tail attribute anyway but hopefully the jitter is doing
    //  a better job with .tail calls by then.

    // Pushes a value to a receiver
    let inline push   r v = if r v then true else false
    // Starts a push stream
    let inline start  s r = if s r then true else false
#endif

    let inline (>>.)  _ v = v
  open Details

  // Sources
  let inline empty          _ = true
  let rec    ofList     vs  r = match vs with [] -> true | h::t -> push r h && ofList t r
  let rec    range      b e r = if b <= e then push r b && range (b + 1) e r else true
  let inline singleton  v   r = push r v

  // Pipes
  let inline append     f s r = start f r && start s r
  let inline choose     c s r = start s <| fun v -> match c v with ValueSome v -> push r v | _ -> true
  let inline collect    c s r = start s <| fun v -> start (c v) r
  let inline filter     f s r = start s <| fun v -> if f v then push r v else true
  let inline map        m s r = start s <| fun v -> push r (m v)

  // Derived pipes
  let inline concat       s   = collect id s

  // Sinks
  let inline isEmpty        s = start s (fun _ -> false)
  let inline fold       f z s = let a = ref z in start s (fun v -> a := f !a v; true) >>. !a
  let inline tryFirst       s = let a = ref ValueNone in start s (fun v -> a := ValueSome v; false) >>. !a

  // Derived sinks
  let inline sum            s = fold (+) LanguagePrimitives.GenericZero s
  let inline toList         s = fold (fun s v -> v::s) [] s |> List.rev

module FunctionalTests =
  open MinimalisticPushStream
  module MinimalisticPushStream =
    type Properties =
      class
        // The round-trip property all other tests rely on
        static member ``ofList >> toList => identity``  (vs : int list) =
          let e = vs
          let a = e |> ofList |> toList
          e = a

        static member ``empty |> toList => empty list`` () =
          let e = []
          let a = (empty |> toList)
          e = a

        static member ``singleton v |> toList => [v]``  (v : int) =
          let e = [v]
          let a = (singleton v |> toList)
          e = a

        static member ``range f l |> toList => [f..l]``    (f : int) (l : int) =
          let e = List.init (l - f + 1 |> max 0) ((+) f)
          let a = range f l |> toList
          e = a

        static member ``append <=> List.append`` (f : int list) (s : int list) =
          let e = List.append f s
          let a = append (ofList f) (ofList s) |> toList
          e = a

        static member ``choose <=> List.choose`` (v : int) (vs : int list) =
          let c s n v' = if (v + v') % 2 = 0 then s (v + 1) else n
          let e = vs |> List.choose (c Some None)
          let a = vs |> ofList |> choose (c ValueSome ValueNone) |> toList
          e = a

        static member ``collect <=> List.collect`` (vs : int list list) =
          let e = vs |> List.collect id
          let a = vs |> ofList |> collect ofList |> toList
          e = a

        static member ``filter <=> List.filter`` (v : int) (vs : int list) =
          let f v' = (v + v') % 2 = 0
          let e = vs |> List.filter f
          let a = vs |> ofList |> filter f |> toList
          e = a

        static member ``map <=> List.map`` (v : int) (vs : int list) =
          let m = (+) v
          let e = vs |> List.map m
          let a = vs |> ofList |> map m |> toList
          e = a

        static member ``isEmpty <=> List.isEmpty`` (vs : int list) =
          let e = vs |> List.isEmpty
          let a = vs |> ofList |> isEmpty
          e = a

        static member ``fold <=> List.fold`` (vs : int list) =
          let e = vs |> List.fold (*) 1
          let a = vs |> ofList |> fold (*) 1
          e = a

        static member ``sum <=> List.sum`` (vs : int list) =
          let e = vs |> List.sum
          let a = vs |> ofList |> sum
          e = a

        static member ``non-trivial stream`` (vs : int list list) =
          let f v = v % 2 = 0
          let m v = string v
          let e = vs |> List.collect (List.filter f) |> List.map m
          let a = vs |> ofList |> collect (ofList >> filter f) |> map m |> toList
          e = a

      end

  open FsCheck
  let run () =
    let count  = 100
    let config = { Config.Quick with MaxTest = count; MaxFail = count }
    Check.All<MinimalisticPushStream.Properties> config

module PerformanceTests =
  open MinimalisticPushStream
  open System
  open System.Diagnostics

  let sw =
    let sw = Stopwatch ()
    sw.Start ()
    sw

  let time n a =
    let v = a ()

    let inline cc n = GC.CollectionCount n
    let bcc0, bcc1, bcc2 = cc 0, cc 1, cc 2
    let before = sw.ElapsedMilliseconds

    for i = 1 to n do
      a () |> ignore

    let after = sw.ElapsedMilliseconds
    let acc0, acc1, acc2 = cc 0, cc 1, cc 2

    v, after - before, (acc0 - bcc0, acc1 - bcc1, acc2 - bcc2)

  module PerfBaseline =
    let test n =
      let mutable sum = 0L
      for i = 0 to (n - 1) do
        let i = int64 i
        if (i &&& 1L) = 0L then
          let i = i + 1L
          sum <- sum + i
      sum

  module PerfArray =
    let test n =
      Array.init      n id
      |> Array.map    int64
      |> Array.filter (fun v -> (v &&& 1L) = 0L)
      |> Array.map    ((+) 1L)
      |> Array.sum

  module PerfList =
    let test n =
      List.init       n id
      |> List.map     int64
      |> List.filter  (fun v -> (v &&& 1L) = 0L)
      |> List.map     ((+) 1L)
      |> List.sum

  module PerfNessos =
    open Nessos.Streams
    let test n =
      Stream.initInfinite id
      |> Stream.take      n
      |> Stream.map       int64
      |> Stream.filter    (fun v -> (v &&& 1L) = 0L)
      |> Stream.map       ((+) 1L)
      |> Stream.sum

  module PerfPushStream =
    let test n =
      range           0 (n - 1)
      |> map          int64
      |> filter       (fun v -> (v &&& 1L) = 0L)
      |> map          ((+) 1L)
      |> sum

  module PerfLinq =
    open System.Linq
    let test n = Enumerable.Range(0, n).Select(int64).Where(fun v -> (v &&& 1L) = 0L).Select((+) 1L).Sum()

  module PerfSeq =
    let test n =
      Seq.init        n id
      |> Seq.map      int64
      |> Seq.filter   (fun v -> (v &&& 1L) = 0L)
      |> Seq.map      ((+) 1L)
      |> Seq.sum

  let run () =
    let total     = 10000000
    let outers    = [| 100; 1000; 10000; 100000 |]
    let testCases =
      [|
        "baseline"    , fun n () -> PerfBaseline.test   n
        "pushstream"  , fun n () -> PerfPushStream.test n
        "nessos"      , fun n () -> PerfNessos.test     n
        "array"       , fun n () -> PerfArray.test      n
        "list"        , fun n () -> PerfList.test       n
        "linq"        , fun n () -> PerfLinq.test       n
        "seq"         , fun n () -> PerfSeq.test        n
      |]

    for outer in outers do
      let inner = total / outer
      assert (inner > 0)
      assert (total % outer = 0)

      printfn "Test run, total: %d, outer: %d, inner: %d" total outer inner

      for n, a in testCases do
        printfn "  Running test case '%s' ..." n
        let v, ms, cc = time outer (a inner)
        printfn "  ... it took %d ms with %A cc and produced: %A" ms cc v

[<EntryPoint>]
let main argv =
  FunctionalTests.run ()
  PerformanceTests.run ()
  0
@manofstick

Not sure if I had informed you I had yet another "hero's journey" attempt at this, this time I took it right to the dragon's lair - corefx with a replacement for linq :-) (and thus a c# implementation)

Anyway, couldn't muster any other troops, so got slaughtered on the field.

But have been meaning to drag the implementation out into its own nuget package, but just haven't got around to it yet.

It was somewhat simpler than my Seq replacement as it didn't have to cater for the edge cases that Seq entailed (mainly around skip/take from memory, but also the general wrapper around the defined "undefined" behaviour (as far as docs go) around Current in pre/post stream scenarios).

Also the implementation resolved a long-standing problem I'd faced around performance of small streams, tied to JIT_VirtualFunctionPointer. Wasn't solved particularly elegantly (I'm not sure if there is a truly elegant solution), but by including two functions to handle the seq<'a> -> seq<'b> and the seq<'a> -> seq<'a> cases.

There were some other goodies there too.

Now if somebody wanted to help me pull this out into a separate nuget package... :-)

dotnet/corefx#34208 (from https://github.com/manofstick/corefx/tree/ChainLinqFence - changes only in the System.Linq directory....)

@mrange

mrange commented May 2, 2019

@manofstick sounds just like you. What are the performance numbers for the LINQ improvement?
