Created
April 1, 2017 21:14
-
-
Save felipehummel/1e804c19bd80121547634ce59765fff8 to your computer and use it in GitHub Desktop.
Spark + Quill proof of concept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package quill_spark.poc | |
import java.util.{UUID, Date} | |
import java.time.LocalDate | |
import io.getquill._ | |
import io.getquill.context.Context | |
import io.getquill.context.sql.SqlContext | |
import io.getquill.idiom.{ Idiom => BaseIdiom } | |
import io.getquill.context.mirror.Row | |
import org.apache.spark.sql.{Dataset, SparkSession} | |
import org.apache.spark.SparkConf | |
import scala.reflect.ClassTag | |
import scala.util.{Success, Failure, Try} | |
import scala.reflect.runtime.universe._ | |
/**
 * A user record used as sample data for the proof of concept.
 *
 * @param name the user's name; also used as the join key against [[UserJob]]
 * @param age  the user's age
 */
final case class User(name: String, age: Int)
/**
 * Associates a user (by name) with a job; sample data for the proof of concept.
 *
 * @param name the name of the user holding the job
 * @param job  the job title
 */
final case class UserJob(name: String, job: String)
/**
 * Encoders for [[SparkQuillContext]]: each encoder writes a value into the
 * prepare row by appending it with `row.add`.
 */
trait MirrorEncoders {
  this: SparkQuillContext[_, _] =>

  override type Encoder[T] = MirrorEncoder[T]

  /** Wraps a raw encoding function so it can be used as this context's `Encoder`. */
  case class MirrorEncoder[T](encoder: BaseEncoder[T]) extends BaseEncoder[T] {
    // Pure delegation to the wrapped function.
    override def apply(index: Index, value: T, row: PrepareRow) =
      encoder(index, value, row)
  }

  /** Generic encoder: ignores the index and appends the value to the row as-is. */
  def encoder[T]: Encoder[T] =
    MirrorEncoder((_: Index, v: T, target: PrepareRow) => target.add(v))

  /** Encodes an `I` by first converting it to `O` through the mapped encoding. */
  implicit def mappedEncoder[I, O](implicit mapped: MappedEncoding[I, O], e: Encoder[O]): Encoder[I] =
    MirrorEncoder((idx: Index, in: I, target: PrepareRow) => e(idx, mapped.f(in), target))

  /**
   * Encodes an optional value. `None` is appended directly; for `Some(v)` the
   * underlying encoder is run against a fresh empty row and the first element
   * it produced (if any) is appended, wrapped in an `Option`.
   */
  implicit def optionEncoder[T](implicit d: Encoder[T]): Encoder[Option[T]] =
    MirrorEncoder((idx: Index, maybe: Option[T], target: PrepareRow) => {
      maybe match {
        case Some(v) =>
          val encoded = d(idx, v, Row()).data.headOption
          target.add(encoded)
        case None =>
          target.add(None)
      }
    })

  // Instances for the basic types, all built on the generic `encoder`.
  implicit val stringEncoder: Encoder[String] = encoder[String]
  implicit val bigDecimalEncoder: Encoder[BigDecimal] = encoder[BigDecimal]
  implicit val booleanEncoder: Encoder[Boolean] = encoder[Boolean]
  implicit val byteEncoder: Encoder[Byte] = encoder[Byte]
  implicit val shortEncoder: Encoder[Short] = encoder[Short]
  implicit val intEncoder: Encoder[Int] = encoder[Int]
  implicit val longEncoder: Encoder[Long] = encoder[Long]
  implicit val floatEncoder: Encoder[Float] = encoder[Float]
  implicit val doubleEncoder: Encoder[Double] = encoder[Double]
  implicit val byteArrayEncoder: Encoder[Array[Byte]] = encoder[Array[Byte]]
  implicit val dateEncoder: Encoder[Date] = encoder[Date]
  implicit val localDateEncoder: Encoder[LocalDate] = encoder[LocalDate]
  implicit val uuidEncoder: Encoder[UUID] = encoder[UUID]
}
/**
 * Decoders for [[SparkQuillContext]]: each decoder reads the value at a given
 * index of a result row.
 */
