@marcintustin
Created January 1, 2018 19:05
Spark code with explicit schemas - dangerously parametrised
import org.apache.spark.sql.Dataset
import scala.reflect.runtime.universe.TypeTag

// Assumes a SparkSession named `session` and a `Schedule` case class in scope.
def matchViewershipWithSchedules[T, RESULT](
    viewership: Dataset[T], schedules: Dataset[Schedule])(
    // You need this so that RESULT can be used as a type parameter
    // inside the body of the function. A quirk of how Scala conforms
    // to Java's type erasure rules.
    implicit ev: TypeTag[RESULT]): Dataset[RESULT] = {
  import session.implicits._
  viewership.toDF.as('viewership).join(schedules.toDF.as('schedules),
    $"viewership.date" === $"schedules.date"
      && $"viewership.segment_time" === $"schedules.segment_time"
      && $"viewership.station_id" === $"schedules.station_id")
    // filter on start and end time to avoid spurious matches
    .filter(
      $"schedules.start_time" < $"viewership.end_time"
        && $"schedules.end_time" > $"viewership.start_time")
    .selectExpr("viewership.*", "schedules.program_id").as[RESULT]
}
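A sketch of how this might be invoked, under stated assumptions: the case classes `Viewership` and `ViewershipWithProgram` below are hypothetical (the real schemas are not shown in the gist), as are the `viewershipDs` and `schedulesDs` values.

```scala
// Hypothetical row types -- the actual column types may differ.
case class Viewership(
  date: String, segment_time: String, station_id: String,
  start_time: String, end_time: String)
case class ViewershipWithProgram(
  date: String, segment_time: String, station_id: String,
  start_time: String, end_time: String, program_id: String)

// RESULT's fields must match "viewership.* plus program_id", but nothing
// enforces that at compile time -- a mismatch only fails at runtime when
// .as[RESULT] is resolved. This is why the parametrisation is "dangerous".
val joined: Dataset[ViewershipWithProgram] =
  matchViewershipWithSchedules[Viewership, ViewershipWithProgram](
    viewershipDs, schedulesDs)
```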