# From stat1004:
# pyspark2 --jars ~otto/spark-sql-kafka-0-10_2.11-2.3.1.jar,~otto/kafka-clients-1.1.0.jar
# Need spark-sql-kafka for the Kafka streaming source and kafka-clients for the Kafka serdes.
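# Alternatively (an untested sketch, assuming the host can reach a Maven repository),
# the same dependencies can be pulled by coordinate; spark-sql-kafka-0-10 brings
# kafka-clients in transitively:
# pyspark2 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1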
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Declare a Spark schema that matches the JSON data.
# In a future MEP world this would be automatically loaded
# from a JSONSchema.
netflow_schema = StructType() \
    .add("event_type", "string") \
    .add("tag2", "integer") \
    .add("as_src", "integer") \
    .add("as_dst", "integer") \
    .add("as_path", "string") \
    .add("peer_as_src", "integer") \
    .add("peer_as_dst", "string") \
    .add("ip_src", "string") \
    .add("ip_dst", "string") \
    .add("port_src", "integer") \
    .add("port_dst", "integer") \
    .add("tcp_flags", "string") \
    .add("ip_proto", "string") \
    .add("stamp_inserted", "timestamp") \
    .add("stamp_updated", "timestamp") \
    .add("packets", "integer") \
    .add("bytes", "integer") \
    .add("writer_id", "string")
# Create a streaming DataFrame from the netflow topic in Kafka.
netflow_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") \
    .option("subscribe", "netflow") \
    .load()
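# The Kafka source exposes key, value, topic, partition, offset, timestamp and
# timestampType columns; printSchema() on the (streaming) DataFrame shows them.
netflow_stream.printSchema()
# Hedged variant: read the topic from the earliest retained offsets instead of the
# default "latest" (useful when backfilling; otherwise identical to the above).
netflow_stream_from_earliest = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") \
    .option("subscribe", "netflow") \
    .option("startingOffsets", "earliest") \
    .load()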
# The JSON data is in the Kafka message value.
# Parse it with from_json() using the schema above, then promote the parsed
# subfields to top-level columns.
netflow = netflow_stream \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", netflow_schema).alias("data")).select("data.*")
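# Quick sanity check that the JSON parsed into the typed columns declared above.
netflow.printSchema()
# Hedged illustration: downstream logic can now filter on the typed fields, e.g.
# keeping only TCP flows (the "tcp" literal is an assumption about how ip_proto
# is encoded in this topic).
tcp_netflow = netflow.where(col("ip_proto") == "tcp")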
# Query the DataFrame using the Spark SQL API.
# This outputs the 20 most frequent ip_src values (the grouping column is easy to
# change), refreshed every 30 seconds.
# NOTE: .format("console") writes to the console; the query could just as well
# write back to another Kafka topic, to files, or elsewhere (a Kafka-sink sketch
# follows the sample output below).
# NOTE: with .outputMode("complete") the watermark does not purge old aggregation
# state; it is declared here mainly for illustration.
ip_src_counts = netflow \
    .withWatermark("stamp_updated", "30 seconds") \
    .groupBy("ip_src").count() \
    .orderBy("count", ascending=False) \
    .writeStream \
    .trigger(processingTime="30 seconds") \
    .outputMode("complete") \
    .option("truncate", False) \
    .format("console") \
    .start()
# -------------------------------------------
# Batch: 2
# -------------------------------------------
# +--------------+-----+
# |ip_src        |count|
# +--------------+-----+
# |X.X.X.X       |8809 |
# |X.X.X.X       |8295 |
# |X.X.X.X       |743  |
# ...
# +--------------+-----+
# only showing top 20 rows
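# Hedged sketch of the "write back to another Kafka topic" variant mentioned above.
# The Kafka sink needs a string/binary "value" column, so each aggregated row is
# serialized with to_json(struct(...)); the output topic name and checkpoint path
# below are hypothetical.
ip_src_counts_to_kafka = netflow \
    .withWatermark("stamp_updated", "30 seconds") \
    .groupBy("ip_src").count() \
    .select(to_json(struct("ip_src", "count")).alias("value")) \
    .writeStream \
    .trigger(processingTime="30 seconds") \
    .outputMode("complete") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") \
    .option("topic", "netflow_ip_src_counts") \
    .option("checkpointLocation", "/tmp/netflow_ip_src_counts_checkpoint") \
    .start()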
# To stop the streaming query:
ip_src_counts.stop()
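# In a non-interactive job you would block on the query instead of polling the
# console; awaitTermination() returns once the query is stopped (immediately here,
# since it was stopped just above).
ip_src_counts.awaitTermination()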
# See also:
# https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
# https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html
# https://spark.apache.org/docs/2.3.1/structured-streaming-programming-guide.html
# https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html