This is a little style and expressiveness test for various stream libraries. It is certainly not a proof of anything, so much as a showcase of how some basic features of streams fit together in different libraries.
Problem statement: given a library of streams, implement an API analogous to `fgrep`, i.e., a literal string grepper.
Specifically, implement a function `fgrep(test, filenames[, limit=10])`, which takes a search string, a list of filenames, and an optional concurrency limit, and produces a stream of `Match` records `{ filename: string, lineNumber: number, line: string }` representing all lines of all files that contain the string `test`. For each matching record, `filename` is the name of the file where the match was found, `lineNumber` is the one-indexed line number of the match, and `line` is the contents of the matching line. The `limit` argument indicates the maximum number of filehandles that may be open concurrently at any given time.
The basic shape of the problem solution might look something like this:

```
# string, [string], number=10 -> Stream<Match>
fgrep(test, filenames, limit=10):
  filenames.map(filename => fgrepFile(test, filename))  # Stream<Stream<Match>>
    .rateLimit(limit)                                   # Stream<Stream<Match>>
    .concat()                                           # Stream<Match>

# string, string -> Stream<Match>
fgrepFile(test, filename):
  lazy-stream(() => line-stream(open-file-stream(filename)))  # Stream<Line>
    .filterMap((line, lineNumber) =>                          # Stream<Match>
      fgrepLine(test, filename, lineNumber, line))

# string, string, number, string -> Match | null
fgrepLine(test, filename, lineNumber, line):
  if line.contains(test)
  then { filename, lineNumber, line }
  else null
```
From the top, this starts by creating a stream of lazy streams, each of which greps a single file. These are limited to `limit` concurrent greps to avoid opening too many filehandles. This produces a stream of streams of matches, which is then flattened sequentially into a single stream of matches.
This program is simple enough to be both understandable and useful, but it introduces a few challenges:
- combines both binary and value/object streams
- requires a low-level line splitter, which showcases either transformations from binary streams to value streams or a stdlib of basic file manipulation combinators
- requires lazy file manipulation and throttling input to avoid dying from too many open filehandles
- result should be made more readable with simple combinators like `map`, `filterMap`, or `flatMap`
- should also work well with backpressure, although this isn't directly demonstrated without using the function in a larger program
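The low-level line splitter mentioned above can be sketched as a small binary-to-value transformation. Assuming the input arrives as arbitrary byte chunks (the chunking is whatever the underlying file stream happens to produce), a buffer carries partial lines across chunk boundaries:

```python
from typing import Iterable, Iterator

def lines(chunks: Iterable[bytes]) -> Iterator[str]:
    # Binary stream -> value stream: buffer partial lines across chunk
    # boundaries and emit each complete line as soon as it is available.
    buf = b""
    for chunk in chunks:
        buf += chunk
        while (i := buf.find(b"\n")) != -1:
            yield buf[:i].decode("utf-8")
            buf = buf[i + 1:]
    if buf:  # trailing line without a final newline
        yield buf.decode("utf-8")
```

Since this is itself a generator, it stays lazy: no bytes are read, and no lines are split, until a consumer pulls on the stream.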
A naïve transformation of the basic shape description to Scala weighs in at about 30 lines. A little more simplification results in:
The non-trivial part was getting grouping right. Since `grouped` works by transforming `Seq[A]` to `Iterator[Seq[A]]`, the type `Iterator[Seq[Iterator[Fgrep.Match]]]` needed to be flattened twice to get `Iterator[Fgrep.Match]`.
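The same double flatten can be illustrated in Python, where `grouped` below is a rough, hand-rolled analogue of Scala's method: batching a stream of streams produces a doubly nested structure, and the flatten has to be applied twice to get back to a flat stream of elements.

```python
from itertools import chain, islice
from typing import Iterable, Iterator

# Rough analogue of Scala's `grouped`: Iterator[A] -> Iterator[list[A]].
def grouped(it: Iterable, n: int) -> Iterator[list]:
    it = iter(it)
    while batch := list(islice(it, n)):
        yield batch

def double_flatten(batches):
    # Iterator[list[Iterator[A]]] -> Iterator[A]: the inner flatten removes
    # the batch level, the outer flatten removes the inner-stream level.
    return chain.from_iterable(chain.from_iterable(batches))
```

As in the Scala version, the types make the need for two flattens obvious: each `grouped`/batching step adds one layer of nesting on top of the layer the per-file streams already contribute.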