Created
July 20, 2016 16:57
-
-
Save johnynek/9a8a6fa75d22a965a501568307a2b9fc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.stripe.scorching | |
| import com.twitter.scalding.serialization.{OrderedSerialization, Serialization} | |
/**
 * A description of a dataset whose elements have type `T`.
 *
 * Every method here only allocates a node (one of the case classes in the
 * companion object) recording the requested step and its inputs; nothing in
 * this trait runs the step.
 *
 * @tparam T element type (covariant)
 */
sealed trait Collection[+T] {

  /**
   * Describes expanding each element into zero or more `U`s.
   *
   * @param f function producing the replacement elements for one input
   */
  def concatMap[U](f: T => TraversableOnce[U]): Collection[U] =
    Collection.ConcatMap(this, f)

  /**
   * Views this collection of pairs as a [[Grouping]] keyed by `K`.
   *
   * @param ord  ordered serialization for the key type
   * @param vser serialization for the value type
   * @param ev   evidence that every element is a `(K, V)` pair
   */
  def group[K, V](implicit ord: OrderedSerialization[K],
                  vser: Serialization[V],
                  ev: T <:< (K, V)): Grouping[K, V] = {
    // `ev` witnesses T <: (K, V) and Collection is covariant in T, so this
    // cast only re-labels the static type; `ev` itself is never applied.
    val pairs = this.asInstanceOf[Collection[(K, V)]]
    Collection.GroupProof[K, V](pairs, ord, vser)
  }

  /**
   * Describes transforming each element with `f`.
   *
   * @param f function applied to one element
   */
  def map[U](f: T => U): Collection[U] =
    Collection.Map(this, f)

  /**
   * Describes writing this collection to `sink`.
   *
   * @param sink destination accepting elements of type `T`
   * @return an [[Action]] carrying no result value
   */
  def write(sink: Sink[T]): Action[Unit] =
    Collection.WriteAction(this, sink)
}
/**
 * A description of `(K, V)` data organised by key.
 *
 * As with [[Collection]], the methods only build nodes (case classes in
 * `object Collection`) describing the step; they do not run it.
 *
 * @tparam K key type (invariant)
 * @tparam V value type (covariant)
 */
sealed trait Grouping[K, +V] {

  /**
   * Describes rewriting the values of each key.
   *
   * @param f receives a key and an iterator over its values and returns the
   *          iterator of replacement values for that key
   */
  def mapGroup[U](f: (K, Iterator[V]) => Iterator[U]): Grouping[K, U] =
    Collection.MapGroup(this, f)

  /**
   * Describes combining this grouping with another one on the same key type.
   *
   * @param that the other grouping
   * @param f    receives a key, an `Iterator` over this side's values and an
   *             `Iterable` over `that` side's values, and returns the output
   *             values for the key
   */
  def cogroup[U, R](that: Grouping[K, U])(
      f: (K, Iterator[V], Iterable[U]) => Iterator[R]): Grouping[K, R] =
    Collection.CoGroup(this, that, f)

  /** Describes viewing this grouping as a plain collection of key/value pairs. */
  def ungroup: Collection[(K, V)] =
    Collection.Ungrouped(this)
}
/**
 * Marker for a destination that accepts elements of type `T`; it is the
 * argument of `Collection.write`. No members are declared here.
 *
 * @tparam T accepted element type (contravariant)
 */
trait Sink[-T]
/**
 * Marker for an origin of elements of type `T`; it is the argument of
 * `Collection.read`. No members are declared here.
 *
 * @tparam T produced element type (covariant)
 */
trait Source[+T]
/**
 * A description of a step that yields a `T` (for example the `Action[Unit]`
 * returned by `Collection.write`).
 *
 * The combinators only build nodes (case classes in `object Collection`)
 * describing how actions are composed; they do not run anything.
 *
 * @tparam T result type (covariant)
 */
sealed trait Action[+T] {

  /**
   * Describes transforming this action's result with `f`.
   *
   * @param f function applied to the result
   */
  def map[U](f: T => U): Action[U] =
    Collection.MapAction(this, f)

  /**
   * Describes pairing this action's result with the result of `that`.
   *
   * @param that the other action
   */
  def zip[U](that: Action[U]): Action[(T, U)] =
    Collection.ZipAction(this, that)

  /**
   * Describes choosing a follow-up action from this action's result.
   *
   * @param next function from this action's result to the next action
   */
  def flatMap[U](next: T => Action[U]): Action[U] =
    Collection.FlatMapAction(this, next)
}
/**
 * Entry point for building a [[Collection]] and home of the node types that
 * make up the `Collection`, `Grouping` and `Action` descriptions.
 *
 * The node classes are plain data: each one records the inputs of a single
 * step. They are `final` so the sealed hierarchies stay closed and case-class
 * equality/pattern matching cannot be altered by subclassing.
 */
object Collection {

  /**
   * Describes reading the elements provided by `source`.
   *
   * @param source origin of the elements
   */
  def read[T](source: Source[T]): Collection[T] = Read(source)

  /**
   * Extension methods that need `T` at an exact (non-variant) type.
   *
   * NOTE(review): presumably these live outside the covariant trait because
   * `Serialization[T]` is invariant in `T` — confirm against scalding.
   */
  implicit class InvariantMethods[T](s: Collection[T]) {

    /**
     * Builds a `Forced` node wrapping the collection together with `ser`.
     *
     * @param ser serialization for the element type
     */
    def forceToDisk(implicit ser: Serialization[T]): Collection[T] = Forced(s, ser)
  }

  // ---- Collection nodes ----------------------------------------------------

  /** Node for `Collection.read`. */
  private[scorching] final case class Read[T](source: Source[T]) extends Collection[T]

  /**
   * Node for `Collection#map`.
   *
   * Inside this object, and wherever `Collection._` is imported, the name
   * `Map` refers to this class rather than the standard library's `Map`.
   */
  private[scorching] final case class Map[A, B](s: Collection[A], f: A => B) extends Collection[B]

  /** Node for `Collection#concatMap`. */
  private[scorching] final case class ConcatMap[A, B](s: Collection[A], f: A => TraversableOnce[B])
      extends Collection[B]

  /** Node for `InvariantMethods#forceToDisk`. */
  private[scorching] final case class Forced[T](s: Collection[T], ser: Serialization[T])
      extends Collection[T]

  /** Node for `Grouping#ungroup`. */
  private[scorching] final case class Ungrouped[K, V](g: Grouping[K, V]) extends Collection[(K, V)]

  // ---- Grouping nodes ------------------------------------------------------

  /** Node for `Collection#group`: the pairs plus the key and value serializations. */
  private[scorching] final case class GroupProof[K, V](c: Collection[(K, V)],
                                                       k: OrderedSerialization[K],
                                                       v: Serialization[V]) extends Grouping[K, V]

  /** Node for `Grouping#mapGroup`. */
  private[scorching] final case class MapGroup[K, V, W](g: Grouping[K, V],
                                                        f: (K, Iterator[V]) => Iterator[W])
      extends Grouping[K, W]

  /** Node for `Grouping#cogroup`. */
  private[scorching] final case class CoGroup[K, V, W, R](left: Grouping[K, V],
                                                          right: Grouping[K, W],
                                                          fn: (K, Iterator[V], Iterable[W]) => Iterator[R])
      extends Grouping[K, R]

  // ---- Action nodes --------------------------------------------------------

  /** Node for `Action#map`. */
  private[scorching] final case class MapAction[A, B](first: Action[A], next: A => B)
      extends Action[B]

  /** Node for `Action#zip`. */
  private[scorching] final case class ZipAction[A, B](left: Action[A], right: Action[B])
      extends Action[(A, B)]

  /** Node for `Action#flatMap`. */
  private[scorching] final case class FlatMapAction[A, B](first: Action[A], next: A => Action[B])
      extends Action[B]

  /** Node for `Collection#write`. */
  private[scorching] final case class WriteAction[T](c: Collection[T], sink: Sink[T])
      extends Action[Unit]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment