Skip to content

Instantly share code, notes, and snippets.

@marcintustin
Last active January 1, 2018 20:36
Show Gist options
  • Save marcintustin/43228fa3be8c31629ce47f030bd44ef8 to your computer and use it in GitHub Desktop.
Save marcintustin/43228fa3be8c31629ce47f030bd44ef8 to your computer and use it in GitHub Desktop.
Generic Spark Code
import shapeless._
import ops.record._
import ops.hlist._
/**
 * Fields every row type must expose to take part in a viewership/schedule match.
 *
 * `ViewershipMatcher` joins on the first group (`date`, `segment_time`, `station_id`)
 * and compares the second group (`start_time`, `end_time`) to drop pairs whose
 * time ranges do not overlap.
 */
trait TimeColumns {
  // ---- equi-join keys ----
  val date: Date
  val segment_time: Timestamp
  val station_id: Integer

  // ---- time range used by the overlap filter ----
  val start_time: Timestamp
  val end_time: Timestamp
}
/**
 * A viewership row: the shared [[TimeColumns]] fields plus `viewership_property`.
 *
 * The trait is mixed in with `extends`. A first parent introduced only with `with`
 * is not valid Scala.
 */
case class Viewership(
  date: Date, segment_time: Timestamp, station_id: Integer, start_time: Timestamp, end_time: Timestamp,
  viewership_property: String) extends TimeColumns
/**
 * A schedule row: `program_id` plus the shared [[TimeColumns]] fields.
 *
 * The trait is mixed in with `extends`. A first parent introduced only with `with`
 * is not valid Scala.
 */
case class Schedule(
  program_id: Integer,
  date: Date, segment_time: Timestamp, station_id: Integer, start_time: Timestamp, end_time: Timestamp) extends TimeColumns
/**
 * A row carrying every [[Viewership]] field plus a `program_id`.
 *
 * NOTE(review): presumably intended as the `RESULT` type of `ViewershipMatcher`,
 * with `program_id` taken from the matched [[Schedule]]. This is inferred from the
 * field set, so confirm it against the callers.
 *
 * The trait is mixed in with `extends`. A first parent introduced only with `with`
 * is not valid Scala.
 */
case class ViewershipWithSchedule(
  date: Date, segment_time: Timestamp, station_id: Integer, start_time: Timestamp, end_time: Timestamp,
  viewership_property: String, program_id: Integer) extends TimeColumns
// This is in a class so that we can specify the type RESULT explicitly without explicitly
// specifying the type parameters to matchViewershipWithSchedules - we want the compiler
// to calculate those types for us and "conjure" the objects using implicit resolution.
/**
 * Matches viewership rows against schedule rows and decodes the result as `RESULT`.
 *
 * @tparam RESULT case class that each matched row is decoded into via `.as[RESULT]`
 */
class ViewershipMatcher[RESULT <: Product] {

  /**
   * Joins `viewershipWithQHAndDate` with `schedules` on equal `date`, `segment_time`
   * and `station_id`. It keeps only pairs where the schedule's time range overlaps the
   * viewership row's range (`schedules.start_time < viewership.end_time` and
   * `schedules.end_time > viewership.start_time`).
   *
   * The output columns are every field of `VQH` that `Schedule` does not also define,
   * followed by every field of `Schedule`. On a name clash, the schedule value wins.
   *
   * The type noise below has to be captured here, at the boundary between concrete and
   * generic types. Deeper down, the compiler can no longer recover the case-class field
   * lists, so the shapeless evidence materialises them at this call site.
   *
   * @param viewershipWithQHAndDate viewership rows of some case class `VQH` exposing [[TimeColumns]]
   * @param schedules               schedule rows
   * @return the matched rows decoded as `RESULT`
   */
  def matchViewershipWithSchedules[
      VQH <: TimeColumns with Product, ReprVQH <: HList, KeyVQHOut <: HList,
      ReprSchedule <: HList, KeyScheduleOut <: HList
  ](
      viewershipWithQHAndDate: Dataset[VQH], schedules: Dataset[Schedule])(
      implicit
      // Be case classes. This is already implied by the type bounds; kept for signature stability.
      ev0: RESULT <:< Product,
      ev1: VQH <:< Product,
      // TypeTag[RESULT] lets spark.implicits derive the Encoder used by `.as[RESULT]`.
      ev2: TypeTag[RESULT],
      ev3: TypeTag[VQH],
      // Have the compiler retrieve the field lists of the case classes.
      ev4: LabelledGeneric.Aux[VQH, ReprVQH],
      ev5: LabelledGeneric.Aux[Schedule, ReprSchedule],
      // ...as HLists of field-name keys...
      keysVQH: Keys.Aux[ReprVQH, KeyVQHOut],
      keysSchedule: Keys.Aux[ReprSchedule, KeyScheduleOut],
      // ...convertible to plain List[Symbol].
      traversableVQH: ToTraversable.Aux[KeyVQHOut, List, scala.Symbol],
      traversableSchedule: ToTraversable.Aux[KeyScheduleOut, List, scala.Symbol]
  ): Dataset[RESULT] = {
    // Use the input's own session, so the method does not depend on an externally defined value.
    val spark = viewershipWithQHAndDate.sparkSession
    import spark.implicits._

    val joined = viewershipWithQHAndDate.toDF().as("viewership")
      .join(
        schedules.toDF().as("schedules"),
        $"viewership.date" === $"schedules.date" &&
          $"viewership.segment_time" === $"schedules.segment_time" &&
          $"viewership.station_id" === $"schedules.station_id")
      // Keep only pairs whose time ranges overlap, to avoid spurious matches.
      // Caveat from the original author: running data repeatedly through this filter
      // risks losing data. That is not addressed here.
      .filter(
        $"schedules.start_time" < $"viewership.end_time" &&
          $"schedules.end_time" > $"viewership.start_time")

    // Field names in declaration order, recovered from the case-class types at compile time.
    val scheduleFields: List[String] = traversableSchedule(keysSchedule()).map(_.name)
    val scheduleFieldSet: Set[String] = scheduleFields.toSet
    // Select everything in viewership unless it is also defined in schedules (schedules overrides).
    val viewershipOnlyFields: List[String] =
      traversableVQH(keysVQH()).map(_.name).filterNot(scheduleFieldSet.contains)

    // Qualified names avoid ambiguity between the identically named columns on both sides.
    val finalSelects: Seq[String] =
      viewershipOnlyFields.map(field => s"viewership.$field") ++
        scheduleFields.map(field => s"schedules.$field")

    joined.selectExpr(finalSelects: _*).as[RESULT]
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment