// Created August 24, 2018 12:59
// Experimenting with a batch/streams API for scalaz-analytics.
// Note that this is written in terms of plain functions for now. We will rewrite it with arrows, etc.,
// and work out the reified encoding once we agree on the API.
/**
 * At a high level, there are two core concepts here:
 * 1. Data
 * 2. Computation
 *
 * Data can be thought of as either bounded or unbounded.
 * This generally corresponds to a "batch" source or a "streaming" source, respectively.
 * The types of computation that can be done on each kind of data vary.
 * Specifically, you can only do bounded operations on bounded data, whereas on unbounded data you can do both
 * unbounded operations and bounded ones (by windowing over the unbounded data).
 * This is represented in the hierarchy below with
 * - Dataset[A] representing bounded data of type A
 * - DataStream[A] representing unbounded data of type A (potentially infinite)
 * There are also specialised forms of each of these that allow specific analytical computations to be expressed more naturally:
 * - GroupedDataset[A, B] representing a dataset with values of type A that have been grouped by a key of type B
 * - WindowedDataStream[A] representing a bounded view on unbounded data, used to do meaningful aggregations on streams
 */
/** A bounded (finite) data source whose elements have type `A`. */
type Dataset[A]

/** A bounded data source of `A` values that have been grouped by a key of type `K`. */
type GroupedDataset[A, K]

/** A *potentially* unbounded data source of `A` values. */
type DataStream[A]

/**
 * A fixed view over a stream of `A` values.
 *
 * Although it is bounded, this is semantically different from a [[Dataset]].
 */
type WindowedDataStream[A]
// TODO: DataStreamView? Emulate the functionality of a KTable? Probably out of scope for the initial build.
// TODO: work out the types and semantics of the various windows.
// Would love a combinator-style way to construct these, similar to how ZIO looks.
// Could a window be defined by an arbitrary combination of a set of conditions?
/**
 * Describes how a [[DataStream]] is cut into bounded views (see `window` / `toDataset`).
 *
 * Currently an empty placeholder with no concrete window kinds; the usage examples
 * instantiate it anonymously as `new Window {}`. Because it is `sealed`, every
 * implementation must be declared in this source file.
 */
sealed trait Window
// TODO: joins.
/**
 * Operations on bounded data ([[Dataset]] and [[GroupedDataset]]).
 *
 * Only bounded operations live here. To use them on unbounded data, first turn a
 * `DataStream` into a `Dataset` through a window.
 */
trait BoundedOperations {

  /** Transforms every element with `f`. */
  def map[A, B](ds: Dataset[A])(f: A => B): Dataset[B]

  /** Keeps only the elements for which `f` returns `true`. */
  def filter[A](ds: Dataset[A])(f: A => Boolean): Dataset[A]

  /** Maps every element to a dataset with `f` and flattens the results. */
  def flatMap[A, B](ds: Dataset[A])(f: A => Dataset[B]): Dataset[B]

  /**
   * Folds the elements with `f`, starting from `initial`.
   *
   * The result stays wrapped in a `Dataset[B]` (presumably holding a single value — TODO confirm).
   */
  def fold[A, B](ds: Dataset[A])(initial: B)(f: (B, A) => B): Dataset[B]

  /** Removes duplicate elements. */
  def distinct[A](ds: Dataset[A]): Dataset[A]

  /**
   * Sorts the dataset.
   *
   * NOTE(review): no `Ordering[A]` is requested, so the sort order is not determined by this
   * signature — consider taking an implicit `Ordering[A]`.
   */
  def sort[A](ds: Dataset[A]): Dataset[A]

  // TODO: should these all be variadic?

  /** Combines `ds1` with every dataset in `dsx`. */
  def union[A](ds1: Dataset[A], dsx: Dataset[A]*): Dataset[A]

  /** Keeps the elements of `ds1` that also occur in the datasets in `dsx`. */
  def intersect[A](ds1: Dataset[A], dsx: Dataset[A]*): Dataset[A]

  /** Removes from `ds1` the elements that occur in the datasets in `dsx` (exact multiset semantics TBD). */
  def diff[A](ds1: Dataset[A], dsx: Dataset[A]*): Dataset[A]

  // Aggregations

