Experimenting with a batch/streams API for scalaz-analytics
// Note that this is written in terms of functions for now. We will rewrite this with arrows etc.
// and work out the reified encoding once we agree on the API.

/**
 * At a high level, there are two core concepts here:
 *   1. Data
 *   2. Computation
 *
 * The Data can be thought of as Bounded or Unbounded.
 * This generally corresponds to a "batch" source or a "streaming" source.
 *
 * The types of computations that can be done on each of these kinds of data vary.
 * Specifically, you can only do bounded operations on bounded data; however, you can do both bounded
 * and unbounded operations on unbounded data by windowing over the unbounded data.
 *
 * This is represented in the hierarchy below with:
 *   - Dataset[A] representing bounded data of type A
 *   - DataStream[A] representing unbounded data of type A (potentially infinite)
 *
 * There are also specialised forms of each of these that allow specific analytical computations to be expressed more naturally:
 *   - GroupedDataset[A, B] representing a dataset with values of type A that have been grouped according to B
 *   - WindowedDataStream[A] representing a bounded view on unbounded data to do meaningful aggregations on streams
 */

// A bounded data source
type Dataset[_]

// A grouped (bounded) data source
type GroupedDataset[_, _]

// A *potentially* unbounded data source
type DataStream[_]

// A fixed view on a stream. This is semantically different from a Dataset
type WindowedDataStream[_]

// todo - DataStreamView? - Emulate the functionality of a KTable? - Probably out of scope for the initial build

// todo - work out the types and semantics of the various windows.
// Would love a combinator-styled way to construct these, similar to how ZIO looks.
// A window can be defined by an arbitrary combination of a set of conditions?
sealed trait Window
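
// Purely illustrative sketch (an assumption, not part of the proposal) of what combinator-styled
// Window construction could look like, using scala.concurrent.duration.Duration for sizes.
// The constructor names (`tumbling`, `sliding`, `session`, `count`) are hypothetical.
object Window {
  def tumbling(size: Duration): Window = ???
  def sliding(size: Duration, step: Duration): Window = ???
  def session(gap: Duration): Window = ???
  def count(n: Long): Window = ???
}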

// todo - Joins
trait BoundedOperations {

  def map[A, B](ds: Dataset[A])(f: A => B): Dataset[B]
  def filter[A](ds: Dataset[A])(f: A => Boolean): Dataset[A]
  def flatMap[A, B](ds: Dataset[A])(f: A => Dataset[B]): Dataset[B]
  def fold[A, B](ds: Dataset[A])(initial: B)(f: (B, A) => B): Dataset[B]
  def distinct[A](ds: Dataset[A]): Dataset[A]
  def sort[A](ds: Dataset[A]): Dataset[A]

  // todo - should these all be variadic?
  def union[A](ds1: Dataset[A], dsx: Dataset[A]*): Dataset[A]
  def intersect[A](ds1: Dataset[A], dsx: Dataset[A]*): Dataset[A]
  def diff[A](ds1: Dataset[A], dsx: Dataset[A]*): Dataset[A]

  // Aggregations
  def groupBy[A, B](ds: Dataset[A])(f: A => B): GroupedDataset[A, B]

  // todo - aggregate with many? Applicative fusion? What's the right type signature for this? How do we make it variadic and maintain types?
  def aggregate[A, B, C](gds: GroupedDataset[A, B])(zero: C)(f: (C, A) => C): Dataset[C]
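
  // Purely illustrative (hypothetical) shape for fusing two aggregations over a single pass of the
  // grouped data; the variadic, type-preserving version is still the open question in the todo above:
  // def aggregateBoth[A, B, C, D](gds: GroupedDataset[A, B])(zeroC: C, zeroD: D)(
  //   fc: (C, A) => C, fd: (D, A) => D): Dataset[(C, D)]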

  def toDataStream[A](ds: Dataset[A]): DataStream[A]

  // todo - should this provide a Stream[A] to avoid potential OOM errors? We can then let users chunk the results etc.
  def collect[A](ds: Dataset[A]): IO[Error, List[A]]
}

// todo - Joins
trait UnboundedOperations {

  def map[A, B](ds: DataStream[A])(f: A => B): DataStream[B]
  def filter[A](ds: DataStream[A])(f: A => Boolean): DataStream[A]
  def flatMap[A, B](ds: DataStream[A])(f: A => DataStream[B]): DataStream[B]

  // Split and merge streams - todo should this be done by arrows?
  // todo - not sure of the semantics of this
  def union[A](ds1: DataStream[A], dsx: DataStream[A]*): DataStream[A]

  // todo - think about "keyed streams". Flink uses them to run parallel computations on some key.
  // todo - can we define this in terms of arrow ops instead? We already have a split there.
  // todo - can we get rid of this by just letting users reuse the same `val`? If they never converge again, how do we know to "key by" rather than split the streams?
  def split[A, B](ds: DataStream[A])(f: A => B): List[DataStream[A]]

  // Streaming aggregations
  def scanLeft[A, B](ds: DataStream[A])(initial: B)(f: (B, A) => B): DataStream[B]
  def aggregation[A](ds: DataStream[A])(f: (A, A) => A): DataStream[A]

  // Windowed aggregations
  def fold[A, B](ds: WindowedDataStream[A])(initial: B)(f: (B, A) => B): DataStream[B]
  def reduce[A](ds: WindowedDataStream[A])(f: (A, A) => A): DataStream[A]
  def distinct[A](ds: WindowedDataStream[A]): DataStream[A]
  def window[A](ds: DataStream[A], window: Window): WindowedDataStream[A]

  // todo - this can't be a moving window
  def toDataset[A](ds: DataStream[A], window: Window): Dataset[A]
  def collect[A](ds: DataStream[A]): IO[Error, Stream[A]]
}

// todo - do we want to support Dataset <-> DataStream joins as well? Probably (standard "stream enrichment" use case)

// Syntax for DataStreams/Datasets
implicit class DatasetSyntax[A](ds: Dataset[A]) {
  def map[B](f: A => B): Dataset[B] = ???
  def filter(f: A => Boolean): Dataset[A] = ???
  def flatMap[B](f: A => Dataset[B]): Dataset[B] = ???
  def fold[B](initial: B)(f: (B, A) => B): Dataset[B] = ???
  def distinct: Dataset[A] = ???
  def sort: Dataset[A] = ???
  def union(dsx: Dataset[A]*): Dataset[A] = ???
  def intersect(dsx: Dataset[A]*): Dataset[A] = ???
  def diff(dsx: Dataset[A]*): Dataset[A] = ???
  def groupBy[B](f: A => B): GroupedDataset[A, B] = ???
  def toDataStream: DataStream[A] = ???
  def collect: IO[Error, List[A]] = ???
}

implicit class GroupedDatasetSyntax[A, B](ds: GroupedDataset[A, B]) {
  def aggregate[C](zero: C)(f: (C, A) => C): Dataset[C] = ???
}

implicit class DataStreamSyntax[A](ds: DataStream[A]) {
  def map[B](f: A => B): DataStream[B] = ???
  def filter(f: A => Boolean): DataStream[A] = ???
  def flatMap[B](f: A => DataStream[B]): DataStream[B] = ???
  def union(dsx: DataStream[A]*): DataStream[A] = ???
  def split[B](f: A => B): List[DataStream[A]] = ???
  def scanLeft[B](initial: B)(f: (B, A) => B): DataStream[B] = ???
  def aggregation(f: (A, A) => A): DataStream[A] = ???
  def window(window: Window): WindowedDataStream[A] = ???
  def toDataset(window: Window): Dataset[A] = ???
  def collect: IO[Error, Stream[A]] = ???
}

implicit class WindowedDataStreamSyntax[A](ds: WindowedDataStream[A]) {
  def fold[B](initial: B)(f: (B, A) => B): DataStream[B] = ???
  def reduce(f: (A, A) => A): DataStream[A] = ???
  def distinct: DataStream[A] = ???
}

// ---------- Usage examples ----------

val stream: DataStream[Int] = ???
val file: Dataset[Int] = ???

stream
  .window(new Window {})
  .fold(0)(_ + _)
  .toDataset(new Window {})
  .collect

file
  .map(i => (i, "abc", 7L))
  .groupBy(_._2)
  .aggregate(List.empty[String])((acc, curr) => s"${curr._1} + ${curr._2} + ${curr._3}" +: acc)
  .toDataStream
  .window(new Window {})
  .reduce(_ ++ _)
  .collect
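
// Another hypothetical example (not from the original sketch) using the streaming syntax above:
// a running sum over the positive elements of `stream`, assuming the implicit syntax classes are in scope.
stream
  .filter(_ > 0)
  .scanLeft(0)(_ + _)
  .collect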