Skip to content

Instantly share code, notes, and snippets.

@marcintustin
Last active January 1, 2018 19:52
Show Gist options
  • Save marcintustin/285ea7222c7738f8def4cf2890553fc4 to your computer and use it in GitHub Desktop.
Spark code with explicit schemas
/** A single viewership record: activity observed on a station within a
  * date / segment-time window.
  *
  * NOTE(review): `Integer` (boxed) rather than `Int` presumably allows null
  * station ids in the source data — confirm against the ingest schema.
  */
case class Viewership(
    date: Date,
    segment_time: Timestamp,
    station_id: Integer,
    start_time: Timestamp,
    end_time: Timestamp,
    viewership_property: String)
/** A schedule entry: the program aired on a station for a given
  * date / segment-time window.
  */
case class Schedule(
    program_id: Integer,
    date: Date,
    segment_time: Timestamp,
    station_id: Integer,
    start_time: Timestamp,
    end_time: Timestamp)
/** Join result: a [[Viewership]] record enriched with the `program_id` of the
  * matching [[Schedule]] entry. Field order mirrors `viewership.*` followed by
  * the schedule column, as produced by `matchViewershipWithSchedules`.
  */
case class ViewershipWithSchedule(
    date: Date,
    segment_time: Timestamp,
    station_id: Integer,
    start_time: Timestamp,
    end_time: Timestamp,
    viewership_property: String,
    program_id: Integer)
/** Matches each viewership record with the scheduled program it overlaps.
  *
  * Performs an equi-join on (date, segment_time, station_id) to narrow
  * candidates, then keeps only pairs whose time intervals actually overlap,
  * and projects all viewership columns plus the schedule's `program_id`.
  *
  * NOTE(review): relies on a `session` (SparkSession) value in the enclosing
  * scope for `session.implicits._` — confirm it is defined where this lives.
  *
  * @param viewership viewership records to enrich
  * @param schedules  schedule entries to match against
  * @return one row per overlapping (viewership, schedule) pair
  */
def matchViewershipWithSchedules(
    viewership: Dataset[Viewership], schedules: Dataset[Schedule]): Dataset[ViewershipWithSchedule] = {
  import session.implicits._
  // Fix: the original referenced an undefined `viewershipWithQHAndDate`;
  // the intended input is the `viewership` parameter.
  viewership.toDF.as('viewership)
    .join(
      schedules.toDF.as('schedules),
      $"viewership.date" === $"schedules.date"
        && $"viewership.segment_time" === $"schedules.segment_time"
        && $"viewership.station_id" === $"schedules.station_id")
    // Interval-overlap test to avoid spurious matches within the same
    // segment: the program must start before viewing ends AND end after
    // viewing starts. (Original had unbalanced parentheses here.)
    .filter(
      $"schedules.start_time" < $"viewership.end_time"
        && $"schedules.end_time" > $"viewership.start_time")
    .selectExpr("viewership.*", "schedules.program_id")
    .as[ViewershipWithSchedule]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment