Last active
February 22, 2019 03:56
-
-
Save jmichaels/82cd2635bd14c706977b871ba89e3284 to your computer and use it in GitHub Desktop.
Spark Streaming DStream Example - Consuming and Producing Kafka Messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Data from: | |
// | |
// https://catalog.data.gov/dataset/metro-bike-share-trip-data | |
// https://bikeshare.metro.net/about/data/ | |
// | |
// The CSV data (a header row followed by one record per trip) looks like:
// | |
// Trip ID,Duration,Start Time,End Time,Starting Station ID,Starting Station Latitude,Starting Station Longitude,Ending Station ID,Ending Station Latitude,Ending Station Longitude,Bike ID,Plan Duration,Trip Route Category,Passholder Type,Starting Lat-Long,Ending Lat-Long | |
// 1912818,180,07/07/2016 04:17:00 AM,07/07/2016 04:20:00 AM,3014,34.0566101,-118.23721,3014,34.0566101,-118.23721,6281,30,Round Trip,Monthly Pass,"(34.0566101, -118.23721)","(34.0566101, -118.23721)" | |
// 1919661,1980,07/07/2016 06:00:00 AM,07/07/2016 06:33:00 AM,3014,34.0566101,-118.23721,3014,34.0566101,-118.23721,6281,30,Round Trip,Monthly Pass,"(34.0566101, -118.23721)","(34.0566101, -118.23721)" | |
// 1933383,300,07/07/2016 10:32:00 AM,07/07/2016 10:37:00 AM,3016,34.0528984,-118.24156,3016,34.0528984,-118.24156,5861,365,Round Trip,Flex Pass,"(34.0528984, -118.24156)","(34.0528984, -118.24156)" | |
// 1944197,10860,07/07/2016 10:37:00 AM,07/07/2016 01:38:00 PM,3016,34.0528984,-118.24156,3016,34.0528984,-118.24156,5861,365,Round Trip,Flex Pass,"(34.0528984, -118.24156)","(34.0528984, -118.24156)" | |
// 1940317,420,07/07/2016 12:51:00 PM,07/07/2016 12:58:00 PM,3032,34.0498886,-118.25588,3032,34.0498886,-118.25588,6674,0,Round Trip,Walk-up,"(34.0498886, -118.25588)","(34.0498886, -118.25588)" | |
// 1944075,780,07/07/2016 12:51:00 PM,07/07/2016 01:04:00 PM,3021,34.0456085,-118.23703,3054,34.0392189,-118.23649,6717,30,One Way,Monthly Pass,"(34.0456085, -118.23703)","(34.0392189, -118.23649)" | |
// 1944073,600,07/07/2016 12:54:00 PM,07/07/2016 01:04:00 PM,3022,34.0460701,-118.23309,3014,34.0566101,-118.23721,5721,30,One Way,Monthly Pass,"(34.0460701, -118.23309)","(34.0566101, -118.23721)" | |
// | |
// Assumes the following Kafka topics have already been created:
// | |
// kafka-topics --create \ | |
// --zookeeper localhost:2181 \ | |
// --replication-factor 1 \ | |
// --partitions 3 \ | |
// --topic bike_share_record | |
// | |
// kafka-topics --create \ | |
// --zookeeper localhost:2181 \ | |
// --replication-factor 1 \ | |
// --partitions 3 \ | |
// --topic long_duration_trips | |
import java.util.Properties | |
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} | |
import org.apache.spark.SparkConf | |
import org.apache.spark.streaming.{Seconds, StreamingContext} | |
import org.apache.spark.streaming.kafka010.KafkaUtils | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent | |
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe | |
/**
 * Spark Streaming job that consumes bike-share trip records (CSV rows) from the
 * `bike_share_record` Kafka topic and republishes a short summary of every trip
 * lasting five minutes or more to the `long_duration_trips` topic.
 */
object BikeShareKafkaEtl {
  // Local import so the object stays self-contained; NonFatal lets fatal JVM
  // errors (OutOfMemoryError, InterruptedException, ...) propagate.
  import scala.util.control.NonFatal

  /**
   * Entry point: builds the streaming context, subscribes to the input topic and
   * blocks until the streaming job terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my_consumer_group_1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val microBatchSize = 2 // Seconds
    val conf = new SparkConf().setAppName("Simple Streaming Application").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(microBatchSize))
    val topics = Array("bike_share_record")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Skip empty partitions so no producer is created when there is nothing to send.
        if (partition.nonEmpty) {
          // One producer per partition instead of one per message: a producer owns
          // network connections and buffers, so it must be reused and then closed.
          val producer = buildKafkaProducerForLongDurationTrips()
          try {
            partition.foreach { kafkaMessage =>
              try {
                val trip = parseBikeShareCSVRow(kafkaMessage.value)
                // Integer division: the alert fires for durations of 300 seconds or more.
                if ((trip("duration").toInt / 60) > 4) {
                  alertLongDurationTrip(trip, producer)
                }
              } catch {
                // A bad row (e.g. the CSV header, or a short line) must not fail the batch.
                case NonFatal(e) =>
                  println(s"Failed to parse bikeshare CSV: $e")
              }
            }
          } finally {
            // Always release the producer, even if processing the partition failed.
            producer.close()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Splits one CSV row into a map keyed by column name.
   *
   * The row is split with `Utilities.COMMA_DELIMITER`, which is defined outside
   * this file (presumably a pattern that ignores commas inside quoted fields,
   * since the last two columns contain quoted "(lat, long)" pairs; confirm).
   *
   * @param csvRow a single line of the bike-share CSV
   * @return map from column name to the raw string value of that column
   * @throws ArrayIndexOutOfBoundsException if the row has fewer than 16 fields
   */
  def parseBikeShareCSVRow(csvRow: String): Map[String, String] = {
    val splits = csvRow.split(Utilities.COMMA_DELIMITER)
    Map(
      "trip_id" -> splits(0),
      "duration" -> splits(1),
      "start_time" -> splits(2),
      "end_time" -> splits(3),
      "starting_station_id" -> splits(4),
      "starting_station_latitude" -> splits(5),
      "starting_station_longitude" -> splits(6),
      "ending_station_id" -> splits(7),
      "ending_station_latitude" -> splits(8),
      "ending_station_longitude" -> splits(9),
      "bike_id" -> splits(10),
      "plan_duration" -> splits(11),
      "trip_route_category" -> splits(12),
      "passholder_type" -> splits(13),
      // NOTE(review): key casing ("_Long") differs from the other keys; kept as-is
      // because callers may already look it up under this exact name.
      "starting_lat_Long" -> splits(14),
      "ending_lat_long" -> splits(15)
    )
  }

  /**
   * Creates a String/String Kafka producer connected to `localhost:9092`.
   *
   * The caller owns the returned producer and is responsible for closing it.
   *
   * @return a new producer configured with string key and value serializers
   */
  def buildKafkaProducerForLongDurationTrips(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  /**
   * Publishes a `tripId,bikeId,duration,endingStationId` summary of the given trip
   * to the `long_duration_trips` topic, keyed by a freshly generated random UUID.
   *
   * The result of `send` is not inspected here.
   *
   * @param trip     parsed trip as returned by [[parseBikeShareCSVRow]]
   * @param producer producer used to publish the alert; not closed by this method
   */
  def alertLongDurationTrip(trip: Map[String, String], producer: KafkaProducer[String, String]): Unit = {
    val outputTopic = "long_duration_trips"
    val tripId = trip("trip_id")
    val bikeId = trip("bike_id")
    val duration = trip("duration")
    val endingStationId = trip("ending_station_id")
    val message = s"$tripId,$bikeId,$duration,$endingStationId"
    val messageKey = java.util.UUID.randomUUID().toString()
    producer.send(new ProducerRecord[String, String](outputTopic, messageKey, message))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment