Skip to content

Instantly share code, notes, and snippets.

@jmichaels
Last active February 22, 2019 03:56
Show Gist options
  • Save jmichaels/82cd2635bd14c706977b871ba89e3284 to your computer and use it in GitHub Desktop.
Spark Streaming DStream Example - Consuming and Producing Kafka Messages
// Data from:
//
// https://catalog.data.gov/dataset/metro-bike-share-trip-data
// https://bikeshare.metro.net/about/data/
//
// Looks like:
//
// Trip ID,Duration,Start Time,End Time,Starting Station ID,Starting Station Latitude,Starting Station Longitude,Ending Station ID,Ending Station Latitude,Ending Station Longitude,Bike ID,Plan Duration,Trip Route Category,Passholder Type,Starting Lat-Long,Ending Lat-Long
// 1912818,180,07/07/2016 04:17:00 AM,07/07/2016 04:20:00 AM,3014,34.0566101,-118.23721,3014,34.0566101,-118.23721,6281,30,Round Trip,Monthly Pass,"(34.0566101, -118.23721)","(34.0566101, -118.23721)"
// 1919661,1980,07/07/2016 06:00:00 AM,07/07/2016 06:33:00 AM,3014,34.0566101,-118.23721,3014,34.0566101,-118.23721,6281,30,Round Trip,Monthly Pass,"(34.0566101, -118.23721)","(34.0566101, -118.23721)"
// 1933383,300,07/07/2016 10:32:00 AM,07/07/2016 10:37:00 AM,3016,34.0528984,-118.24156,3016,34.0528984,-118.24156,5861,365,Round Trip,Flex Pass,"(34.0528984, -118.24156)","(34.0528984, -118.24156)"
// 1944197,10860,07/07/2016 10:37:00 AM,07/07/2016 01:38:00 PM,3016,34.0528984,-118.24156,3016,34.0528984,-118.24156,5861,365,Round Trip,Flex Pass,"(34.0528984, -118.24156)","(34.0528984, -118.24156)"
// 1940317,420,07/07/2016 12:51:00 PM,07/07/2016 12:58:00 PM,3032,34.0498886,-118.25588,3032,34.0498886,-118.25588,6674,0,Round Trip,Walk-up,"(34.0498886, -118.25588)","(34.0498886, -118.25588)"
// 1944075,780,07/07/2016 12:51:00 PM,07/07/2016 01:04:00 PM,3021,34.0456085,-118.23703,3054,34.0392189,-118.23649,6717,30,One Way,Monthly Pass,"(34.0456085, -118.23703)","(34.0392189, -118.23649)"
// 1944073,600,07/07/2016 12:54:00 PM,07/07/2016 01:04:00 PM,3022,34.0460701,-118.23309,3014,34.0566101,-118.23721,5721,30,One Way,Monthly Pass,"(34.0460701, -118.23309)","(34.0566101, -118.23721)"
//
// Assumes you have these Kafka topics:
//
// kafka-topics --create \
// --zookeeper localhost:2181 \
// --replication-factor 1 \
// --partitions 3 \
// --topic bike_share_record
//
// kafka-topics --create \
// --zookeeper localhost:2181 \
// --replication-factor 1 \
// --partitions 3 \
// --topic long_duration_trips
import java.util.Properties

import scala.util.control.NonFatal

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/** Spark Streaming ETL: consumes raw bike-share CSV rows from the
  * `bike_share_record` Kafka topic and republishes trips longer than
  * four minutes to the `long_duration_trips` topic.
  */
object BikeShareKafkaEtl {

  /** Builds the streaming job and blocks until the context terminates. */
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my_consumer_group_1",
      "auto.offset.reset" -> "latest",
      // Auto-commit is disabled; NOTE(review): offsets are never committed
      // manually either, so records may be reprocessed after a restart.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val microBatchSize = 2 // seconds per micro-batch
    val conf = new SparkConf().setAppName("Simple Streaming Application").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(microBatchSize))

    val topics = Array("bike_share_record")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One producer per partition, not per message as before: KafkaProducer
        // is expensive to create, and the old per-message instances were never
        // closed, leaking sockets and buffer memory on every record.
        val producer = buildKafkaProducerForLongDurationTrips()
        try {
          partition.foreach { kafkaMessage =>
            try {
              val trip = parseBikeShareCSVRow(kafkaMessage.value)
              // The "duration" column is in seconds; alert on trips > 4 minutes.
              if ((trip("duration").toInt / 60) > 4) {
                alertLongDurationTrip(trip, producer)
              }
            } catch {
              // NonFatal instead of Throwable: let OutOfMemoryError,
              // InterruptedException, etc. propagate instead of being logged away.
              case NonFatal(e) =>
                println(s"Failed to parse bikeshare CSV: $e")
            }
          }
        } finally {
          // Flushes buffered records and releases the producer's resources
          // even if the partition loop throws.
          producer.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Parses one bike-share CSV row into a field-name -> raw-string map.
    *
    * Column order matches the data.gov export documented at the top of this
    * file. Throws `ArrayIndexOutOfBoundsException` for rows with fewer than
    * 16 fields (caught and logged by the streaming loop above).
    *
    * @param csvRow one raw CSV line from the `bike_share_record` topic
    * @return map of column name to unparsed string value
    */
  def parseBikeShareCSVRow(csvRow: String): Map[String, String] = {
    val splits = csvRow.split(Utilities.COMMA_DELIMITER)
    // No `return`: the Map expression is the method's value.
    Map(
      "trip_id" -> splits(0),
      "duration" -> splits(1),
      "start_time" -> splits(2),
      "end_time" -> splits(3),
      "starting_station_id" -> splits(4),
      "starting_station_latitude" -> splits(5),
      "starting_station_longitude" -> splits(6),
      "ending_station_id" -> splits(7),
      "ending_station_latitude" -> splits(8),
      "ending_station_longitude" -> splits(9),
      "bike_id" -> splits(10),
      "plan_duration" -> splits(11),
      "trip_route_category" -> splits(12),
      "passholder_type" -> splits(13),
      // Key normalized from "starting_lat_Long" to match "ending_lat_long";
      // no code in this file read the old mixed-case key.
      "starting_lat_long" -> splits(14),
      "ending_lat_long" -> splits(15)
    )
  }

  /** Creates a string/string KafkaProducer targeting the local broker.
    *
    * Callers own the returned producer and must `close()` it when done.
    */
  def buildKafkaProducerForLongDurationTrips(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  /** Publishes a compact alert record for a long trip to `long_duration_trips`.
    *
    * Message format: "tripId,bikeId,duration,endingStationId" keyed by a
    * random UUID (so records spread evenly across partitions).
    *
    * @param trip     parsed row from [[parseBikeShareCSVRow]]
    * @param producer open producer; not closed here — the caller owns it
    */
  def alertLongDurationTrip(trip: Map[String, String], producer: KafkaProducer[String, String]): Unit = {
    val outputTopic = "long_duration_trips"
    val tripId = trip("trip_id")
    val bikeId = trip("bike_id")
    val duration = trip("duration")
    val endingStationId = trip("ending_station_id")
    val message = s"$tripId,$bikeId,$duration,$endingStationId"
    val messageKey = java.util.UUID.randomUUID().toString
    // send() is asynchronous; delivery failures surface via the producer's
    // internal retries / close(), not here.
    producer.send(new ProducerRecord[String, String](outputTopic, messageKey, message))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment