Created October 28, 2016 07:32
Spark DataFrame example
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json
import re

# Matches HTTP request lines of the form "METHOD path HTTP/x.y ..."
api_pattern = re.compile(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*')

def matcher(text):
    match = api_pattern.search(text)
    if match:
        res = match.group().replace('"', '')
        words = res.split()
        # Drop the trailing token and use the rest of the request line as the key
        key = " ".join(words[:-1])
        return key
    else:
        return None
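# Illustration (the body text below is hypothetical; the real logfile format is not shown here):
# matcher('POST /login HTTP/1.1" 404') returns 'POST /login HTTP/1.1'
# -- the regex grabs the request portion and the final token (here the status code) is dropped.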
conf = SparkConf().setAppName("dataframeTest").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

distFile = sc.textFile("logfile.txt")

# Load the text file, parse each line into a (key, 1) tuple and sum the counts per API endpoint
api_requests_rdd = distFile.map(lambda line: (matcher(json.loads(line)["body"]), 1)).filter(lambda kv: kv[0] is not None).reduceByKey(lambda a, b: a + b)
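# The same pipeline broken into named steps (commented out, equivalent to the line above):
# bodies   = distFile.map(lambda line: json.loads(line)["body"])  # pull the request text out of each JSON log record
# keyed    = bodies.map(lambda body: (matcher(body), 1))          # (endpoint key, 1) pairs
# filtered = keyed.filter(lambda kv: kv[0] is not None)           # drop lines the regex did not match
# api_requests_rdd = filtered.reduceByKey(lambda a, b: a + b)     # sum the counts per endpoint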
# Specify the fields of the table. The third argument (True) indicates that null values are allowed
fields = [StructField("api_endpoint", StringType(), True),
          StructField("request_count", IntegerType(), True)]
schema = StructType(fields)

# Apply the schema to the RDD, which creates a DataFrame. Cache it because it will be used frequently
api_request_dataframe = sqlContext.createDataFrame(api_requests_rdd, schema).cache()
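# Hedged alternative (commented out): name the columns through Row objects and let
# Spark infer the schema instead of declaring the StructFields by hand:
# from pyspark.sql import Row
# rows = api_requests_rdd.map(lambda kv: Row(api_endpoint=kv[0], request_count=kv[1]))
# api_request_dataframe = sqlContext.createDataFrame(rows).cache()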
# Example queries/operations from the DataFrame API
api_request_dataframe.show()
api_request_dataframe.printSchema()
api_request_dataframe.filter(api_request_dataframe["request_count"] > 10).show()
api_request_dataframe.select(api_request_dataframe["api_endpoint"]).show()

# Register the DataFrame as a table
api_request_dataframe.registerTempTable("api_requests")

# SQL can be run over DataFrames that have been registered as a table. This returns a DataFrame!
results = sqlContext.sql("SELECT api_endpoint FROM api_requests")
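# The same query expressed through the DataFrame API instead of SQL (commented out, equivalent result):
# results = api_request_dataframe.select("api_endpoint")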
# The result of a SQL query is a DataFrame; its underlying RDD supports all the normal RDD operations
api_endpoints = results.rdd.map(lambda row: "Api endpoint: " + row["api_endpoint"])
for ep in api_endpoints.collect():
    print(ep)
DataFrames in Spark are an important concept. They enrich RDDs with structured schema information, which can be beneficial in terms of performance.
It also becomes possible to query data on the cluster in a SQL manner.
This example was created with reference to http://spark.apache.org/docs/1.6.2/sql-programming-guide.html
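As a minimal, self-contained sketch of the same idea (the table and column names below are made up for illustration, not taken from the log example above):

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

conf = SparkConf().setAppName("dataframeMiniExample").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Give a plain RDD of tuples a schema by naming its columns via Row objects
people = sc.parallelize([("Alice", 34), ("Bob", 45)]).map(lambda t: Row(name=t[0], age=t[1]))
people_df = sqlContext.createDataFrame(people)

# The schema makes SQL-style queries possible
people_df.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 40").show()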