Created October 28, 2016 07:25
Spark cluster text processing vs. single node
From this little example I learned that it is always important to think in the distributed MapReduce paradigm when getting hands-on with Spark: operations that are costly to compute in a centralized manner should be avoided on the cluster. Compare the single-node script below with the Spark version that follows it.
import json
import re

def matcher(text):
    """Accumulate request counts per API signature in the global dict."""
    found = False
    # the pattern is recompiled (or fetched from re's cache) on every call;
    # the Spark version below compiles it once up front instead
    for match in re.finditer(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*', text):
        found = True
        res = match.group().replace('"', '')
        words = res.split()
        key = " ".join(words[:-1])   # request signature
        requests = int(words[-1])    # trailing request count
        print(key)
        if key in api_requests:
            api_requests[key][0] += requests  # total requests
            api_requests[key][1] += 1         # occurrences
        else:
            api_requests[key] = [requests, 1]
    if not found:
        print("Nothing found")

with open("logfile.txt", "r") as f:
    # maps request signature -> [total requests, occurrences]
    api_requests = {}
    for line in f:
        data = json.loads(line)
        matcher(data["body"])
from pyspark import SparkContext, SparkConf
import json
import re

# compile the pattern once instead of on every record
api_pattern = re.compile(r'(POST|HEAD|GET).*HTTP......[0-9]*.[0-9]*')

def matcher(text):
    """Return the API request signature from a log body, or None."""
    match = api_pattern.search(text)
    if match:
        res = match.group().replace('"', '')
        words = res.split()
        return " ".join(words[:-1])  # drop the trailing request count
    return None

conf = SparkConf().setAppName("test").setMaster("local")
sc = SparkContext(conf=conf)

distFile = sc.textFile("logfile.txt")

# map each log line to (signature, 1), drop non-matches, sum counts per key
api_requests = (distFile
                .map(lambda line: (matcher(json.loads(line)["body"]), 1))
                .filter(lambda kv: kv[0] is not None)
                .reduceByKey(lambda a, b: a + b))

list_test = api_requests.map(list)
for name in list_test.collect():
    print(name[1])

api_requests.saveAsTextFile("api_requests")

# spark-submit C:\Users\koljacorneliusmaier\Documents\textprocessingCluster.py
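
Note that the single-node script also tracks the total request volume per signature, not just the number of matching lines. A minimal sketch of the distributed equivalent, reusing api_pattern and distFile from the script above and assuming the same trailing-count log format, reduces over (total, occurrences) tuples instead of plain counters. The helper to_pair is hypothetical, not part of the original gist:

def to_pair(text):
    # emit (signature, (request count, 1)), or a sentinel for non-matches
    match = api_pattern.search(text)
    if match:
        words = match.group().replace('"', '').split()
        return (" ".join(words[:-1]), (int(words[-1]), 1))
    return (None, (0, 0))

totals = (distFile
          .map(lambda line: to_pair(json.loads(line)["body"]))
          .filter(lambda kv: kv[0] is not None)
          .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])))
# each value is now (total requests, occurrences), matching the
# [total requests, occurrences] lists built by the single-node version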