// Generic Spark Code
// Source: GitHub gist marcintustin/43228fa3be8c31629ce47f030bd44ef8 (last active January 1, 2018 20:36).
// Note: GitHub flagged this file as containing hidden or bidirectional Unicode text;
// review it in an editor that reveals hidden Unicode characters.
import java.sql.{Date, Timestamp}

import scala.reflect.runtime.universe.TypeTag

import shapeless._
import ops.record._
import ops.hlist._
/**
 * Columns that every time-segmented record in this file exposes.
 *
 * `date`, `segment_time` and `station_id` are the join keys used by
 * `ViewershipMatcher`; `start_time` and `end_time` bound the interval that is
 * compared when filtering joined rows.
 */
trait TimeColumns {
  val date: Date
  val segment_time: Timestamp
  val station_id: Integer
  val start_time: Timestamp
  val end_time: Timestamp
}
/**
 * A viewership record: the shared [[TimeColumns]] plus one viewership-specific property.
 *
 * The first parent of a class must be introduced with `extends`; a bare `with` does not parse.
 */
case class Viewership(
  date: Date,
  segment_time: Timestamp,
  station_id: Integer,
  start_time: Timestamp,
  end_time: Timestamp,
  viewership_property: String
) extends TimeColumns
/**
 * A schedule record: a program identifier plus the shared [[TimeColumns]].
 *
 * The first parent of a class must be introduced with `extends`; a bare `with` does not parse.
 */
case class Schedule(
  program_id: Integer,
  date: Date,
  segment_time: Timestamp,
  station_id: Integer,
  start_time: Timestamp,
  end_time: Timestamp
) extends TimeColumns
/**
 * A viewership record enriched with the `program_id` of a schedule record.
 *
 * The first parent of a class must be introduced with `extends`; a bare `with` does not parse.
 */
case class ViewershipWithSchedule(
  date: Date,
  segment_time: Timestamp,
  station_id: Integer,
  start_time: Timestamp,
  end_time: Timestamp,
  viewership_property: String,
  program_id: Integer
) extends TimeColumns
/**
 * Joins viewership rows with the schedule rows that overlap them in time.
 *
 * This is a class so that the result type `RESULT` can be specified explicitly without
 * also having to spell out the type parameters of `matchViewershipWithSchedules` - we
 * want the compiler to calculate those types for us and "conjure" the evidence objects
 * using implicit resolution.
 *
 * @tparam RESULT case class describing the rows of the joined dataset
 */
class ViewershipMatcher[RESULT <: Product] {

  /**
   * Inner-joins `viewershipWithQHAndDate` with `schedules` on `date`, `segment_time` and
   * `station_id`, keeps only the pairs whose time intervals overlap, and projects the
   * result onto `RESULT`.
   *
   * Every column of the viewership type is selected unless the schedule type defines a
   * column of the same name, in which case the schedule value is used (schedules override).
   *
   * Holy Type Noise! The implicit evidence has to be captured here, at the boundary of
   * concrete type land and generic type land: going deeper, this information would not be
   * available to the compiler. At this point the compiler is doing computations for us.
   *
   * @param viewershipWithQHAndDate viewership rows; the element type must expose [[TimeColumns]]
   * @param schedules               schedule rows to match against
   * @param ev0                     `RESULT` is a case class (product)
   * @param ev1                     `VQH` is a case class (product)
   * @param ev2                     type tag for `RESULT`, needed to derive its encoder
   * @param ev3                     type tag for `VQH`, needed if passed down to generic functions
   * @param ev4                     record representation of `VQH` (its case-class fields)
   * @param ev5                     record representation of [[Schedule]]
   * @param keysVQH                 field names of `VQH` as an HList
   * @param keysSchedule            field names of [[Schedule]] as an HList
   * @param traversableVQH          converts the `VQH` field names to a `List[Symbol]`
   * @param traversableSchedule     converts the [[Schedule]] field names to a `List[Symbol]`
   * @return the matched rows as a `Dataset[RESULT]`
   */
  def matchViewershipWithSchedules[
    VQH <: TimeColumns with Product, ReprVQH <: HList, KeyVQHOut <: HList,
    ReprSchedule <: HList, KeyScheduleOut <: HList
  ](
    viewershipWithQHAndDate: Dataset[VQH], schedules: Dataset[Schedule])(
    implicit
    // Be case classes
    ev0: RESULT <:< Product,
    ev1: VQH <:< Product,
    // Necessary if trying to pass down to any generic functions
    ev2: TypeTag[RESULT],
    ev3: TypeTag[VQH],
    // Have compiler retrieve the parameters to the case classes
    ev4: LabelledGeneric.Aux[VQH, ReprVQH],
    ev5: LabelledGeneric.Aux[Schedule, ReprSchedule],
    // we want these keys
    keysVQH: Keys.Aux[ReprVQH, KeyVQHOut],
    keysSchedule: Keys.Aux[ReprSchedule, KeyScheduleOut],
    // and we want to convert them to a list of Symbols
    traversableVQH: ToTraversable.Aux[KeyVQHOut, List, scala.Symbol],
    traversableSchedule: ToTraversable.Aux[KeyScheduleOut, List, scala.Symbol]
  ): Dataset[RESULT] = {
    // Take the session from the input so that `$"..."` and the product encoder for
    // RESULT are available without relying on an externally defined `session`.
    val session = viewershipWithQHAndDate.sparkSession
    import session.implicits._

    val viewership = viewershipWithQHAndDate.toDF.alias("viewership")
    val schedule = schedules.toDF.alias("schedules")

    val sameSegment =
      $"viewership.date" === $"schedules.date" &&
        $"viewership.segment_time" === $"schedules.segment_time" &&
        $"viewership.station_id" === $"schedules.station_id"

    // Filter on start and end time to avoid spurious matches: keep a pair only when the
    // schedule starts before the viewership ends and ends after the viewership starts.
    // Sharp-eyed readers will note that running data repeatedly through this filter runs
    // the risk of losing data. Consider that a problem for another time.
    val overlapsInTime =
      $"schedules.start_time" < $"viewership.end_time" &&
        $"schedules.end_time" > $"viewership.start_time"

    // Calculate the columns to select without ambiguity
    val schedulesProps: Set[String] = keysSchedule().toList.map(_.name).toSet
    val viewershipProps: Set[String] = keysVQH().toList.map(_.name).toSet -- schedulesProps

    // Select everything in viewership unless defined in schedules (i.e. schedules should override)
    val finalSelects: Seq[String] =
      (viewershipProps.map(x => s"viewership.${x}") ++ schedulesProps.map(x => s"schedules.${x}")).toSeq

    viewership
      .join(schedule, sameSegment)
      .filter(overlapsInTime)
      .selectExpr(finalSelects: _*)
      .as[RESULT]
  }
}
// End of gist (GitHub sign-in footer omitted).