  /** Groups the elements by the key that `f` computes for each one. */
  def groupBy[A, B](ds: Dataset[A])(f: A => B): GroupedDataset[A, B]

  /**
   * Aggregates each group with `f`, starting from `zero`.
   *
   * NOTE(review): the result type drops the group key `B`; `Dataset[(B, C)]` may be what is wanted.
   * TODO: aggregate with many? Applicative fusion? What's the right type signature for this?
   *       How do we make it variadic and keep the types?
   */
  def aggregate[A, B, C](gds: GroupedDataset[A, B])(zero: C)(f: (C, A) => C): Dataset[C]

  /** Views this bounded dataset as a [[DataStream]]. */
  def toDataStream[A](ds: Dataset[A]): DataStream[A]

  /**
   * Materialises the whole dataset as a `List`.
   *
   * TODO: should this provide a `Stream[A]` to avoid potential OOM errors? Users could then chunk the results.
   */
  def collect[A](ds: Dataset[A]): IO[Error, List[A]]
}
// TODO: joins.
/**
 * Operations on potentially unbounded data ([[DataStream]] and [[WindowedDataStream]]).
 *
 * Element-wise operations apply directly to the stream. Aggregations over a bounded extent
 * first `window` the stream into a [[WindowedDataStream]].
 */
trait UnboundedOperations {

  /** Transforms every element with `f`. */
  def map[A, B](ds: DataStream[A])(f: A => B): DataStream[B]

  /** Keeps only the elements for which `f` returns `true`. */
  def filter[A](ds: DataStream[A])(f: A => Boolean): DataStream[A]

  /** Maps every element to a stream with `f` and flattens the results. */
  def flatMap[A, B](ds: DataStream[A])(f: A => DataStream[B]): DataStream[B]

  // Split and merge streams. TODO: should this be done by arrows?
  // TODO: not sure of the semantics of this.
  def union[A](ds1: DataStream[A], dsx: DataStream[A]*): DataStream[A]

  // TODO: think about "keyed streams". Flink uses them to run parallel computations on some key.
  // TODO: can we define this in terms of arrow ops instead? We already have a split there.
  // TODO: can we get rid of this by just letting users reuse the same `val`? If the streams never
  //       converge again, how do we know to "key by" rather than split them?
  def split[A, B](ds: DataStream[A])(f: A => B): List[DataStream[A]]

  // Streaming aggregations

  /** Emits the running accumulation of `f` over the stream, starting from `initial`. */
  def scanLeft[A, B](ds: DataStream[A])(initial: B)(f: (B, A) => B): DataStream[B]

  /** Running aggregation that combines elements pairwise with `f`. */
  def aggregation[A](ds: DataStream[A])(f: (A, A) => A): DataStream[A]

  // Windowed aggregations

  /**
   * Folds the elements of each window with `f`, starting from `initial`.
   *
   * The output carries the accumulator type `B`. Typing it as `DataStream[A]` would make the
   * fold result unrepresentable whenever `B` differs from `A`.
   */
  def fold[A, B](ds: WindowedDataStream[A])(initial: B)(f: (B, A) => B): DataStream[B]

  /** Reduces the elements of each window pairwise with `f`. */
  def reduce[A](ds: WindowedDataStream[A])(f: (A, A) => A): DataStream[A]

  /** Removes duplicate elements within each window. */
  def distinct[A](ds: WindowedDataStream[A]): DataStream[A]

  /** Applies `window` to the stream, producing a windowed view. */
  def window[A](ds: DataStream[A], window: Window): WindowedDataStream[A]

  // TODO: this can't be a moving window.
  def toDataset[A](ds: DataStream[A], window: Window): Dataset[A]

  /**
   * Runs the stream and exposes its elements as a `Stream[A]`.
   *
   * Takes a `DataStream`, not a `Dataset`. Bounded data is already covered by
   * `BoundedOperations.collect`, and a second `collect(Dataset[A])` with a different result
   * type would conflict with it in any object implementing both traits.
   */
  def collect[A](ds: DataStream[A]): IO[Error, Stream[A]]
}
// TODO: do we want to support Dataset <-> DataStream joins as well? Probably (the standard "stream enrichment" use case).
// Method-call syntax for Datasets, GroupedDatasets, DataStreams and WindowedDataStreams
/**
 * Method-call syntax for [[Dataset]], with the same signatures as the corresponding
 * `BoundedOperations` methods.
 *
 * Every method is still an unimplemented placeholder (`???`).
 */