trait MirrorDecoders {
  this: SparkQuillContext[_, _] =>

  override type Decoder[T] = MirrorDecoder[T]

  /** Wraps a raw decoding function so it can be used as this context's `Decoder`. */
  case class MirrorDecoder[T](decoder: BaseDecoder[T]) extends BaseDecoder[T] {
    // Pure delegation to the wrapped function.
    override def apply(index: Index, row: ResultRow) =
      decoder(index, row)
  }

  /** Generic decoder: reads the element at `index` from the row, typed as `T`. */
  def decoder[T: ClassTag]: Decoder[T] =
    MirrorDecoder((idx: Index, source: ResultRow) => source[T](idx))

  /** Decodes an `I` with the underlying decoder, then converts it to `O`. */
  implicit def mappedDecoder[I, O](implicit mapped: MappedEncoding[I, O], d: Decoder[I]): Decoder[O] =
    MirrorDecoder((idx: Index, source: ResultRow) => mapped.f(d.apply(idx, source)))

  /**
   * Decodes an optional value: reads the cell as an `Option`; when it is
   * defined, the content is placed in a single-element row and decoded from
   * position 0 with the underlying decoder.
   */
  implicit def optionDecoder[T](implicit d: Decoder[T]): Decoder[Option[T]] =
    MirrorDecoder((idx: Index, source: ResultRow) =>
      source[Option[Any]](idx).map(content => d(0, Row(content))))

  // Instances for the basic types, all built on the generic `decoder`.
  implicit val stringDecoder: Decoder[String] = decoder[String]
  implicit val bigDecimalDecoder: Decoder[BigDecimal] = decoder[BigDecimal]
  implicit val booleanDecoder: Decoder[Boolean] = decoder[Boolean]
  implicit val byteDecoder: Decoder[Byte] = decoder[Byte]
  implicit val shortDecoder: Decoder[Short] = decoder[Short]
  implicit val intDecoder: Decoder[Int] = decoder[Int]
  implicit val longDecoder: Decoder[Long] = decoder[Long]
  implicit val floatDecoder: Decoder[Float] = decoder[Float]
  implicit val doubleDecoder: Decoder[Double] = decoder[Double]
  implicit val byteArrayDecoder: Decoder[Array[Byte]] = decoder[Array[Byte]]
  implicit val dateDecoder: Decoder[Date] = decoder[Date]
  implicit val localDateDecoder: Decoder[LocalDate] = decoder[LocalDate]
  implicit val uuidDecoder: Decoder[UUID] = decoder[UUID]
}
/**
 * Proof-of-concept Quill context that runs the generated SQL through a
 * `SparkSession` and returns the result as a Spark `Dataset`.
 *
 * Only [[executeQuery]] is implemented; every other `execute*` method is a
 * `???` placeholder and throws `NotImplementedError` when called.
 *
 * @param spark the session used to execute SQL statements
 * @tparam Idiom  the SQL dialect used to render queries
 * @tparam Naming the naming strategy used to derive table/column names
 */
class SparkQuillContext[Idiom <: BaseIdiom, Naming <: NamingStrategy](spark: SparkSession)
  extends Context[Idiom, Naming]
  with MirrorEncoders
  with MirrorDecoders
  with SqlContext[Idiom, Naming] {

  // Rows here are Quill's mirror `Row` (io.getquill.context.mirror.Row).
  override type PrepareRow = Row
  override type ResultRow = Row

  override type RunQueryResult[T] = Dataset[T]
  override type RunQuerySingleResult[T] = QueryMirror[T]
  override type RunActionResult = ActionMirror
  override type RunActionReturningResult[T] = ActionReturningMirror[T]
  override type RunBatchActionResult = BatchActionMirror
  override type RunBatchActionReturningResult[T] = BatchActionReturningMirror[T]

  /** No-op: this context holds no resources of its own; the session is not closed here. */
  override def close(): Unit = ()

  /**
   * Checks a statement: fails with `IllegalStateException` when the statement
   * text contains "Fail", succeeds otherwise. The statement is not executed.
   */
  def probe(statement: String): Try[_] =
    if (statement.contains("Fail"))
      Failure(new IllegalStateException("The ast contains 'Fail'"))
    else
      Success(())

  /** Evaluates `f` directly; no transactional behavior is provided. */
  def transaction[T](f: => T): T = f

  case class ActionMirror(string: String, prepareRow: PrepareRow)
  case class ActionReturningMirror[T](string: String, prepareRow: Row, extractor: Row => T, returningColumn: String)
  case class BatchActionMirror(groups: List[(String, List[Row])])
  case class BatchActionReturningMirror[T](groups: List[(String, String, List[Row])], extractor: Row => T)
  case class QueryMirror[T](string: String, prepareRow: Row, extractor: Row => T)

  // TODO: this should accept any type, not only Products. But I can't make the compiler search spark.implicits._ for the Encoder[T]
  /**
   * Runs `string` as Spark SQL and returns the result typed as `Dataset[T]`
   * using Spark's product encoder.
   *
   * NOTE: `prepare` and `extractor` are accepted but ignored by this
   * implementation, so values produced by `prepare` are never applied to the
   * statement and rows are decoded by Spark rather than by `extractor`.
   *
   * @param string    the SQL text to execute
   * @param prepare   unused
   * @param extractor unused
   */
  def executeQuery[T <: Product : TypeTag](string: String, prepare: Row => Row = identity, extractor: Row => T = identity[Row] _): Dataset[T] =
    spark.sql(string).as[T](spark.implicits.newProductEncoder[T])

  // Not implemented in this proof of concept.
  def executeQuerySingle[T](string: String, prepare: Row => Row = identity, extractor: Row => T = identity[Row] _) = ???
  def executeAction(string: String, prepare: Row => Row = identity) = ???
  def executeActionReturning[O](string: String, prepare: Row => Row = identity, extractor: Row => O, returningColumn: String) = ???
  def executeBatchAction(groups: List[BatchGroup]) = ???
  def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Row => T) = ???
}
/**
 * Demo entry point: registers two in-memory datasets as temp views, builds a
 * Quill join query over them and shows the resulting Spark `Dataset`.
 */
object Main {

  /**
   * Starts a local Spark session, runs the sample query and prints the result.
   * The session is always closed, even if the query fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.ui.enabled", "false")

    // NOTE(review): the app name is set twice ("test" on the conf, then again on
    // the builder) — presumably the builder value wins; confirm and drop one.
    val spark = SparkSession
      .builder()
      .config(conf)
      .appName("SparkSessionZipsExample")
      .enableHiveSupport()
      .getOrCreate()

    try {
      import spark.implicits._

      val usersDs: Dataset[User] =
        Seq(
          User("Felipe", 28),
          User("Joao", 50),
          User("Pedro", 10)).toDS()

      val userJobsDs: Dataset[UserJob] =
        Seq(
          UserJob("Felipe", "programmer"),
          UserJob("Joao", "medic"),
          UserJob("Pedro", "driver")).toDS()

      val ctx = new SparkQuillContext[MySQLDialect, SnakeCase with PluralizedTableNames](spark)
      import ctx._

      //TODO: find a way to put the logic below inside Context:
      // Also, the name of the table should be derived automatically
      usersDs.createOrReplaceTempView("users")
      userJobsDs.createOrReplaceTempView("user_jobs")
      // ----

      val hasDrinkingAge = quote {
        for {
          // TODO: can we use a Dataset[User] object instead of query[User]?
          (u, j) <- query[User].join(query[UserJob]).on( (u, j) => u.name == j.name) if u.age > 18
        } yield (u.name, u.age, j.job)
      }
      // val hasDrinkingAge = quote { query[User].filter(_.age > 18) }

      val r = ctx.run(hasDrinkingAge)
      r.show()
    } finally {
      // Release the session even when building or running the query throws.
      spark.close()
    }
  }

  /**
   * Placeholder: returns the dataset registered under the empty-string key.
   * Presumably a stub for a future job-definition API — the lookup uses
   * `Map.apply`, so a missing key raises instead of returning a default.
   *
   * @param deps datasets keyed by name
   */
  def jobDefinition(deps: Map[String, Dataset[_]]): Dataset[_] =
    deps("")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment