A few years ago, when I read the presentation motivating the design behind Nessos Streams, I was struck by the beauty of simple push streams.
type PushStream<'T> = ('T -> bool) -> bool
LINQ (in .NET) is a pull stream, i.e. we pull values out of the stream by calling MoveNext + Current. One of the problems with pull streams is the constant checking of "Are we done?" at each level in the stream.
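To make the pull model concrete, here is a minimal sketch of summing a sequence by hand through the enumerator. The helper name sumByPulling is made up for illustration; MoveNext and Current are the standard IEnumerator<_> members.

```fsharp
// Sums a sequence by pulling: every step first asks "are we done?" via MoveNext
let sumByPulling (xs : seq<int>) =
  use e = xs.GetEnumerator ()
  let mutable sum = 0
  while e.MoveNext () do
    sum <- sum + e.Current
  sum

printfn "%d" (sumByPulling [1; 2; 3])  // prints 6
```

With several map/filter stages stacked on top of each other, each stage has its own MoveNext that asks the stage below the same question.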
Push streams avoid this by letting the source push values to a receiver downstream. If the receiver has had enough it returns false, which propagates to the source, which then stops iterating.
This results in a faster data streams library and also in elegant code. If there are drawbacks, they are that a push stream once started runs to the end (which simplifies resource lifetime management, though) and that the push stream definition implies mutable state in the sinks.
Nessos Streams supports both pull and push to get the best of both worlds but it complicates the design.
For succinctness I am not going to specify the types in the push streams library. I don't think this makes the code more readable, but it makes it shorter.
Let's try to understand what the push stream is:
type PushStream<'T> = ('T -> bool) -> bool
The definition says that a push stream accepts a receiver function. Once invoked with the receiver function, the push stream is supposed to pass values to the receiver until the push stream runs out of values or the receiver returns false, meaning the receiver doesn't want more values. The push stream then returns either true, meaning it ran to the end, or false, meaning it got interrupted by the receiver at some point.
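As a small illustration of this protocol, here is a hand-written push stream run against two different receivers. The names oneTwoThree, ranToEnd and interrupted are made up for this sketch.

```fsharp
type PushStream<'T> = ('T -> bool) -> bool

// A hand-written push stream producing 1, 2 and 3.
// && short-circuits, so pushing stops as soon as the receiver returns false.
let oneTwoThree : PushStream<int> =
  fun r -> r 1 && r 2 && r 3

// A receiver that accepts everything: the stream runs to the end
let ranToEnd = oneTwoThree (fun v -> printfn "got %d" v; true)

// A receiver that has had enough once it sees 2: the stream is interrupted
let interrupted = oneTwoThree (fun v -> printfn "got %d" v; v < 2)

printfn "%b %b" ranToEnd interrupted  // prints true false
```

The second run never pushes 3, because the receiver returned false for 2.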
Let's start with the empty push stream:
let empty = fun r -> true
empty accepts a receiver function r but since it's empty there's nothing to do and it just returns true.
Now we can "simplify" the empty function by moving r to the left:
let empty r = true
This definition is equivalent but we saved a few characters.
The singleton push stream is also simple to implement:
let singleton v r = r v
The push stream consists of a single value v and we just invoke the receiver r with it, returning the result.
Because reality sucks I am going to introduce two helper functions that help with pushing values to a receiver and starting a push stream.
let inline push r v = r v
let inline start s r = s r
Currently they look meaningless but they will allow us to address a problem with how F# and the .NET jitter interact at a later stage. Like I said: reality sucks.
singleton implemented using the functions above:
let inline singleton v r = push r v
We can implement other push stream sources as well:
let rec ofList vs r = match vs with [] -> true | h::t -> push r h && ofList t r
let rec range b e r = if b <= e then push r b && range (b + 1) e r else true
ofList allows us to make a 'T list into a push stream: myList |> ofList. range 0 10 is a push stream producing the numbers from 0 to 10 inclusive.
Of course we need other functions to compose the data streams. append is a pipe that appends two push streams into one push stream and is simple to implement:
let inline append f s r = start f r && start s r
This says: start push stream f using receiver r; once that is done, and if the receiver wants more values, start push stream s using receiver r.
We can now write:
let pushStream = append (range 0 10) (range 100 110)
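To see the short-circuiting in action, here is a self-contained sketch that runs an appended stream against a receiver that stops at the first value of the second stream. It uses plain (non-inline) versions of range and append without the push/start helpers, and the seen buffer is only there for illustration.

```fsharp
let rec range b e r = if b <= e then r b && range (b + 1) e r else true
let append f s r = f r && s r

let seen = ResizeArray<int> ()

// Receiver: record every value, ask for more only while the value is below 100
let completed =
  append (range 0 2) (range 100 110) (fun v -> seen.Add v; v < 100)

printfn "%A %b" (List.ofSeq seen) completed  // prints [0; 1; 2; 100] false
```

The first stream runs to the end and returns true, so the second one is started; it is interrupted after its first value and the combined stream returns false.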
map is also easy to implement:
let inline map m s r = start s <| fun v -> push r (m v)
Start push stream s with our receiver that, given the value v, maps it using m and pushes the mapped value to receiver r.
Other push stream pipes come easy as well:
let inline choose c s r = start s <| fun v -> match c v with ValueSome v -> push r v | _ -> true
let inline collect c s r = start s <| fun v -> start (c v) r
let inline filter f s r = start s <| fun v -> if f v then push r v else true
collect, aka flat map, looks surprisingly simple but actually works. c is a map function that given a value returns a push stream that should be appended to the resulting push stream. So start push stream s using our receiver that, given the value v, maps it using c to a push stream. This push stream we start using receiver r.
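A quick way to convince yourself that collect works is to run it on a tiny input. The sketch below uses plain (non-inline) versions of ofList, range and collect without the push/start helpers; the seen buffer is only there for illustration.

```fsharp
let rec ofList vs r = match vs with [] -> true | h::t -> r h && ofList t r
let rec range b e r = if b <= e then r b && range (b + 1) e r else true
let collect c s r = s (fun v -> c v r)

let seen = ResizeArray<int> ()

// For each n in [1; 2; 3] push the inner stream 1..n to the same receiver
let completed =
  collect (fun n -> range 1 n) (ofList [1; 2; 3]) (fun v -> seen.Add v; true)

printfn "%A" (List.ofSeq seen)  // prints [1; 1; 2; 1; 2; 3]
```

Note that the result of running the inner stream becomes the result of the outer receiver, so if r returns false inside an inner stream the outer stream stops as well.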
Using these functions it's possible to start composing push streams:
let ps = range 0 50 |> map int64 |> filter (fun v -> (v &&& 1L) = 0L) |> map ((+) 1L)
So far everything has looked fine but unfortunately when we get into the sinks we realize that the simple definition we have implies mutability.
Let's start with the good one, isEmpty:
let inline isEmpty s = start s (fun _ -> false)
This tests if a push stream is empty. We do it by starting the push stream s with a receiver that immediately returns false. The push stream s shall only return true if it ran to the end successfully, and the only way it can do that with the definition above is if it's empty.
tryFirst is a slightly more complicated version of the above:
// Discard the first argument
let inline (>>.) _ v = v
let inline tryFirst s = let a = ref ValueNone in start s (fun v -> a := ValueSome v; false) >>. !a
The aggregate a is initialized to ref ValueNone. Our receiver stores any result in a as a ValueSome _ and returns false to stop the source from pushing values. We discard the boolean result and return !a, that is ValueNone if the push stream was empty, ValueSome _ otherwise.
fold lets us fold all values in a push stream:
let inline fold f z s = let a = ref z in start s (fun v -> a := f !a v; true) >>. !a
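The one-liners are dense, so here is a more spelled-out sketch of the same two sinks. It is not the code used in the rest of this post: it drops inline and the push/start helpers, and it accesses the ref cell through .Value instead of ! and :=, since as far as I know newer F# versions nudge you toward .Value.

```fsharp
let rec range b e r = if b <= e then r b && range (b + 1) e r else true

// fold: keep the running state in a ref cell and always ask for more values
let fold f z s =
  let a = ref z
  s (fun v -> a.Value <- f a.Value v; true) |> ignore
  a.Value

// tryFirst: store the first value and immediately tell the source to stop
let tryFirst s =
  let a = ref ValueNone
  s (fun v -> a.Value <- ValueSome v; false) |> ignore
  a.Value

printfn "%d" (fold (+) 0 (range 1 4))  // prints 10
printfn "%A" (tryFirst (range 5 9))
```

The mutable cell is the price we pay for the simple definition: the receiver can only return a bool, so any result has to be smuggled out through state.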
From this we can define derived sinks:
let inline sum s = fold (+) LanguagePrimitives.GenericZero s
let inline toList s = fold (fun s v -> v::s) [] s |> List.rev
We can use the sinks to evaluate our push streams:
let s = range 0 50 |> map int64 |> filter (fun v -> (v &&& 1L) = 0L) |> map ((+) 1L) |> sum
printfn "%A" s
FsCheck, one of the best libraries in the whole .NET ecosystem, can be used to check that we have actually implemented a push streams library that behaves as List.
I will cheat and check a few fundamental properties of the push stream but use List as an oracle for the vast majority of the tests, as List is well implemented, well tested and used a lot.
The most important property to test is that ofList >> toList is an identity function. Most other tests rely on this:
static member ``ofList >> toList => identity`` (vs : int list) =
  let e = vs
  let a = e |> ofList |> toList
  e = a
We ask FsCheck to give us a list, any list, and for all lists ofList >> toList should generate a new list that is structurally equal to the original list. FsCheck will generate lists of many different sizes and make sure we return true for all of them.
Once we know this property is true the other tests using List as oracle make sense:
static member ``map <=> List.map`` (v : int) (vs : int list) =
  let m = (+) v
  let e = vs |> List.map m
  let a = vs |> ofList |> map m |> toList
  e = a
So this says that we want our map to behave in the same way as List.map. We check that by first applying the map function using List.map. We then construct a data stream that should give the same result for push streams: ofList >> map m >> toList, that is first make the list into a push stream, then map the push stream and finally make it into a list again.
After that we just need to check that: e = a
Performance is an important property of a data streams library. Here is where grandiose plans often hit reality hard and you have to start to "kill your darlings" (grim).
Remember that I mentioned above that reality sucks? In this particular case what sucks is that the .NET jitter doesn't generate performant code when running into virtual calls marked with .tail for functions that require stack frames (most functions do unless trivial). Push stream calls are tail calls, which should be a good thing, but in order to not have performance killed we need to suppress F# generating .tail attributes. We can do it through compiler options but sometimes we have tail calls in other parts of the program that need .tail. A hacky and clever way that also eliminates .tail was introduced to me by manofstick in his brilliant seq replacement PR.
// Pushes a value to a receiver
let inline push r v = if r v then true else false
// Starts a push stream
let inline start s r = if s r then true else false
This seems useless but it does eliminate the .tail attribute in the current version of F#. In addition, F# is smart enough to eliminate the unnecessary test.
Enabling this was the main motivation for the push and start functions.
We construct a data stream that uses very cheap functions to make any overhead introduced by the push stream library as visible as possible. If the functions were expensive they would hide the overhead.
let test n =
  range 0 (n - 1)
  |> map int64
  |> filter (fun v -> (v &&& 1L) = 0L)
  |> map ((+) 1L)
  |> sum
We implement the same data stream for Seq, Array, List, LINQ, Nessos.Streams and a baseline.
We should compare the push stream performance to a baseline, the good old for loop:
let mutable sum = 0L
for i = 0 to (n - 1) do
  let i = int64 i
  if (i &&& 1L) = 0L then
    let i = i + 1L
    sum <- sum + i
sum
I vary the size of n but the results come out pretty similar:
Test run, total: 10000000, outer: 1000, inner: 10000
Running test case 'baseline' ...
... it took 7 ms with (0, 0, 0) cc and produced: 25000000L
Running test case 'pushstream' ...
... it took 36 ms with (0, 0, 0) cc and produced: 25000000L
Running test case 'nessos' ...
... it took 65 ms with (0, 0, 0) cc and produced: 25000000L
Running test case 'array' ...
... it took 55 ms with (64, 0, 0) cc and produced: 25000000L
Running test case 'list ' ...
... it took 523 ms with (199, 85, 0) cc and produced: 25000000L
Running test case 'linq' ...
... it took 201 ms with (0, 0, 0) cc and produced: 25000000L
Running test case 'seq' ...
... it took 1684 ms with (661, 0, 0) cc and produced: 25000000L
The baseline crushes the competition as expected. Nessos Streams suffers a bit from not having an init function, requiring initInfinite id >> take n, which makes the data stream longer for Nessos Streams.
baseline, pushstream and nessos don't trigger GC runs ((0, 0, 0) cc) whereas array, while doing well performance-wise, requires a few generation 0 collections because of the constant allocation of arrays.
list, being a singly linked list, is slower than array as expected. seq is significantly slower than linq, which is perhaps a bit strange as both are based around IEnumerable<_>.
Note: if you run the performance tests on .NET Core 3.0 preview you need to turn off tiered compilation, which is now turned on by default. Tiered compilation by definition makes performance measuring harder, but I also found that tiered compilation introduces a performance regression in this kind of code. From PowerShell it looks like this:
$env:COMPlus_TieredCompilation=0
I am fascinated by the simplicity of the push stream definition and the implied simplicity in many of its operations. As I had some time this Easter I wanted to share this simple idea with you, the reader. Also, the performance from this simple definition is surprisingly good.
If you find the mutability of the sinks hard to swallow or you want to reclaim the ability to pause an iteration I encourage you to take a look at transducers, commonly used in the Clojure ecosystem. A (too) simple definition for a transducer could look something like this:
// A transducer works by transforming the folder function rather than the data stream
type Transducer<'S, 'T, 'U> = TS of (('S -> 'U -> 'S) -> ('S -> 'T -> 'S))
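To give a feel for how such a type could be used, here is a minimal sketch with a map, a filter and a composition, applied through List.fold. The names mapT, filterT, composeT and transduce are made up for this sketch, and like the definition above it is too simple for real use: there is, for instance, no way for the folder to stop early.

```fsharp
type Transducer<'S, 'T, 'U> = TS of (('S -> 'U -> 'S) -> ('S -> 'T -> 'S))

// map: transform each 'T into a 'U before handing it to the folder
let mapT (m : 'T -> 'U) : Transducer<'S, 'T, 'U> =
  TS (fun folder -> fun s v -> folder s (m v))

// filter: only hand values that pass the predicate to the folder
let filterT (f : 'T -> bool) : Transducer<'S, 'T, 'T> =
  TS (fun folder -> fun s v -> if f v then folder s v else s)

// compose: values flow through t1 first, then through t2
let composeT (TS t1) (TS t2) = TS (fun folder -> t1 (t2 folder))

// Apply a transducer to a list by transforming the folder given to List.fold
let transduce (TS t) folder z vs = List.fold (t folder) z vs

let evensPlusOne : Transducer<int, int, int> =
  composeT (filterT (fun v -> v % 2 = 0)) (mapT ((+) 1))

printfn "%d" (transduce evensPlusOne (+) 0 [0..9])  // prints 25
```

The state is threaded through the folder as an ordinary value, so no mutable cell is needed in the sink.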
module MinimalisticPushStream =
  // Disable Tiered compilation using powershell: $env:COMPlus_TieredCompilation=0
  module Details =
#if ENABLE_TAIL_CALLS
    // Pushes a value to a receiver
    let inline push r v = r v
    // Starts a push stream
    let inline start s r = s r
#else
    // This pattern disables .tail attribute generation, thanks to manofstick
    // for showing me this trick. Of course future versions of F# might be smart
    // enough to generate .tail attribute anyway but hopefully the jitter is doing
    // a better job with .tail calls by then.
    // Pushes a value to a receiver
    let inline push r v = if r v then true else false
    // Starts a push stream
    let inline start s r = if s r then true else false
#endif
    let inline (>>.) _ v = v
  open Details
  // Sources
  let inline empty _ = true
  let rec ofList vs r = match vs with [] -> true | h::t -> push r h && ofList t r
  let rec range b e r = if b <= e then push r b && range (b + 1) e r else true
  let inline singleton v r = push r v
  // Pipes
  let inline append f s r = start f r && start s r
  let inline choose c s r = start s <| fun v -> match c v with ValueSome v -> push r v | _ -> true
  let inline collect c s r = start s <| fun v -> start (c v) r
  let inline filter f s r = start s <| fun v -> if f v then push r v else true
  let inline map m s r = start s <| fun v -> push r (m v)
  // Derived pipes
  let inline concat s = collect id s
  // Sinks
  let inline isEmpty s = start s (fun _ -> false)
  let inline fold f z s = let a = ref z in start s (fun v -> a := f !a v; true) >>. !a
  let inline tryFirst s = let a = ref ValueNone in start s (fun v -> a := ValueSome v; false) >>. !a
  // Derived sinks
  let inline sum s = fold (+) LanguagePrimitives.GenericZero s
  let inline toList s = fold (fun s v -> v::s) [] s |> List.rev
module FunctionalTests =
  open MinimalisticPushStream
  module MinimalisticPushStream =
    type Properties =
      class
        // The round-trip property all other tests relies on
        static member ``ofList >> toList => identity`` (vs : int list) =
          let e = vs
          let a = e |> ofList |> toList
          e = a
        static member ``empty |> toList => empty list`` () =
          let e = []
          let a = (empty |> toList)
          e = a
        static member ``singleton v |> toList => [v]`` (v : int) =
          let e = [v]
          let a = (singleton v |> toList)
          e = a
        static member ``range b e |> toList => [v]`` (f : int) (l : int) =
          let e = List.init (l - f + 1 |> max 0) ((+) f)
          let a = range f l |> toList
          e = a
        static member ``append <=> List.append`` (f : int list) (s : int list) =
          let e = List.append f s
          let a = append (ofList f) (ofList s) |> toList
          e = a
        static member ``choose <=> List.choose`` (v : int) (vs : int list) =
          let c s n v' = if (v + v') % 2 = 0 then s (v + 1) else n
          let e = vs |> List.choose (c Some None)
          let a = vs |> ofList |> choose (c ValueSome ValueNone) |> toList
          e = a
        static member ``collect <=> List.collect`` (vs : int list list) =
          let e = vs |> List.collect id
          let a = vs |> ofList |> collect ofList |> toList
          e = a
        static member ``filter <=> List.filter`` (v : int) (vs : int list) =
          let f v' = (v + v') % 2 = 0
          let e = vs |> List.filter f
          let a = vs |> ofList |> filter f |> toList
          e = a
        static member ``map <=> List.map`` (v : int) (vs : int list) =
          let m = (+) v
          let e = vs |> List.map m
          let a = vs |> ofList |> map m |> toList
          e = a
        static member ``isEmpty <=> List.isEmpty`` (vs : int list) =
          let e = vs |> List.isEmpty
          let a = vs |> ofList |> isEmpty
          e = a
        static member ``fold <=> List.fold`` (vs : int list) =
          let e = vs |> List.fold (*) 1
          let a = vs |> ofList |> fold (*) 1
          e = a
        static member ``sum <=> List.sum`` (vs : int list) =
          let e = vs |> List.sum
          let a = vs |> ofList |> sum
          e = a
        static member ``non-trivial stream`` (vs : int list list) =
          let f v = v % 2 = 0
          let m v = string v
          let e = vs |> List.collect (List.filter f) |> List.map m
          let a = vs |> ofList |> collect (ofList >> filter f) |> map m |> toList
          e = a
      end
  open FsCheck
  let run () =
    let count = 100
    let config = { Config.Quick with MaxTest = count; MaxFail = count }
    Check.All<MinimalisticPushStream.Properties> config
module PerformanceTests =
  open MinimalisticPushStream
  open System
  open System.Diagnostics
  let sw =
    let sw = Stopwatch ()
    sw.Start ()
    sw
  let time n a =
    let v = a ()
    let inline cc n = GC.CollectionCount n
    let bcc0, bcc1, bcc2 = cc 0, cc 1, cc 2
    let before = sw.ElapsedMilliseconds
    for i = 1 to n do
      a () |> ignore
    let after = sw.ElapsedMilliseconds
    let acc0, acc1, acc2 = cc 0, cc 1, cc 2
    v, after - before, (acc0 - bcc0, acc1 - bcc1, acc2 - bcc2)
  module PerfBaseline =
    let test n =
      let mutable sum = 0L
      for i = 0 to (n - 1) do
        let i = int64 i
        if (i &&& 1L) = 0L then
          let i = i + 1L
          sum <- sum + i
      sum
  module PerfArray =
    let test n =
      Array.init n id
      |> Array.map int64
      |> Array.filter (fun v -> (v &&& 1L) = 0L)
      |> Array.map ((+) 1L)
      |> Array.sum
  module PerfList =
    let test n =
      List.init n id
      |> List.map int64
      |> List.filter (fun v -> (v &&& 1L) = 0L)
      |> List.map ((+) 1L)
      |> List.sum
  module PerfNessos =
    open Nessos.Streams
    let test n =
      Stream.initInfinite id
      |> Stream.take n
      |> Stream.map int64
      |> Stream.filter (fun v -> (v &&& 1L) = 0L)
      |> Stream.map ((+) 1L)
      |> Stream.sum
  module PerfPushStream =
    let test n =
      range 0 (n - 1)
      |> map int64
      |> filter (fun v -> (v &&& 1L) = 0L)
      |> map ((+) 1L)
      |> sum
  module PerfLinq =
    open System.Linq
    let test n = Enumerable.Range(0, n).Select(int64).Where(fun v -> (v &&& 1L) = 0L).Select((+) 1L).Sum()
  module PerfSeq =
    let test n =
      Seq.init n id
      |> Seq.map int64
      |> Seq.filter (fun v -> (v &&& 1L) = 0L)
      |> Seq.map ((+) 1L)
      |> Seq.sum
  let run () =
    let total = 10000000
    let outers = [| 100; 1000; 10000; 100000 |]
    let testCases =
      [|
        "baseline" , fun n () -> PerfBaseline.test n
        "pushstream" , fun n () -> PerfPushStream.test n
        "nessos" , fun n () -> PerfNessos.test n
        "array" , fun n () -> PerfArray.test n
        "list" , fun n () -> PerfList.test n
        "linq" , fun n () -> PerfLinq.test n
        "seq" , fun n () -> PerfSeq.test n
      |]
    for outer in outers do
      let inner = total / outer
      assert (inner > 0)
      assert (total % outer = 0)
      printfn "Test run, total: %d, outer: %d, inner: %d" total outer inner
      for n, a in testCases do
        printfn " Running test case '%s' ..." n
        let v, ms, cc = time outer (a inner)
        printfn " ... it took %d ms with %A cc and produced: %A" ms cc v
[<EntryPoint>]
let main argv =
  FunctionalTests.run ()
  PerformanceTests.run ()
  0
Not sure if I had informed you, but I had yet another "hero's journey" attempt at this; this time I took it right to the dragon's lair: corefx, with a replacement for LINQ :-) (and thus a C# implementation).
Anyway, I couldn't muster any other troops, so got slaughtered on the field.
I have been meaning to drag the implementation out into its own NuGet package, but just haven't got around to it yet.
It was somewhat simpler than my Seq replacement as it didn't have to cater for the edge cases that Seq entailed (mainly around skip/take, from memory, but also the general wrapper around the defined "undefined" behaviour (as far as docs go) around Current in pre/post stream scenarios).
Also the implementation resolved a long-standing problem I'd faced around performance of small streams, tied to JIT_VirtualFunctionPointer. It wasn't solved particularly elegantly (I'm not sure if there is a truly elegant solution), but by including two functions to handle the seq<'a> -> seq<'b> and the seq<'a> -> seq<'a> cases. There were some other goodies there too.
Now if somebody wanted to help me pull this out into a separate nuget package... :-)
dotnet/corefx#34208 (from https://github.com/manofstick/corefx/tree/ChainLinqFence - changes only in the System.Linq directory....)