Skip to content

Instantly share code, notes, and snippets.

@prateek
Forked from johnynek/MapReduceTypeClasses.scala
Last active August 29, 2015 14:08
Show Gist options
  • Save prateek/751390ff95d3de0f35c6 to your computer and use it in GitHub Desktop.
Save prateek/751390ff95d3de0f35c6 to your computer and use it in GitHub Desktop.
package mapreduce
/**
* This is an attempt to find a minimal set of type classes that describe the map-reduce programming model
* (the underlying model of Google map/reduce, Hadoop, Spark and others)
* The idea is to have:
* 1) lawful types that fully constrain correctness
* 2) a minimal set of laws (i.e. we can't remove any laws),
* 3) able to fully express existing map/reduce in terms of these types
*
* This is just a draft
*/
/**
* Type class for a collection-like type `F` whose elements have type `T`:
* an `F` can be built from an `Iterable[T]`, filtered by a predicate and
* concatenated with another `F`.
*
* law 0: filter(from(l))(p) == from(l.filter(p))
* law 1: concat(from(l1), from(l2)) == from(l1 ++ l2)
* law 2: concat(filter(f1)(p), filter(f2)(p)) == filter(concat(f1, f2))(p)
*/
trait Filterable[F, T] {
// Builds an F from the elements of `iter`.
def from(iter: Iterable[T]): F
// Filters `f` with `pred`; by law 0 this must agree with `Iterable.filter` on the source elements.
def filter(f: F)(pred: T => Boolean): F
// Combines `a` and `b`; by law 1 this must agree with `++` on the source Iterables.
def concat(a: F, b: F): F
}
/**
* Type class for a covariant type constructor `M` that supports `concatMap`
* (a flatMap whose function returns an `Iterable`) and that is also
* `Filterable` at every element type.
*
* law 0: concatMap(filterable.from(it))(fn) == filterable.from(it.flatMap(fn))
* law 1: concatMap(concatMap(m)(fn1))(fn2) == concatMap(m)(fn1(_).flatMap(fn2))
*/
trait ConcatMappable[M[+_]] {
// Applies `fn` to each element of `m` and collects the produced values into an M[U];
// by law 0 this must agree with `Iterable.flatMap` on the source elements.
def concatMap[T, U](m: M[T])(fn: T => Iterable[U]): M[U]
// must also be filterable: supplies the Filterable instance for M[T] with element type T
def filterable[T]: Filterable[M[T], T]
}
/**
* Type class for a keyed type constructor `R[K, V]` (covariant in `V`) whose
* values can be transformed one key group at a time.
*
* law 0: mapGroup(filterable.from(kvs))(fn) ==
* filterable.from(kvs.groupBy(_._1).mapValues { kvs =>
* val k = kvs.head._1
* fn(k, kvs.map(_._2))
* })
* law 1: mapGroup(mapGroup(r)(fn1))(fn2) == mapGroup(r) { (k, vs) => fn2(k, fn1(k, vs)) }
*
* NOTE(review): as written, the right-hand side of law 0 produces
* (K, Iterable[W]) pairs, while `filterable[K, W].from` takes (K, W) pairs.
* Presumably each group's results are meant to be flattened back into
* (k, w) pairs. TODO confirm.
*/
trait KeyedReducer[R[_, +_]] {
// Calls `fn` with a key and the values grouped under it; the returned values stay under that key in the result.
def mapGroup[K, V, W](r: R[K, V])(fn: (K, Iterable[V]) => Iterable[W]): R[K, W]
// Supplies the Filterable instance that treats R[K, V] as a collection of (K, V) pairs.
def filterable[K, V]: Filterable[R[K, V], (K, V)]
}
/**
* Type class that converts an `M` of key/value pairs into a keyed `R[K, V]`.
* No laws are stated for it on its own in this draft; law 1 of `Executor`
* refers to `group`.
*
* NOTE(review): `M` and `R` are declared here without variance annotations
* (`M[_]`, `R[_, _]`), whereas `ConcatMappable` uses `M[+_]` and
* `KeyedReducer` uses `R[_, +_]`. Confirm whether the difference is intentional.
*/
trait Grouper[M[_], R[_, _]] {
// Groups the pairs in `m` by key; an Ordering[K] must be available for the key type.
def group[K: Ordering, V](m: M[(K, V)]): R[K, V]
}
/**
* Minimal monad type class for a covariant type constructor `M`.
* Usual monad laws.
*/
trait Monad[M[+_]] {
// Lifts the plain value `t` into M.
def apply[T](t: T): M[T]
// Sequences `m` with `fn`, which produces the next M from the value in `m`.
def flatMap[T, U](m: M[T])(fn: T => M[U]): M[U]
}
/**
* Type class for reading and writing named sources/sinks `S`, with every
* operation returning its result inside the monad `X`.
*
* law 0: write(m, name).flatMap(read(_, name)) == monad.apply(m)
* law 1: write(group(m), name).flatMap(read(_, name)) == monad.apply(m)
*
* NOTE(review): `X` itself declares no `flatMap` method, so `x.flatMap(f)` in
* the laws is presumably shorthand for `monad.flatMap(x)(f)`. TODO confirm.
*/
trait Executor[X[+_], S[_], N] {
// The Monad instance for X, used to chain reads and writes.
def monad: Monad[X]
// Reads `input` under `name` into an M[T]; M must have a ConcatMappable instance.
def read[M[+_]: ConcatMappable, T](input: S[T], name: N): X[M[T]]
// Writes the ConcatMappable collection `m` under `name`, yielding an S[T].
def write[M[+_]: ConcatMappable, T](m: M[T], name: N): X[S[T]]
// Overload for keyed data: writes `r` under `name`, yielding an S of (K, V) pairs.
def write[R[_, +_]: KeyedReducer, K, V](r: R[K, V], name: N): X[S[(K, V)]]
}
/**
* Bundles the abstract types of one map/reduce implementation together with
* the type-class instances defined over them.
*/
trait Engine {
// Type for the mapper operation
type M[+_]
// Type for the reducer operation
type R[_, +_]
// Type for the monad that executes reads and writes
type X[+_]
// The source/sink type
type S[_]
// the name type for sources and sinks
type N
// ConcatMappable instance for the mapper type M
def concatMappable: ConcatMappable[M]
// KeyedReducer instance for the reducer type R
def keyedReducer: KeyedReducer[R]
// Grouper that turns an M of key/value pairs into an R
def grouper: Grouper[M, R]
// Executor that reads and writes S values identified by N, inside X
def executor: Executor[X, S, N]
}
/**
* Example in scalding
*
trait ScaldingEngine extends Engine {
type M[T] = TypedPipe[T]
type R[K, V] = KeyedList[K, V]
type X[T] = Execution[T]
type S[T] = Source[T] with Sink[T]
type N = Path
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment