Skip to content

Instantly share code, notes, and snippets.

@miodeqqq
Created December 1, 2016 19:24
Show Gist options
  • Save miodeqqq/456aa5a054746ef62889a5eb1597efe5 to your computer and use it in GitHub Desktop.
Save miodeqqq/456aa5a054746ef62889a5eb1597efe5 to your computer and use it in GitHub Desktop.
PySpark basics: filtering, mapping, count
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
# Input log on HDFS, and the user whose per-host statistics main() reports.
LOG_FILE = "hdfs://grid223-20:9000/input/taglogsbig/huge10g.log"
USERNAME = 'bob'

# Spark configuration: application name, standalone master URL and resources.
conf = (
    SparkConf()
    .setAppName("Maciej Januszewski")
    .setMaster("spark://grid223-20:7077")
    .set("spark.executor.memory", "3g")
    .set("spark.driver.cores", 4)
)
sc = SparkContext(conf=conf)

# The log RDD is cached because main() runs several separate jobs over it.
file = sc.textFile(LOG_FILE).cache()
def filter_user(file, username):
    """
    Filtering by user.

    Keep only the lines of ``file`` whose second whitespace-separated field
    equals ``username``.

    :param file: RDD of log lines (any object exposing ``filter(predicate)``).
    :param username: user name compared against the second field of a line.
    :return: the result of ``file.filter`` with the matching lines.
    """
    def _belongs_to_user(line):
        # Lines with fewer than two fields (e.g. blank lines) cannot carry a
        # user name; skip them instead of raising IndexError inside the job.
        fields = line.split()
        return len(fields) > 1 and fields[1] == username

    return file.filter(_belongs_to_user)
def map_time(line):
    """
    Time mapping.

    Return the last whitespace-separated field of ``line`` as an int.

    :param line: a single log line.
    :return: the integer value of the line's last field.
    :raises IndexError: if ``line`` contains no fields.
    :raises ValueError: if the last field is not an integer literal.
    """
    # Split once; [-1] is the last field.
    return int(line.split()[-1])
def map_host_time(line):
    """
    Host-Time mapping.

    Turn a log line into a ``(host, time)`` pair: the first whitespace-separated
    field is the key and the last field, parsed as int, is the value.

    :param line: a single log line.
    :return: tuple ``(first_field, int(last_field))``.
    :raises IndexError: if ``line`` contains no fields.
    :raises ValueError: if the last field is not an integer literal.
    """
    # Split once and reuse the fields.
    fields = line.split()
    return fields[0], int(fields[-1])
def main():
    """
    Run the log statistics jobs and print the results.

    Prints, in order:
      * the total number of log lines,
      * the number of lines for 'alice' and for 'bob',
      * the summed time of bob's lines,
      * one "host - time" line per host for USERNAME,
      * one "host - percent" line per host giving USERNAME's time on that
        host as a percentage of the time of all lines on that host.
    """
    line_count = file.count()

    # Build bob's filtered RDD once and reuse it for both the count and the sum.
    bob_lines = filter_user(file, 'bob')
    bob_count = bob_lines.count()
    alice_count = filter_user(file, 'alice').count()
    # RDD.sum() yields 0 for an empty RDD, whereas reduce() raises ValueError.
    bob_time = bob_lines.map(map_time).sum()

    # (host, summed time) for USERNAME. Defined once and reused both for the
    # collected listing and for the join, instead of rebuilding the pipeline.
    user_time_by_hosts = (filter_user(file, USERNAME)
                          .map(map_host_time)
                          .reduceByKey(lambda a, b: a + b))
    user_hostname_time = user_time_by_hosts.collect()

    # (host, summed time) over every line, i.e. all users together.
    all_times_by_hosts = file.map(map_host_time).reduceByKey(lambda a, b: a + b)
    # After the join each value is (user_time, all_time). A zero host total
    # would raise ZeroDivisionError, so report 0.0 percent in that case.
    joined = (user_time_by_hosts.join(all_times_by_hosts)
              .mapValues(lambda times: (times[0] / float(times[1]) * 100) if times[1] else 0.0)
              .collect())

    print("Number of lines: {line_count}\nAlice - number of lines: {alice_count}\nBob - number of lines: {bob_count}\nBob - time: {bob_time}".format(
        line_count=line_count,
        alice_count=alice_count,
        bob_count=bob_count,
        bob_time=bob_time,
    ))
    for host, total_time in user_hostname_time:
        print("\t{pair1} - {pair2}".format(pair1=host, pair2=total_time))
    for host, percent in joined:
        print("\t{pair} - {percent_value}".format(pair=host, percent_value=round(percent, 2)))
# Run the job only when executed as a script (e.g. via spark-submit), not when
# this module is imported to reuse its mapping functions.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment