Created December 16, 2016 08:12
#!/usr/bin/env python

import sys, os, re
import json
import datetime, iso8601

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition

# Save to Mongo
from bson import json_util
import pymongo_spark
pymongo_spark.activate()

def main(base_path):

  APP_NAME = "make_predictions_streaming.py"

  # Process data every 10 seconds
  PERIOD = 10
  BROKERS = 'localhost:9092'
  PREDICTION_TOPIC = 'flight_delay_classification_request'

  try:
    sc and ssc
  except NameError as e:
    import findspark

    # Add the streaming package and initialize
    findspark.add_packages(["org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2"])
    findspark.init()

    import pyspark
    import pyspark.sql
    import pyspark.streaming

    conf = SparkConf().set("spark.default.parallelism", 1)
    sc = SparkContext(appName="Agile Data Science: PySpark Streaming 'Hello, World!'", conf=conf)
    ssc = StreamingContext(sc, PERIOD)
    spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

  #
  # Load all models to be used in making predictions
  #

  # Load the arrival delay bucketizer
  from pyspark.ml.feature import Bucketizer
  arrival_bucketizer_path = "{}/models/arrival_bucketizer.bin".format(base_path)
  arrival_bucketizer = Bucketizer.load(arrival_bucketizer_path)

  # Load the departure delay bucketizer
  departure_bucketizer_path = "{}/models/departure_bucketizer.bin".format(base_path)
  departure_bucketizer = Bucketizer.load(departure_bucketizer_path)

  # Load all the string field vectorizer pipelines into a dict
  from pyspark.ml import PipelineModel

  string_vectorizer_pipeline_models = {}
  for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
                 "Origin", "Dest", "FlightNum", "DepDelayBucket"]:
    string_pipeline_model_path = "{}/models/string_indexer_pipeline_model_{}.bin".format(
      base_path,
      column
    )
    string_pipeline_model = PipelineModel.load(string_pipeline_model_path)
    string_vectorizer_pipeline_models[column] = string_pipeline_model

  # Load the numeric vector assembler
  from pyspark.ml.feature import VectorAssembler
  vector_assembler_path = "{}/models/numeric_vector_assembler.bin".format(base_path)
  vector_assembler = VectorAssembler.load(vector_assembler_path)

  # Load the final assembler
  final_assembler_path = "{}/models/final_vector_assembler.bin".format(base_path)
  final_assembler = VectorAssembler.load(final_assembler_path)

  # Load the classifier model
  from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
  random_forest_model_path = "{}/models/spark_random_forest_classifier.flight_delays.bin".format(
    base_path
  )
  rfc = RandomForestClassificationModel.load(
    random_forest_model_path
  )

  #
  # Process Prediction Requests in Streaming
  #
  stream = KafkaUtils.createDirectStream(
    ssc,
    [PREDICTION_TOPIC],
    {
      "metadata.broker.list": BROKERS,
      "group.id": "0",
    }
  )

  object_stream = stream.map(lambda x: json.loads(x[1]))
  object_stream.pprint()

  row_stream = object_stream.map(
    lambda x: Row(
      FlightDate=iso8601.parse_date(x['FlightDate']),
      Origin=x['Origin'],
      Distance=x['Distance'],
      DayOfMonth=x['DayOfMonth'],
      DayOfYear=x['DayOfYear'],
      UUID=x['UUID'],
      DepDelay=x['DepDelay'],
      DayOfWeek=x['DayOfWeek'],
      FlightNum=x['FlightNum'],
      Dest=x['Dest'],
      Timestamp=iso8601.parse_date(x['Timestamp']),
      Carrier=x['Carrier']
    )
  )
  row_stream.pprint()

  #
  # Create a dataframe from the RDD-based object stream
  #
  def classify_prediction_requests(rdd):

    from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType, TimestampType
    from pyspark.sql.types import StructType, StructField

    prediction_request_schema = StructType([
      StructField("Carrier", StringType(), True),
      StructField("DayOfMonth", IntegerType(), True),
      StructField("DayOfWeek", IntegerType(), True),
      StructField("DayOfYear", IntegerType(), True),
      StructField("DepDelay", DoubleType(), True),
      StructField("Dest", StringType(), True),
      StructField("Distance", DoubleType(), True),
      StructField("FlightDate", DateType(), True),
      StructField("FlightNum", StringType(), True),
      StructField("Origin", StringType(), True),
      StructField("Timestamp", TimestampType(), True),
      StructField("UUID", StringType(), True),
    ])

    prediction_requests_df = spark.createDataFrame(rdd, schema=prediction_request_schema)
    prediction_requests_df.show()

    # Bucketize the departure and arrival delays for classification
    ml_bucketized_features = departure_bucketizer.transform(prediction_requests_df)

    # Check the buckets
    ml_bucketized_features.select("DepDelay", "DepDelayBucket").show()

    # Vectorize string fields with the corresponding pipeline for that column
    # Turn category fields into categoric feature vectors, then drop intermediate fields
    for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
                   "Origin", "Dest", "FlightNum", "DepDelayBucket"]:
      string_pipeline_path = "{}/models/string_indexer_pipeline_{}.bin".format(
        base_path,
        column
      )
      string_pipeline_model = string_vectorizer_pipeline_models[column]
      ml_bucketized_features = string_pipeline_model.transform(ml_bucketized_features)
      ml_bucketized_features = ml_bucketized_features.drop(column + "_index")

    # Vectorize numeric columns
    ml_bucketized_features = vector_assembler.transform(ml_bucketized_features)

    # Drop the original numeric columns
    numeric_columns = ["DepDelay", "Distance"]

    # Combine various features into one feature vector, 'features'
    final_vectorized_features = final_assembler.transform(ml_bucketized_features)
    final_vectorized_features.show()

    # Drop the individual vector columns
    feature_columns = ["Carrier_vec", "DayOfMonth_vec", "DayOfWeek_vec", "DayOfYear_vec",
                       "Origin_vec", "Dest_vec", "FlightNum_vec", "DepDelayBucket_vec",
                       "NumericFeatures_vec"]
    for column in feature_columns:
      final_vectorized_features = final_vectorized_features.drop(column)

    # Inspect the finalized features
    final_vectorized_features.show()

    # Make the prediction
    predictions = rfc.transform(final_vectorized_features)

    # Drop the features vector and prediction metadata to give the original fields
    predictions = predictions.drop("Features_vec")
    final_predictions = predictions.drop("indices").drop("values").drop("rawPrediction").drop("probability")

    # Inspect the output
    final_predictions.show()

    # Store to Mongo
    if final_predictions.count() > 0:
      final_predictions.rdd.map(lambda x: x.asDict()).saveToMongoDB(
        "mongodb://localhost:27017/agile_data_science.flight_delay_classification_response"
      )

  # Do the classification and store to Mongo
  row_stream.foreachRDD(classify_prediction_requests)

  ssc.start()
  ssc.awaitTermination()

if __name__ == "__main__":
  main(sys.argv[1])
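
For reference, a prediction request is just a JSON document published to the flight_delay_classification_request topic with the fields unpacked in the Row above. The sketch below shows one way to send such a request using the kafka-python package; the package choice and the field values are illustrative assumptions, not part of the gist. The console output that follows it shows the stream processing empty batches and then dying with a socket timeout raised from pprint()'s take().

# Hypothetical request producer, assuming kafka-python is installed and a
# broker is listening on localhost:9092. Field values are made up for illustration.
import json, uuid, datetime
from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers='localhost:9092',
  value_serializer=lambda obj: json.dumps(obj).encode('utf-8')
)
request = {
  "Carrier": "DL", "DayOfMonth": 25, "DayOfWeek": 4, "DayOfYear": 360,
  "DepDelay": 5.0, "Dest": "ATL", "Distance": 761.0,
  "FlightDate": "2016-12-25", "FlightNum": "1005", "Origin": "SFO",
  "Timestamp": datetime.datetime.utcnow().isoformat(),
  "UUID": str(uuid.uuid4())
}
producer.send('flight_delay_classification_request', request)
producer.flush()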
-------------------------------------------
Time: 2016-12-15 23:38:00
-------------------------------------------

-------------------------------------------
Time: 2016-12-15 23:38:00
-------------------------------------------

+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+

+--------+--------------+
|DepDelay|DepDelayBucket|
+--------+--------------+
+--------+--------------+

+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+-----------+--------------+-------------+-------------+----------+--------+-------------+------------------+-------------------+------------+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|DepDelayBucket|Carrier_vec|DayOfMonth_vec|DayOfWeek_vec|DayOfYear_vec|Origin_vec|Dest_vec|FlightNum_vec|DepDelayBucket_vec|NumericFeatures_vec|Features_vec|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+-----------+--------------+-------------+-------------+----------+--------+-------------+------------------+-------------------+------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+-----------+--------------+-------------+-------------+----------+--------+-------------+------------------+-------------------+------------+

+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+------------+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|DepDelayBucket|Features_vec|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+------------+

+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+----------+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|DepDelayBucket|Prediction|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+----------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+----------+

-------------------------------------------
Time: 2016-12-15 23:38:10
-------------------------------------------

+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+

16/12/15 23:38:13 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:697)
16/12/15 23:38:13 ERROR scheduler.JobScheduler: Error running job streaming job 1481873890000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 1310, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/context.py", line 934, in runJob
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/Users/rjurney/anaconda3/lib/python3.5/socket.py", line 575, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

+--------+--------------+
|DepDelay|DepDelayBucket|
+--------+--------------+
+--------+--------------+

Traceback (most recent call last):
  File "ch08/make_predictions_streaming.py", line 210, in <module>
    main(sys.argv[1])
  File "ch08/make_predictions_streaming.py", line 207, in main
    ssc.awaitTermination()
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/context.py", line 206, in awaitTermination
    self._jssc.awaitTermination()
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 1310, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/context.py", line 934, in runJob
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/Users/rjurney/anaconda3/lib/python3.5/socket.py", line 575, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
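
The failure above is raised from pprint()'s rdd.take() on the driver (a socket accept timeout while fetching results), not from the classification logic itself. One defensive tweak, offered only as a sketch and not a verified fix for this timeout, is to return early from classify_prediction_requests when the micro-batch is empty, as every batch in the output above was:

# Hypothetical guard at the top of classify_prediction_requests(rdd):
# skip empty micro-batches before building, showing, or classifying a DataFrame.
def classify_prediction_requests(rdd):
  if rdd.isEmpty():
    return
  # ... the existing schema, transform, and prediction code continues unchanged ...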