// Source: GitHub gist prateek/751390ff95d3de0f35c6
package mapreduce
/**
 * This is an attempt to find a minimal set of type classes that describe the map-reduce programming model
 * (the underlying model of Google MapReduce, Hadoop, Spark and others).
 * The idea is to have:
 * 1) lawful types that fully constrain correctness,
 * 2) a minimal set of laws (i.e. we can't remove any laws), and
 * 3) enough expressiveness to fully describe existing map/reduce systems in terms of these types.
 *
 * This is just a draft.
 */
/**
 * A collection-like type `F` with elements of type `T` that can be built from an
 * in-memory iterable, filtered element-wise, and concatenated.
 *
 * Laws (for all `l`, `l1`, `l2: Iterable[T]`, `f1`, `f2: F`, `p: T => Boolean`):
 *  - law 0: `filter(from(l))(p) == from(l.filter(p))`
 *  - law 1: `concat(from(l1), from(l2)) == from(l1 ++ l2)`
 *  - law 2: `concat(filter(f1)(p), filter(f2)(p)) == filter(concat(f1, f2))(p)`
 *
 * @tparam F the collection type (e.g. a distributed dataset)
 * @tparam T the element type
 */
trait Filterable[F, T] {
  /** Lifts an in-memory iterable into `F`. */
  def from(iter: Iterable[T]): F

  /** Keeps only the elements of `f` for which `pred` returns true. */
  def filter(f: F)(pred: T => Boolean): F

  /** Combines the elements of `a` and `b` into a single `F`. */
  def concat(a: F, b: F): F
}
/**
 * The "map" side of map-reduce: a type constructor `M` that supports a
 * one-to-many transformation of its elements.
 *
 * Laws (for all `it: Iterable[T]`, `m: M[T]`):
 *  - law 0: `concatMap(filterable.from(it))(fn) == filterable.from(it.flatMap(fn))`
 *  - law 1: `concatMap(concatMap(m)(fn1))(fn2) == concatMap(m)(t => fn1(t).flatMap(fn2))`
 */
trait ConcatMappable[M[+_]] {
  /** Replaces each element `t` of `m` with all elements of `fn(t)`. */
  def concatMap[T, U](m: M[T])(fn: T => Iterable[U]): M[U]

  /** Every `M[T]` must also be [[Filterable]] with element type `T`. */
  def filterable[T]: Filterable[M[T], T]
}
/**
 * The "reduce" side of map-reduce: a keyed collection `R[K, V]` whose values
 * are processed one key-group at a time.
 *
 * Laws (for all `kvs: Iterable[(K, V)]`, `r: R[K, V]`):
 *  - law 0: mapping over groups agrees with the in-memory equivalent:
 *    {{{
 *    mapGroup(filterable.from(kvs))(fn) ==
 *      filterable.from(kvs.groupBy(_._1).toList.flatMap { case (k, group) =>
 *        fn(k, group.map(_._2)).map(w => (k, w))
 *      })
 *    }}}
 *    `groupBy` does not fix the order of the groups, so this can only hold up
 *    to the order in which key-groups appear.
 *  - law 1: `mapGroup(mapGroup(r)(fn1))(fn2) == mapGroup(r) { (k, vs) => fn2(k, fn1(k, vs)) }`
 */
trait KeyedReducer[R[_, +_]] {
  /**
   * Applies `fn` to each key together with all of that key's values.
   * Every output value stays associated with the key it was produced from.
   */
  def mapGroup[K, V, W](r: R[K, V])(fn: (K, Iterable[V]) => Iterable[W]): R[K, W]

  /** Every `R[K, V]` must also be [[Filterable]] over its `(K, V)` pairs. */
  def filterable[K, V]: Filterable[R[K, V], (K, V)]
}
/**
 * Converts a mapper-side collection of key/value pairs into a reducer-side
 * keyed collection. This plays the role of the group-by-key step between
 * map and reduce.
 *
 * @tparam M the mapper-side collection type
 * @tparam R the reducer-side keyed collection type
 */
trait Grouper[M[_], R[_, _]] {
  /**
   * Groups the pairs in `m` by key.
   * An `Ordering[K]` must be available; presumably this lets engines sort keys
   * while grouping (not enforced by this signature).
   */
  def group[K: Ordering, V](m: M[(K, V)]): R[K, V]
}
/**
 * A monad over the covariant type constructor `M`.
 *
 * Laws (the usual monad laws, written with this trait's methods):
 *  - left identity:  `flatMap(apply(t))(fn) == fn(t)`
 *  - right identity: `flatMap(m)(t => apply(t)) == m`
 *  - associativity:  `flatMap(flatMap(m)(f))(g) == flatMap(m)(t => flatMap(f(t))(g))`
 */
trait Monad[M[+_]] {
  /** Lifts a plain value into `M` (often called `pure` or `unit`). */
  def apply[T](t: T): M[T]

  /** Sequences `m` with `fn`, passing each value produced by `m` to `fn`. */
  def flatMap[T, U](m: M[T])(fn: T => M[U]): M[U]
}
/**
 * Performs reads and writes against sources/sinks `S`, addressed by names of
 * type `N`, inside the effect monad `X`.
 *
 * Laws: writing under `name` and then reading back from it is a round-trip.
 * `X` is abstract and has no methods of its own, so sequencing is spelled
 * with `monad.flatMap`:
 *  - law 0 (for `m: M[T]`):
 *    `monad.flatMap(write(m, name))(s => read[M, T](s, name)) == monad.apply(m)`
 *  - law 1 (for `m: M[(K, V)]` and a `Grouper[M, R]` `g`):
 *    `monad.flatMap(write(g.group(m), name))(s => read[M, (K, V)](s, name)) == monad.apply(m)`.
 *    Grouping is not required to preserve element order, so this presumably
 *    holds only up to ordering.
 *
 * NOTE(review): the two `write` overloads differ only in their context bounds.
 * If partial unification lets an `R[K, V]` argument also match `M[T]`,
 * overload resolution may become ambiguous — TODO confirm at call sites.
 */
trait Executor[X[+_], S[_], N] {
  /** The monad used to sequence reads and writes. */
  def monad: Monad[X]

  /** Reads `input`, identified by `name`, as a mapper-side collection `M[T]`. */
  def read[M[+_]: ConcatMappable, T](input: S[T], name: N): X[M[T]]

  /** Writes a mapper-side collection under `name`, yielding the resulting source/sink. */
  def write[M[+_]: ConcatMappable, T](m: M[T], name: N): X[S[T]]

  /** Writes a reducer-side keyed collection under `name` as `(K, V)` pairs. */
  def write[R[_, +_]: KeyedReducer, K, V](r: R[K, V], name: N): X[S[(K, V)]]
}
/**
 * Bundles the type constructors and type-class instances that together
 * describe one concrete map-reduce engine.
 */
trait Engine {
  /** Type for the mapper operation. */
  type M[+_]
  /** Type for the reducer operation. */
  type R[_, +_]
  /** Type for the monad that executes reads and writes. */
  type X[+_]
  /** The source/sink type. */
  type S[_]
  /** The name type for sources and sinks. */
  type N

  /** Map-side operations for `M`. */
  def concatMappable: ConcatMappable[M]
  /** Reduce-side operations for `R`. */
  def keyedReducer: KeyedReducer[R]
  /** Moves data from the mapper type `M` to the reducer type `R`. */
  def grouper: Grouper[M, R]
  /** Reads and writes `S` sources/sinks, named by `N`, inside `X`. */
  def executor: Executor[X, S, N]
}
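/**
 * Example in Scalding (illustrative sketch: it assumes Scalding's typed API and
 * Hadoop's `Path` are imported, and it leaves the instance `def`s abstract).
 * The type aliases must repeat the variance annotations of the abstract types
 * they implement in `Engine`. `S` is assumed to map onto Scalding's
 * `TypedSource`/`TypedSink` pair — confirm against the Scalding version in use.
 *
trait ScaldingEngine extends Engine {
  type M[+T] = TypedPipe[T]
  type R[K, +V] = KeyedList[K, V]
  type X[+T] = Execution[T]
  type S[T] = TypedSource[T] with TypedSink[T]
  type N = Path
}
*/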
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment