Skip to content

Instantly share code, notes, and snippets.

@johnynek
Created July 20, 2016 16:57
Show Gist options
  • Select an option

  • Save johnynek/9a8a6fa75d22a965a501568307a2b9fc to your computer and use it in GitHub Desktop.

Select an option

Save johnynek/9a8a6fa75d22a965a501568307a2b9fc to your computer and use it in GitHub Desktop.
package com.stripe.scorching
import com.twitter.scalding.serialization.{OrderedSerialization, Serialization}
/**
 * A node describing a collection of `T` values.
 *
 * Each method below only constructs a new node (defined in the companion
 * object) that wraps `this`; no data is processed by the code in this trait.
 *
 * @tparam T element type; covariant
 */
sealed trait Collection[+T] {
  import Collection._

  /** Records a one-to-many transformation of each element as a `ConcatMap` node. */
  def concatMap[U](f: T => TraversableOnce[U]): Collection[U] =
    ConcatMap[T, U](this, f)

  /**
   * Treats this collection as key/value pairs and wraps it in a `GroupProof`
   * together with the serializers for the key and value types.
   *
   * @param ord  ordered serialization for the key type `K`
   * @param vser serialization for the value type `V`
   * @param ev   evidence that `T` is a subtype of `(K, V)`
   */
  def group[K, V](implicit
      ord: OrderedSerialization[K],
      vser: Serialization[V],
      ev: T <:< (K, V)): Grouping[K, V] = {
    // `ev` is never applied; it only justifies the cast. Because Collection is
    // covariant in T and T <: (K, V), this is an upcast and cannot fail.
    val pairs = this.asInstanceOf[Collection[(K, V)]]
    GroupProof[K, V](pairs, ord, vser)
  }

  /** Records a one-to-one transformation of each element as a `Map` node. */
  def map[U](f: T => U): Collection[U] =
    // `Map` here is Collection.Map (brought in by the import above), not scala's Map.
    Map[T, U](this, f)

  /** Pairs this collection with `sink` in a `WriteAction`. */
  def write(sink: Sink[T]): Action[Unit] =
    WriteAction[T](this, sink)
}
/**
 * A node describing values of type `V` grouped under keys of type `K`.
 *
 * As with [[Collection]], the methods only build wrapper nodes around `this`.
 *
 * @tparam K key type; invariant
 * @tparam V value type; covariant
 */
sealed trait Grouping[K, +V] {
  import Collection._

  /**
   * Records a per-key transformation as a `MapGroup` node.
   *
   * @param f receives a key and an iterator over that key's values and
   *          returns an iterator of new values
   */
  def mapGroup[U](f: (K, Iterator[V]) => Iterator[U]): Grouping[K, U] =
    MapGroup[K, V, U](g = this, f = f)

  /**
   * Records a co-grouping of this grouping with `that` as a `CoGroup` node,
   * with `this` as the left side and `that` as the right side.
   *
   * @param that the other grouping, sharing the key type `K`
   * @param f    receives a key, an `Iterator` over the left values and an
   *             `Iterable` over the right values, and returns the results
   */
  def cogroup[U, R](that: Grouping[K, U])(
      f: (K, Iterator[V], Iterable[U]) => Iterator[R]): Grouping[K, R] =
    CoGroup[K, V, U, R](left = this, right = that, fn = f)

  /** Wraps this grouping in an `Ungrouped` node, a collection of key/value pairs. */
  def ungroup: Collection[(K, V)] =
    Ungrouped[K, V](this)
}
/**
 * Marker trait with no members, contravariant in `T`.
 *
 * Consumed by `Collection#write`, which pairs a collection with a sink in a
 * `WriteAction`. Contravariance lets a `Sink[A]` be used wherever a `Sink[B]`
 * is required for any `B <: A`.
 */
trait Sink[-T]
/**
 * Marker trait with no members, covariant in `T`.
 *
 * Passed to `Collection.read`, which wraps it in a `Read` node.
 */
trait Source[+T]
/**
 * A node describing a step that yields a `T`.
 *
 * The combinators below only build wrapper nodes (defined in the
 * [[Collection]] companion object); nothing is run by the code in this trait.
 *
 * @tparam T result type; covariant
 */
sealed trait Action[+T] {
  import Collection._

  /** Records a transformation of this action's result as a `MapAction` node. */
  def map[U](f: T => U): Action[U] =
    MapAction[T, U](this, f)

  /** Pairs this action with `that` in a `ZipAction` node whose result is a tuple. */
  def zip[U](that: Action[U]): Action[(T, U)] =
    ZipAction[T, U](this, that)

  /**
   * Records a dependent follow-up as a `FlatMapAction` node.
   *
   * @param next builds the following action from this action's result
   */
  def flatMap[U](next: T => Action[U]): Action[U] =
    FlatMapAction[T, U](this, next)
}
/**
 * Entry points and the concrete node types behind [[Collection]],
 * [[Grouping]] and [[Action]].
 *
 * All node classes are `private[scorching]`.
 */
object Collection {

  /** Wraps `source` in a `Read` node. */
  def read[T](source: Source[T]): Collection[T] =
    Read[T](source)

  /**
   * Extension methods that are not members of the covariant `Collection[+T]`.
   *
   * NOTE(review): presumably these live here because `Serialization[T]` is
   * invariant in `T` and so cannot appear in a member signature of a
   * covariant trait; confirm against the scalding definition.
   */
  implicit class InvariantMethods[T](s: Collection[T]) {

    /** Wraps the collection and its serialization in a `Forced` node. */
    def forceToDisk(implicit ser: Serialization[T]): Collection[T] =
      Forced[T](s, ser)
  }

  // ---- Collection nodes ----------------------------------------------------

  /** Node built by `Collection.read`. */
  private[scorching] case class Read[T](
      source: Source[T]
  ) extends Collection[T]

  /**
   * Node built by `Collection#map`.
   *
   * Inside this object, and wherever `Collection._` is imported, the name
   * `Map` refers to this class rather than to Scala's `Map`.
   */
  private[scorching] case class Map[A, B](
      s: Collection[A],
      f: A => B
  ) extends Collection[B]

  /** Node built by `Collection#concatMap`. */
  private[scorching] case class ConcatMap[A, B](
      s: Collection[A],
      f: A => TraversableOnce[B]
  ) extends Collection[B]

  /** Node built by `InvariantMethods#forceToDisk`. */
  private[scorching] case class Forced[T](
      s: Collection[T],
      ser: Serialization[T]
  ) extends Collection[T]

  /** Node built by `Grouping#ungroup`. */
  private[scorching] case class Ungrouped[K, V](
      g: Grouping[K, V]
  ) extends Collection[(K, V)]

  // ---- Grouping nodes ------------------------------------------------------

  /** Node built by `Collection#group`; carries the key and value serializers. */
  private[scorching] case class GroupProof[K, V](
      c: Collection[(K, V)],
      k: OrderedSerialization[K],
      v: Serialization[V]
  ) extends Grouping[K, V]

  /** Node built by `Grouping#mapGroup`. */
  private[scorching] case class MapGroup[K, V, W](
      g: Grouping[K, V],
      f: (K, Iterator[V]) => Iterator[W]
  ) extends Grouping[K, W]

  /** Node built by `Grouping#cogroup`. */
  private[scorching] case class CoGroup[K, V, W, R](
      left: Grouping[K, V],
      right: Grouping[K, W],
      fn: (K, Iterator[V], Iterable[W]) => Iterator[R]
  ) extends Grouping[K, R]

  // ---- Action nodes --------------------------------------------------------

  /** Node built by `Action#map`. */
  private[scorching] case class MapAction[A, B](
      first: Action[A],
      next: A => B
  ) extends Action[B]

  /** Node built by `Action#zip`. */
  private[scorching] case class ZipAction[A, B](
      left: Action[A],
      right: Action[B]
  ) extends Action[(A, B)]

  /** Node built by `Action#flatMap`. */
  private[scorching] case class FlatMapAction[A, B](
      first: Action[A],
      next: A => Action[B]
  ) extends Action[B]

  /** Node built by `Collection#write`. */
  private[scorching] case class WriteAction[T](
      c: Collection[T],
      sink: Sink[T]
  ) extends Action[Unit]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment