Last active
August 29, 2015 14:04
-
-
Save ankurdave/f5d4df4b521ac83b9c7d to your computer and use it in GitHub Desktop.
Abstracting over RDD and DStream in Scala - in response to http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANDWdjZ1ZUGSPsAM87J3cRNrh7N4PVqEBk+b_GQ-_8M_4bAgRw@mail.gmail.com%3E
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.language.higherKinds | |
// Common interface of RDD and DStream. Note the Coll type parameter, which will either be RDD or DStream. | |
/** Interface shared by the collection types in this file.
  *
  * @tparam A    element type of the collection
  * @tparam Coll type constructor of the concrete collection; `map` returns this type, so the
  *              concrete collection type is preserved across transformations
  */
trait DistributedCollection[A, Coll[_]] {

  /** Takes a function from `A` to `B` and yields a `Coll[B]`.
    *
    * @param f function to apply; how it is applied is up to the implementation
    * @tparam B element type of the resulting collection
    */
  def map[B](f: A => B): Coll[B]
}
/** Toy RDD wrapping a single value `x`, exposed through [[DistributedCollection]].
  *
  * @param x the wrapped value
  * @tparam A type of the wrapped value
  */
class RDD[A](val x: A) extends DistributedCollection[A, RDD] {

  /** Applies `f` to `x` and wraps the result in a new `RDD`. */
  override def map[B](f: A => B): RDD[B] = {
    val mapped: B = f(x)
    new RDD[B](mapped)
  }
}
/** Toy DStream wrapping a single value `y`, exposed through [[DistributedCollection]].
  *
  * @param y the wrapped value
  * @tparam A type of the wrapped value
  */
class DStream[A](val y: A) extends DistributedCollection[A, DStream] {

  /** Applies `f` to `y` and wraps the result in a new `DStream`. */
  override def map[B](f: A => B): DStream[B] = {
    val mapped: B = f(y)
    new DStream[B](mapped)
  }
}
// An example method that operates on either RDD or DStream. Coll can be any type constructor
// whose instances are DistributedCollections of themselves (here, RDD or DStream); this
// constraint is expressed using the "<:" upper type bound.
/** Adds one to the contents of `coll` via `map`, then converts the result to strings with a
  * second `map`.
  *
  * @param coll collection of integers to transform
  * @tparam Coll collection type constructor; bounded so that `Coll[X]` is a
  *              `DistributedCollection[X, Coll]`, which makes `map` return `Coll` again
  * @return a `Coll[String]` of the same concrete collection type as the input
  */
def transform[Coll[X] <: DistributedCollection[X, Coll]](coll: Coll[Int]): Coll[String] = {
  val incremented: Coll[Int] = coll.map(n => n + 1)
  incremented.map(n => n.toString)
}
// Example of the method's usage | |
// One sample of each collection type; the element type is fixed by the ascription.
val rdd: RDD[Int] = new RDD(1)
val dstream: DStream[Int] = new DStream(2)
// transform returns the same concrete collection type it was given, so the type-specific
// accessors x and y are still available on the results ("2" and "3" respectively).
transform(rdd).x
transform(dstream).y
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Toy definitions of RDD and DStream | |
/** Toy RDD wrapping a single value `x`.
  *
  * @param x the wrapped value
  * @tparam A type of the wrapped value
  */
class RDD[A](val x: A) {

  /** Applies `f` to `x` and wraps the result in a new `RDD`. */
  def map[B](f: A => B): RDD[B] = {
    val mapped: B = f(x)
    new RDD[B](mapped)
  }
}
/** Toy DStream wrapping a single value `y`.
  *
  * @param y the wrapped value
  * @tparam A type of the wrapped value
  */
class DStream[A](val y: A) {

  /** Applies `f` to `y` and wraps the result in a new `DStream`. */
  def map[B](f: A => B): DStream[B] = {
    val mapped: B = f(y)
    new DStream[B](mapped)
  }
}
import scala.language.higherKinds | |
import scala.language.implicitConversions | |
// The common interface for RDD and DStream | |
/** Common interface that RDD and DStream values are adapted to.
  *
  * @tparam A    element type of the collection
  * @tparam Coll type constructor of the underlying collection; `map` returns this type rather
  *              than another `DistributedCollection`
  */
trait DistributedCollection[A, Coll[_]] {

  /** Takes a function from `A` to `B` and yields a `Coll[B]`.
    *
    * @param f function to apply; how it is applied is up to the implementation
    * @tparam B element type of the resulting collection
    */
  def map[B](f: A => B): Coll[B]
}
// The typeclass that allows converting RDD and DStream to DistributedCollection | |
/** Type class describing how to view a `Coll[A]` as a [[DistributedCollection]].
  *
  * @tparam Coll type constructor of the collection being adapted
  */
trait ToDistributedCollection[Coll[_]] {

  /** Converts `c` into a `DistributedCollection` whose operations return `Coll` again.
    *
    * @param c the collection to adapt
    * @tparam A element type of `c`
    */
  def apply[A](c: Coll[A]): DistributedCollection[A, Coll]
}
// Instances of the typeclass for RDD and DStream | |
/** [[ToDistributedCollection]] instance for [[RDD]]. */
implicit object RddToDistributedCollection extends ToDistributedCollection[RDD] {

  /** Wraps `rdd` in an adapter whose `map` delegates to `rdd.map`.
    *
    * The result type is declared explicitly so that the public signature of this implicit
    * instance is not left to inference from the anonymous class.
    *
    * @param rdd the RDD to adapt
    * @tparam A element type of `rdd`
    */
  override def apply[A](rdd: RDD[A]): DistributedCollection[A, RDD] =
    new DistributedCollection[A, RDD] {
      override def map[B](f: A => B): RDD[B] = rdd.map(f)
    }
}
/** [[ToDistributedCollection]] instance for [[DStream]]. */
implicit object DstreamToDistributedCollection extends ToDistributedCollection[DStream] {

  /** Wraps `dstream` in an adapter whose `map` delegates to `dstream.map`.
    *
    * The result type is declared explicitly so that the public signature of this implicit
    * instance is not left to inference from the anonymous class.
    *
    * @param dstream the DStream to adapt
    * @tparam A element type of `dstream`
    */
  override def apply[A](dstream: DStream[A]): DistributedCollection[A, DStream] =
    new DistributedCollection[A, DStream] {
      override def map[B](f: A => B): DStream[B] = dstream.map(f)
    }
}
// An example method that operates on either RDD or DStream. The implicit parameter converts
// a Coll to a DistributedCollection so that we can call map on it. Because map returns a plain
// Coll rather than a DistributedCollection, each chained transformation has to be re-wrapped,
// which is why dc is applied twice.
/** Adds one to the contents of `coll`, then converts the result to strings.
  *
  * @param coll collection of integers to transform
  * @param dc   type-class instance used to view a `Coll` as a `DistributedCollection`
  * @tparam Coll collection type constructor for which an instance of
  *              `ToDistributedCollection` is available
  * @return a `Coll[String]` of the same collection type as the input
  */
def transform[Coll[_]](coll: Coll[Int])(implicit dc: ToDistributedCollection[Coll]): Coll[String] = {
  // First step: adapt the input and increment.
  val incremented: Coll[Int] = dc(coll).map(n => n + 1)
  // map handed back a plain Coll, so adapt it again before the second step.
  dc(incremented).map(n => n.toString)
}
// Example of the method's usage | |
// One sample of each collection type; the element type is fixed by the ascription.
val rdd: RDD[Int] = new RDD(1)
val dstream: DStream[Int] = new DStream(2)
// The matching implicit instance is selected from the argument's collection type, and the
// results keep that type, so x and y are still available ("2" and "3" respectively).
transform(rdd).x
transform(dstream).y
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment