Created October 28, 2016 07:32
Spark DataFrame example
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import json
import re
# Matches an HTTP request line embedded in a log message, e.g.
# 'GET /api/users HTTP/1.1" 200 512'. In that example the six '.' after
# HTTP consume '/1.1" ' and the trailing '[0-9]*.[0-9]*' consumes '200 512'.
api_pattern = re.compile(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*')


def matcher(str):
    """Extract an API endpoint key from a raw log message.

    Searches the message for the first request line matched by
    ``api_pattern``, removes double quotes from the matched text and drops
    its last whitespace-separated token.

    Args:
        str: the log message text. (The name shadows the builtin ``str``;
            it is kept so existing keyword callers keep working.)

    Returns:
        The endpoint key, e.g. ``'GET /api/users HTTP/1.1 200'`` for
        ``'GET /api/users HTTP/1.1" 200 512'``, or ``None`` when the message
        is empty/None or contains no matching request line.
    """
    if not str:
        # re.search raises TypeError on None; treat "no message" as "no match".
        return None
    match = api_pattern.search(str)
    if match is None:
        return None
    words = match.group(0).replace('"', '').split()
    # Drop the final token of the match (the '512' in the example above).
    return " ".join(words[:-1])
# Spark setup for the example: application name and master "local".
conf = SparkConf().setAppName("dataframeTest").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Each line of logfile.txt is parsed as a JSON object below.
distFile = sc.textFile("logfile.txt")

# Extract the endpoint from each line's "body" with matcher() and count
# requests per endpoint. Lines whose body yields no endpoint (matcher returns
# None) are dropped. Result: RDD of (api_endpoint, request_count) tuples.
api_requests_rdd = (
    distFile
    .map(lambda line: (matcher(json.loads(line)["body"]), 1))
    # Index the pair instead of `lambda (x, y):` tuple-parameter unpacking,
    # which is a SyntaxError on Python 3.
    .filter(lambda pair: pair[0] is not None)
    .reduceByKey(lambda a, b: a + b)
)

schemaCols = "api_endpoint request_count"
# Column types, in the same order as schemaCols. request_count holds the
# integer sums produced by reduceByKey, so it must be numeric: declaring it
# StringType makes createDataFrame's schema verification reject the rows.
schemaTypes = [StringType(), LongType()]
# The third StructField argument (True) marks the column as nullable.
fields = [StructField(field_name, field_type, True)
          for field_name, field_type in zip(schemaCols.split(), schemaTypes)]
schema = StructType(fields)

# Apply the schema to the RDD to create a DataFrame; cache it because it is
# queried below.
api_request_dataframe = sqlContext.createDataFrame(api_requests_rdd, schema).cache()

# Register the DataFrame as a temporary table so SQL can refer to it by name.
api_request_dataframe.registerTempTable("api_requests")

# SQL runs over registered tables and returns a DataFrame (not an RDD).
results = sqlContext.sql("SELECT api_endpoint FROM api_requests")

# Map over the DataFrame's underlying RDD: DataFrame.map was removed from
# PySpark in 2.0, while DataFrame.rdd works in both 1.x and 2.x+.
api_endpoints = results.rdd.map(lambda ep: "Api endpoint: " + ep["api_endpoint"])
for ep in api_endpoints.collect():
    print(ep)
DataFrames are an important concept in Spark. They enrich RDDs with structured (schema) information, which can improve performance.
Because the data carries a schema, it can also be queried on the cluster with SQL.
This example was created with reference to an external source (the reference link is missing from this copy).