implicit class DatasetSyntax[A](ds: Dataset[A]) {
  def map[B](f: A => B): Dataset[B] = ???
  def filter(f: A => Boolean): Dataset[A] = ???
  def flatMap[B](f: A => Dataset[B]): Dataset[B] = ???
  def fold[B](initial: B)(f: (B, A) => B): Dataset[B] = ???
  def distinct: Dataset[A] = ???
  // NOTE(review): no Ordering[A] is requested, so the sort order is not determined by this signature.
  def sort: Dataset[A] = ???
  def union(dsx: Dataset[A]*): Dataset[A] = ???
  def intersect(dsx: Dataset[A]*): Dataset[A] = ???
  def diff(dsx: Dataset[A]*): Dataset[A] = ???
  def groupBy[B](f: A => B): GroupedDataset[A, B] = ???
  def toDataStream: DataStream[A] = ???
  def collect: IO[Error, List[A]] = ???
}
/**
 * Method-call syntax for [[GroupedDataset]]; `aggregate` has the same shape as
 * `BoundedOperations.aggregate`.
 *
 * Still an unimplemented placeholder (`???`).
 */
implicit class GroupedDatasetSyntax[A, B](ds: GroupedDataset[A, B]) {
  def aggregate[C](zero: C)(f: (C, A) => C): Dataset[C] = ???
}
/**
 * Method-call syntax for [[DataStream]], mirroring the stream operations of
 * `UnboundedOperations`.
 *
 * Every method is still an unimplemented placeholder (`???`).
 */
implicit class DataStreamSyntax[A](ds: DataStream[A]) {
  def map[B](f: A => B): DataStream[B] = ???
  def filter(f: A => Boolean): DataStream[A] = ???
  def flatMap[B](f: A => DataStream[B]): DataStream[B] = ???
  def union(dsx: DataStream[A]*): DataStream[A] = ???
  def split[B](f: A => B): List[DataStream[A]] = ???
  def scanLeft[B](initial: B)(f: (B, A) => B): DataStream[B] = ???
  // No extra type parameter: the combiner works purely on A, so a `B` would only ever be inferred as Nothing.
  def aggregation(f: (A, A) => A): DataStream[A] = ???
  def window(window: Window): WindowedDataStream[A] = ???
  def toDataset(window: Window): Dataset[A] = ???
  def collect: IO[Error, Stream[A]] = ???
}
/**
 * Method-call syntax for [[WindowedDataStream]]: per-window aggregations that produce a
 * [[DataStream]] again.
 *
 * Every method is still an unimplemented placeholder (`???`).
 */
implicit class WindowedDataStreamSyntax[A](ds: WindowedDataStream[A]) {
  // The result carries the accumulator type B; DataStream[A] could not represent the fold result when B != A.
  def fold[B](initial: B)(f: (B, A) => B): DataStream[B] = ???
  // No extra type parameter: the combiner works purely on A.
  def reduce(f: (A, A) => A): DataStream[A] = ???
  def distinct: DataStream[A] = ???
}
// ---------- Usage examples ----------

val stream: DataStream[Int] = ???
val file: Dataset[Int] = ???

/**
 * End-to-end example that moves between every abstraction:
 *   1. window the stream and fold each window;
 *   2. bound it into a dataset, then map, group and aggregate it;
 *   3. turn the result back into a stream and window/reduce it again.
 *
 * NOTE(review): the original chain had lost its receiver (`stream`) and the `groupBy` /
 * `toDataStream` steps it needs to type-check (`aggregate` only exists on grouped datasets,
 * `window` only on streams). They are restored here; the grouping key `_._2` is a guess.
 */
val example: DataStream[List[String]] =
  stream
    .window(new Window {})
    .fold(0)(_ + _)
    .toDataset(new Window {})
    .map(i => (i, "abc", 7L))
    .groupBy(_._2)
    .aggregate(List.empty[String])((acc, curr) => s"${curr._1} + ${curr._2} + ${curr._3}" +: acc)
    .toDataStream
    .window(new Window {})
    .reduce(_ ++ _)
