Created
April 15, 2019 08:12
-
-
Save oskarryn/09b2caac97e00103d44ed3e6f29e75ba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
spark/bin/spark-submit \ | |
--master local --driver-memory 4g \ | |
--num-executors 2 --executor-memory 4g \ | |
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \ | |
sstreaming-spark-out.py | |
''' | |
from pyspark.sql import SparkSession | |
from pyspark.sql.types import * | |
spark = SparkSession.builder \ | |
.appName("Spark Structured Streaming from Kafka") \ | |
.getOrCreate() | |
sdfRides = spark \ | |
.readStream \ | |
.format("kafka") \ | |
.option("kafka.bootstrap.servers", "localhost:9092") \ | |
.option("subscribe", "taxirides") \ | |
.option("startingOffsets", "latest") \ | |
.load() \ | |
.selectExpr("CAST(value AS STRING)") | |
sdfFares = spark \ | |
.readStream \ | |
.format("kafka") \ | |
.option("kafka.bootstrap.servers", "localhost:9092") \ | |
.option("subscribe", "taxifares") \ | |
.option("startingOffsets", "latest") \ | |
.load() \ | |
.selectExpr("CAST(value AS STRING)") | |
taxiFaresSchema = StructType([ \ | |
StructField("rideId", LongType()), StructField("taxiId", LongType()), \ | |
StructField("driverId", LongType()), StructField("startTime", TimestampType()), \ | |
StructField("paymentType", StringType()), StructField("tip", FloatType()), \ | |
StructField("tolls", FloatType()), StructField("totalFare", FloatType())]) | |
taxiRidesSchema = StructType([ \ | |
StructField("rideId", LongType()), StructField("isStart", StringType()), \ | |
StructField("endTime", TimestampType()), StructField("startTime", TimestampType()), \ | |
StructField("startLon", FloatType()), StructField("startLat", FloatType()), \ | |
StructField("endLon", FloatType()), StructField("endLat", FloatType()), \ | |
StructField("passengerCnt", ShortType()), StructField("taxiId", LongType()), \ | |
StructField("driverId", LongType())]) | |
def parse_data_from_kafka_message(sdf, schema): | |
from pyspark.sql.functions import split | |
assert sdf.isStreaming == True, "DataFrame doesn't receive treaming data" | |
col = split(sdf['value'], ',') #split attributes to nested array in one Column | |
#now expand col to multiple top-level columns | |
for idx, field in enumerate(schema): | |
sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType)) | |
return sdf.select([field.name for field in schema]) | |
sdfRides = parse_data_from_kafka_message(sdfRides, taxiRidesSchema) | |
sdfFares = parse_data_from_kafka_message(sdfFares, taxiFaresSchema) | |
query = sdfRides.groupBy("driverId").count() | |
query.writeStream \ | |
.outputMode("complete") \ | |
.format("console") \ | |
.option("truncate", False) \ | |
.start() \ | |
.awaitTermination() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment