-
-
Save Atry/3b322592eddd5483e6e7c0cc35530003 to your computer and use it in GitHub Desktop.
Monad instances for RDD-like things
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A `scalaz.MonadPlus` instance for an RDD-like wrapper type.
  *
  * `RDD` itself cannot be given a Monad instance directly because most of its
  * operations demand a `ClassTag` for the element type, which the `Monad`
  * signatures do not provide. [[RDDasMonadPlus.RDDMPlus]] works around this by
  * carrying the `ClassTag` along whenever one is known.
  */
object RDDasMonadPlus {
  import org.apache.spark.{ SparkContext }
  import org.apache.spark.rdd.RDD
  import scalaz._
  import Scalaz._
  import scala.reflect.ClassTag

  // RDDMPlus is the type for which we will define the Monad instance. It can be
  // constructed from an RDD using the RDDClassTag constructor. This
  // implementation is based on insights from
  // <http://okmij.org/ftp/Haskell/set-monad.html#PE>
  sealed trait RDDMPlus[A]

  /** Wraps an actual `RDD` together with the `ClassTag` of its elements.
    *
    * Use this constructor wherever a `ClassTag[A]` is available. In particular,
    * wherever you are generating an `RDD` you should generate an `RDDClassTag`
    * instead: being able to build the `RDD` implies the pertinent `ClassTag`
    * is already in scope.
    */
  case class RDDClassTag[A: ClassTag](rdd: RDD[A]) extends RDDMPlus[A] {
    /** The `ClassTag` captured when this value was constructed. */
    def ct: ClassTag[A] = implicitly[ClassTag[A]]
  }

  /** Holds the elements in a plain `List`; use only when no `ClassTag[A]` is available. */
  case class RDDAny[A](list: List[A]) extends RDDMPlus[A]

  /** Extension methods on [[RDDMPlus]]. Not needed for the MonadPlus instance, but useful. */
  implicit class RDDMPlusOps[A](val self: RDDMPlus[A]) extends AnyVal {
    /** Converts back to a plain `RDD`.
      *
      * An `RDDAny` is parallelized through `sc`; an `RDDClassTag` simply
      * yields the `RDD` it wraps.
      */
    def toRDD(sc: SparkContext)(implicit ev: ClassTag[A]): RDD[A] = self match {
      case RDDAny(xs) => sc.parallelize(xs)
      case RDDClassTag(rdd) => rdd
    }
  }

  /** The `MonadPlus` instance for [[RDDMPlus]].
    *
    * An implicit `SparkContext` is required because combining an `RDDAny` with
    * an `RDDClassTag` has to parallelize the list side.
    *
    * The result type is spelled out explicitly: implicit definitions without
    * an explicit type make implicit resolution fragile.
    */
  implicit def MonadPlusRDDInstance(implicit sc: SparkContext): MonadPlus[RDDMPlus] =
    new MonadPlus[RDDMPlus] {
      // Members declared in scalaz.PlusEmpty
      def empty[A]: RDDMPlus[A] = RDDAny(Nil)

      // Members declared in scalaz.Plus
      def plus[A](x: RDDMPlus[A], y: => RDDMPlus[A]): RDDMPlus[A] = x match {
        // The goal here is to end up with RDDClassTag wherever ClassTag[A] is
        // accessible, thereby propagating that information.
        case RDDAny(xs) => y match {
          case RDDAny(ys) => RDDAny(xs ++ ys)
          case yct @ RDDClassTag(ys) => {
            // Recover the ClassTag from the right-hand side so the list on
            // the left can be parallelized and the result re-wrapped.
            implicit val classTag: ClassTag[A] = yct.ct
            RDDClassTag(sc.parallelize(xs) ++ ys)
          }
        }
        case xct @ RDDClassTag(xs) => {
          // The left-hand side already knows the ClassTag; reuse it for
          // whichever shape the right-hand side has.
          implicit val classTag: ClassTag[A] = xct.ct
          y match {
            case RDDAny(ys) => RDDClassTag(xs ++ sc.parallelize(ys))
            case RDDClassTag(ys) => RDDClassTag(xs ++ ys)
          }
        }
      }

      // Members declared in scalaz.Applicative
      def point[A](a: => A): RDDMPlus[A] = RDDAny(List(a))

      // Members declared in scalaz.Bind
      def bind[A, B](fa: RDDMPlus[A])(f: A => RDDMPlus[B]): RDDMPlus[B] = {
        // The elements are brought to the driver before `f` is applied.
        // `f` may build RDDs and `plus` uses the SparkContext; neither can
        // run inside an RDD transformation/action (Spark does not support
        // nested RDD operations, and the SparkContext is not serializable),
        // so the previous `r.map(f).fold(...)` could not execute as a job.
        // NOTE: this materializes the whole input RDD in driver memory; the
        // per-element results are still combined as RDDs via `plus`.
        val elements: List[A] = fa match {
          case RDDClassTag(rdd) => rdd.collect().toList
          case RDDAny(list) => list
        }
        elements.map(f).foldLeft(empty[B])(plus(_, _))
      }
    }
}
/** Demonstrates using the `RDDasMonadPlus` instance from generic monadic code.
  *
  * @param sc the SparkContext, exposed implicitly so the MonadPlus instance
  *           can be resolved
  */
class RDDMonadPlusUsage(sc: org.apache.spark.SparkContext) {
  import org.apache.spark.{ SparkContext }
  import org.apache.spark.rdd.RDD
  import scalaz._
  import Scalaz._

  /** Some generic function which requires a Monad instance: it simply
    * delegates to `bind` of the `Monad[R]` found in implicit scope.
    */
  def foo[R[_]: Monad, A, B](ma: R[A])(f: A => R[B]): R[B] = {
    implicitly[Monad[R]].bind[A, B](ma)(f)
  }

  import RDDasMonadPlus._ // bring the MonadPlusRDDInstance implicit in scope

  // Allow the implicit instance to be triggered. The type is given explicitly
  // so that the implicit's eligibility does not depend on where in the class
  // body it is defined.
  implicit val impSc: SparkContext = sc

  /** Uses `foo` with `RDDMPlus` as the monad. */
  def bar[A, B](ra: RDDMPlus[A])(f: A => RDDMPlus[B]): RDDMPlus[B] = {
    // At this point the implicit MonadPlus instance can kick in, so the call
    // to foo resolves its Monad[RDDMPlus] requirement.
    foo(ra)(f)
  }

  /** Shows that values already in the form of plain RDDs can still be used.
    *
    * This works as long as we either have concrete types (instead of type
    * variables) or the ClassTag is available. If neither condition is met you
    * can always convert to a List and use RDDAny, but that is not something
    * you should be doing.
    */
  def baz(ra: RDD[Int])(f: Int => RDD[Double]): RDD[Double] = {
    // First transform the inputs to RDDMPlus. This works because ra has the
    // known, specific type RDD[Int], so its ClassTag is available.
    val ma: RDDMPlus[Int] = RDDClassTag(ra)
    val g: Int => RDDMPlus[Double] = f andThen RDDClassTag.apply

    // Now invoke bar, and finally convert the result back to an RDD.
    val result: RDDMPlus[Double] = bar(ma)(g)
    result.toRDD(sc)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment