Last active
May 10, 2017 08:52
-
-
Save elyast/2e9d614e9f73331cd6c2 to your computer and use it in GitHub Desktop.
Shapeless usage in spark pipelines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This allows a single entity (from a given RDD or DStream) to be transformed in several ways at once,
// using an HList of mapper functions whose results are bound into a given output type.
// Potentially useful for feature extraction from a single entity or for basic ETL.
// Usage:
import twitter4j.Status | |
import shapeless._ | |
// Mapper functions: each one extracts a single feature from a tweet.
// The `...` right-hand sides are deliberate elisions to be filled in by the reader.
val user: Status => String = ... // a function that extracts the author of a tweet
val tags: Status => List[String] = ... // a function that gets the tags from a tweet

// Target type: its field types must line up, in order, with the mappers' result types.
case class UserTags(user: String, tags: List[String])

val streamOfTweets: DStream[Status] = ...

// Apply every mapper to each tweet and assemble the results into a UserTags.
// `to` takes the entity being transformed, so `value` must be passed explicitly.
streamOfTweets.map { value =>
  from(user :: tags :: HNil).to[UserTags](value)
}
// Obviously, if the mapper functions don't match the case class fields, we get a compilation error.
// And now the implementation part:
import shapeless._ | |
import scala.collection.immutable.{:: => Cons} | |
/**
 * Pairs an HList of mapper functions with the [[Applicator]] evidence needed to run them
 * against a single value.
 *
 * @param mappers HList of functions to apply to the input value
 * @param tr      evidence that applying `mappers` to an `A` produces the HList `M`
 * @tparam A type of the input value
 * @tparam L type of the mapper HList
 * @tparam M type of the HList holding the mappers' results
 */
case class from[A, L <: HList, M <: HList](mappers: L)(
    implicit tr: Applicator.Aux[A, L, M]
) {

  /**
   * Runs all mappers on `value` and converts the resulting HList into a `B`.
   *
   * @param value the input passed to every mapper
   * @param gen   shapeless `Generic` whose representation of `B` is exactly `M`
   * @tparam B the output type to build
   * @return the `B` built by `gen.from` from the mappers' results
   */
  def to[B](value: A)(implicit gen: Generic.Aux[B, M]): B = {
    val extracted: M = tr(value, mappers)
    gen.from(extracted)
  }
}
/**
 * Type class describing how an HList `L` of mappers is applied to one value of type `A`.
 *
 * Inherits `apply` and the `Out` type member from shapeless `DepFn2`; the result type is
 * therefore dependent on the concrete instance.
 *
 * NOTE(review): `Serializable` is presumably mixed in so instances can travel inside
 * Spark closures — confirm against the intended usage.
 *
 * @tparam A type of the value the mappers are applied to
 * @tparam L type of the HList of mappers
 */
trait Applicator[A, L <: HList] extends DepFn2[A, L] with Serializable {
// Narrows the inherited result type so that it is always an HList.
type Out <: HList
}
/** Implicit instances and helpers for the [[Applicator]] type class. */
object Applicator {

  /** Refinement alias exposing the instance's result type `Out0` as a type parameter. */
  type Aux[A, L <: HList, Out0 <: HList] = Applicator[A, L] { type Out = Out0 }

  /** Summons the implicit instance for `A` and `L`, preserving its precise `Out` type. */
  def apply[A, L <: HList](implicit tr: Applicator[A, L]): Aux[A, L, tr.Out] = tr

  /** Base case: with no mappers, the empty HList is returned as the result. */
  implicit def hnilApplicator[A]: Aux[A, HNil, HNil] =
    new Applicator[A, HNil] {
      type Out = HNil

      def apply(value: A, mappers: HNil): Out = mappers
    }

  /**
   * Inductive case: applies the head function to the value and prepends its result to
   * whatever the tail instance `st` produces for the remaining mappers.
   */
  implicit def hlistApplicator[A, H, T <: HList](
      implicit st: Applicator[A, T]
  ): Aux[A, (A => H) :: T, H :: st.Out] =
    new Applicator[A, (A => H) :: T] {
      type Out = H :: st.Out

      def apply(value: A, mappers: (A => H) :: T): Out = {
        // Head mapper runs first, then the tail, matching left-to-right HList order.
        val headResult: H = mappers.head(value)
        headResult :: st(value, mappers.tail)
      }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment