Skip to content

Instantly share code, notes, and snippets.

@felipehummel
Created April 1, 2017 21:14
Show Gist options
  • Save felipehummel/1e804c19bd80121547634ce59765fff8 to your computer and use it in GitHub Desktop.
Save felipehummel/1e804c19bd80121547634ce59765fff8 to your computer and use it in GitHub Desktop.
Spark + Quill proof of concept
package quill_spark.poc
import java.util.{UUID, Date}
import java.time.LocalDate
import io.getquill._
import io.getquill.context.Context
import io.getquill.context.sql.SqlContext
import io.getquill.idiom.{ Idiom => BaseIdiom }
import io.getquill.context.mirror.Row
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.SparkConf
import scala.reflect.ClassTag
import scala.util.{Success, Failure, Try}
import scala.reflect.runtime.universe._
/** A person with a name and an age; the element type of the `users` temp view built in `Main`. */
final case class User(name: String, age: Int)
/** Associates a user name with a job; the element type of the `user_jobs` temp view built in `Main`. */
final case class UserJob(name: String, job: String)
/**
 * Encoders for [[SparkQuillContext]] that collect bound values into a quill mirror `Row`
 * (the context's `PrepareRow`).
 *
 * Every encoder ignores the positional `index` and appends its value to the end of the row, so
 * values appear in the order the encoders are invoked.
 */
trait MirrorEncoders {
  this: SparkQuillContext[_, _] =>

  override type Encoder[T] = MirrorEncoder[T]

  /** Adapts a plain `(index, value, row) => row` function to this context's `Encoder` type. */
  case class MirrorEncoder[T](encoder: BaseEncoder[T]) extends BaseEncoder[T] {
    override def apply(index: Index, value: T, row: PrepareRow) = encoder(index, value, row)
  }

  /** An encoder that appends the value unchanged; the index is not consulted. */
  def encoder[T]: Encoder[T] =
    MirrorEncoder[T]((idx: Index, v: T, acc: PrepareRow) => acc.add(v))

  /** Encodes an `I` by converting it to an `O` with `mapped` and delegating to the `O` encoder. */
  implicit def mappedEncoder[I, O](implicit mapped: MappedEncoding[I, O], e: Encoder[O]): Encoder[I] =
    MirrorEncoder[I]((idx: Index, input: I, acc: PrepareRow) => e(idx, mapped.f(input), acc))

  /**
   * Encodes an optional value as a single `Option` cell: `None` when absent, otherwise the first
   * cell produced by the inner encoder (itself an `Option`, `None` if it produced nothing).
   */
  implicit def optionEncoder[T](implicit d: Encoder[T]): Encoder[Option[T]] =
    MirrorEncoder[Option[T]] { (idx: Index, opt: Option[T], acc: PrepareRow) =>
      // Run the inner encoder against an empty scratch row so only its own output is picked up.
      acc.add(opt.flatMap(v => d(idx, v, Row()).data.headOption))
    }

  // Primitive and standard types: each one is appended verbatim.
  implicit val stringEncoder: Encoder[String] = encoder[String]
  implicit val bigDecimalEncoder: Encoder[BigDecimal] = encoder[BigDecimal]
  implicit val booleanEncoder: Encoder[Boolean] = encoder[Boolean]
  implicit val byteEncoder: Encoder[Byte] = encoder[Byte]
  implicit val shortEncoder: Encoder[Short] = encoder[Short]
  implicit val intEncoder: Encoder[Int] = encoder[Int]
  implicit val longEncoder: Encoder[Long] = encoder[Long]
  implicit val floatEncoder: Encoder[Float] = encoder[Float]
  implicit val doubleEncoder: Encoder[Double] = encoder[Double]
  implicit val byteArrayEncoder: Encoder[Array[Byte]] = encoder[Array[Byte]]
  implicit val dateEncoder: Encoder[Date] = encoder[Date]
  implicit val localDateEncoder: Encoder[LocalDate] = encoder[LocalDate]
  implicit val uuidEncoder: Encoder[UUID] = encoder[UUID]
}
/**
 * Decoders for [[SparkQuillContext]] that read positional values back out of a quill mirror `Row`
 * (the context's `ResultRow`).
 */
trait MirrorDecoders {
  this: SparkQuillContext[_, _] =>

  override type Decoder[T] = MirrorDecoder[T]

  /** Adapts a plain `(index, row) => value` function to this context's `Decoder` type. */
  case class MirrorDecoder[T](decoder: BaseDecoder[T]) extends BaseDecoder[T] {
    override def apply(index: Index, row: ResultRow) = decoder(index, row)
  }

  /** A decoder that reads the cell at `index` as a `T` via the row's `ClassTag`-based accessor. */
  def decoder[T: ClassTag]: Decoder[T] =
    MirrorDecoder[T]((at: Index, source: ResultRow) => source[T](at))

  /** Decodes an `I` with `d`, then converts it to an `O` with `mapped`. */
  implicit def mappedDecoder[I, O](implicit mapped: MappedEncoding[I, O], d: Decoder[I]): Decoder[O] =
    MirrorDecoder[O]((at: Index, source: ResultRow) => mapped.f(d(at, source)))

  /**
   * Decodes a cell that holds an `Option`: `None` stays `None`; a present value is decoded with `d`
   * after being re-wrapped as a single-cell row.
   */
  implicit def optionDecoder[T](implicit d: Decoder[T]): Decoder[Option[T]] =
    MirrorDecoder[Option[T]] { (at: Index, source: ResultRow) =>
      source[Option[Any]](at).map(cell => d(0, Row(cell)))
    }

  // Primitive and standard types: each one is read back with its own ClassTag.
  implicit val stringDecoder: Decoder[String] = decoder[String]
  implicit val bigDecimalDecoder: Decoder[BigDecimal] = decoder[BigDecimal]
  implicit val booleanDecoder: Decoder[Boolean] = decoder[Boolean]
  implicit val byteDecoder: Decoder[Byte] = decoder[Byte]
  implicit val shortDecoder: Decoder[Short] = decoder[Short]
  implicit val intDecoder: Decoder[Int] = decoder[Int]
  implicit val longDecoder: Decoder[Long] = decoder[Long]
  implicit val floatDecoder: Decoder[Float] = decoder[Float]
  implicit val doubleDecoder: Decoder[Double] = decoder[Double]
  implicit val byteArrayDecoder: Decoder[Array[Byte]] = decoder[Array[Byte]]
  implicit val dateDecoder: Decoder[Date] = decoder[Date]
  implicit val localDateDecoder: Decoder[LocalDate] = decoder[LocalDate]
  implicit val uuidDecoder: Decoder[UUID] = decoder[UUID]
}
/**
 * A quill context that runs the generated SQL through Spark SQL.
 *
 * Only [[executeQuery]] is implemented. It passes the SQL string to `spark.sql` and converts the
 * result into a typed `Dataset[T]`. All other execution methods are `???` stubs.
 *
 * Tables referenced by the generated SQL must already be registered with `spark`, for example as
 * temp views, under the names produced by the `Naming` strategy.
 *
 * @param spark  session used to run the generated SQL; this context never stops it
 * @tparam Idiom  SQL dialect used to render quoted queries
 * @tparam Naming naming strategy used to derive table and column names
 */
class SparkQuillContext[Idiom <: BaseIdiom, Naming <: NamingStrategy](spark: SparkSession)
  extends Context[Idiom, Naming]
  with MirrorEncoders
  with MirrorDecoders
  with SqlContext[Idiom, Naming] {

  override type PrepareRow = Row
  override type ResultRow = Row

  override type RunQueryResult[T] = Dataset[T]
  override type RunQuerySingleResult[T] = QueryMirror[T]
  override type RunActionResult = ActionMirror
  override type RunActionReturningResult[T] = ActionReturningMirror[T]
  override type RunBatchActionResult = BatchActionMirror
  override type RunBatchActionReturningResult[T] = BatchActionReturningMirror[T]

  /** No-op. The `SparkSession` is not stopped here; whoever created it remains responsible for it. */
  override def close(): Unit = ()

  /** Placeholder probe that never consults Spark; fails only when `statement` contains "Fail". */
  def probe(statement: String): Try[_] =
    if (statement.contains("Fail"))
      Failure(new IllegalStateException("The ast contains 'Fail'"))
    else
      Success(())

  /** Evaluates `f` directly; no transactional semantics are provided. */
  def transaction[T](f: => T) = f

  // Result carriers for the non-query result types above. Nothing in this class builds them yet,
  // because the corresponding execute* methods are unimplemented.
  case class ActionMirror(string: String, prepareRow: PrepareRow)
  case class ActionReturningMirror[T](string: String, prepareRow: Row, extractor: Row => T, returningColumn: String)
  case class BatchActionMirror(groups: List[(String, List[Row])])
  case class BatchActionReturningMirror[T](groups: List[(String, String, List[Row])], extractor: Row => T)
  case class QueryMirror[T](string: String, prepareRow: Row, extractor: Row => T)

  /**
   * Runs `string` with `spark.sql` and converts the result using Spark's product encoder for `T`.
   *
   * Spark maps the result columns onto `T` itself (see `Dataset.as`), so `extractor` is not used.
   * NOTE(review): case-class fields are presumably matched by column name, so a `Naming` strategy
   * that renames columns (e.g. camelCase to snake_case) may break the mapping. Confirm before
   * relying on it.
   *
   * @param string    SQL generated by quill
   * @param prepare   binds quill parameters into a `Row`; this context cannot forward them to Spark
   * @param extractor ignored; decoding is done by Spark's encoder
   * @throws UnsupportedOperationException if the query carries bound parameters (e.g. lifted values)
   */
  // TODO: this should accept any type, not only Products. But I can't make the compiler search spark.implicits._ for the Encoder[T]
  def executeQuery[T <: Product : TypeTag](string: String, prepare: Row => Row = identity, extractor: Row => T = identity[Row] _): Dataset[T] = {
    // `spark.sql` takes a plain string and has no parameter binding. Fail fast with a clear message
    // if quill produced bindings, rather than silently dropping them.
    val bindings = prepare(Row()).data
    if (bindings.nonEmpty)
      throw new UnsupportedOperationException(
        s"SparkQuillContext cannot bind query parameters (got ${bindings.size}): $string")
    spark.sql(string).as[T](spark.implicits.newProductEncoder[T])
  }

  // The methods below are not implemented in this proof of concept; each throws NotImplementedError.
  def executeQuerySingle[T](string: String, prepare: Row => Row = identity, extractor: Row => T = identity[Row] _) = ???
  def executeAction(string: String, prepare: Row => Row = identity) = ???
  def executeActionReturning[O](string: String, prepare: Row => Row = identity, extractor: Row => O, returningColumn: String) = ???
  def executeBatchAction(groups: List[BatchGroup]) = ???
  def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Row => T) = ???
}
/** Entry point for the Spark + Quill proof of concept. */
object Main {

  /**
   * Runs the proof of concept end to end:
   *   1. starts a local Spark session;
   *   2. registers two small in-memory datasets as the `users` and `user_jobs` temp views;
   *   3. runs a quill join filtered on `age > 18`;
   *   4. prints the result with `show()`.
   *
   * The session is closed in a `finally` block, so it is released even when the query fails.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setMaster("local[*]").
      setAppName("test").
      set("spark.ui.enabled", "false")
    // NOTE(review): `.appName(...)` below is applied after `.config(conf)` and presumably overrides
    // `setAppName("test")` above. Confirm, then keep only one of them.
    val spark = SparkSession
      .builder()
      .config(conf)
      .appName("SparkSessionZipsExample")
      .enableHiveSupport()
      .getOrCreate()

    try {
      import spark.implicits._
      val usersDs: Dataset[User] =
        Seq(
          User("Felipe", 28),
          User("Joao", 50),
          User("Pedro", 10)).toDS()
      val userJobsDs: Dataset[UserJob] =
        Seq(
          UserJob("Felipe", "programmer"),
          UserJob("Joao", "medic"),
          UserJob("Pedro", "driver")).toDS()

      val ctx = new SparkQuillContext[MySQLDialect, SnakeCase with PluralizedTableNames](spark)
      import ctx._

      //TODO: find a way to put the logic below inside Context:
      // Also, the name of the table should be derived automatically
      usersDs.createOrReplaceTempView("users")
      userJobsDs.createOrReplaceTempView("user_jobs")
      // ----

      val hasDrinkingAge = quote {
        for {
          // TODO: can we use a Dataset[User] object instead of query[User]?
          (u, j) <- query[User].join(query[UserJob]).on((u, j) => u.name == j.name) if u.age > 18
        } yield (u.name, u.age, j.job)
      }
      // val hasDrinkingAge = quote { query[User].filter(_.age > 18) }
      val r = ctx.run(hasDrinkingAge)
      r.show()
    } finally {
      spark.close()
    }
  }

  /**
   * Placeholder for a job defined in terms of named dependency datasets.
   *
   * NOTE(review): this looks like an unfinished stub. It returns `deps("")`, which throws
   * NoSuchElementException unless `deps` contains the empty-string key.
   */
  def jobDefinition(deps: Map[String, Dataset[_]]): Dataset[_] = {
    deps("")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment