Spark - Commands
# Start pyspark with IPython as the driver
PYSPARK_DRIVER_PYTHON=ipython pyspark
# Verify input data
hdfs dfs -ls input/
# Read shows files
show_views_file = sc.textFile("input/join2_gennum?.txt")
# view first two lines
show_views_file.take(2)
# Parse shows files
def split_show_views(line):
    """
    Split and parse one line of the data set.
    line: 'show,views', a string from a gennum file
    """
    # split the input line into show and views on the comma
    show, views = line.split(",")
    # turn the count into an integer
    views = int(views)
    return (show, views)
show_views = show_views_file.map(split_show_views)
# view the result
show_views.collect()
# view just the first two lines
show_views.take(2)
# Read channel files
show_channel_file = sc.textFile("input/join2_genchan?.txt")
# view first two lines
show_channel_file.take(2)
# Parse channel files
def split_show_channel(line):
    """
    Split and parse one line of the data set.
    line: 'show,channel', a string from a genchan file
    """
    show, channel = line.split(",")
    return (show, channel)
show_channel = show_channel_file.map(split_show_channel)
# view the result
show_channel.take(2)
# Join the two data sets
# Either side can drive the join; only the order of the values in the resulting
# tuple changes. The code below uses the second form, which yields records of
# the shape ('show', ('channel', views)).
joined_dataset = show_views.join(show_channel)
joined_dataset = show_channel.join(show_views)
# view the result
joined_dataset.take(2)
# Extract channel as key
# want total viewers by channel
def extract_channel_views(show_views_channel):
    """
    Re-key each joined record by channel so that viewers can be totalled per channel.
    show_views_channel: ('show', ('channel', views))
    returns: ('channel', views), regardless of the show
    """
    channel, views = show_views_channel[1]
    return (channel, views)
channel_views = joined_dataset.map(extract_channel_views)
def sum_channel_viewers(a, b):
    return a + b
channel_views.reduceByKey(sum_channel_viewers).collect()
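# The same sum can also be written with operator.add from the standard library
# (a minimal sketch, equivalent to sum_channel_viewers):
from operator import add
channel_views.reduceByKey(add).collect()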
fileA = sc.textFile("input/join1_FileA.txt")
fileA.collect()
# Out[]: [u'able,991', u'about,11', u'burger,15', u'actor,22']
fileB = sc.textFile("input/join1_FileB.txt")
fileB.collect()
# Out[]:
# [u'Jan-01 able,5',
# u'Feb-02 about,3',
# u'Mar-03 about,8 ',
# u'Apr-04 able,13',
# u'Feb-22 actor,3',
# u'Feb-23 burger,5',
# u'Mar-08 burger,2',
# u'Dec-15 able,100']
def split_fileA(line):
    # split the input line into word and count on the comma
    splitter = line.split(",")
    # turn the count into an integer
    word = splitter[0]
    count = int(splitter[1])
    return (word, count)
test_line = "able,991"
split_fileA(test_line) # Out[]: ('able', 991)
fileA_data = fileA.map(split_fileA)
fileA_data.collect()
# Out[]: [(u'able', 991), (u'about', 11), (u'burger', 15), (u'actor', 22)]
def split_fileB(line):
    # split the input line into word, date and count_string
    splitter = line.split(",")
    splitter_word_date = splitter[0].split(" ")
    word = splitter_word_date[1]
    date = splitter_word_date[0]
    count_string = splitter[1]
    return (word, date + " " + count_string)
fileB_data = fileB.map(split_fileB)
fileB_data.collect()
# [(u'able', u'Jan-01 5'),
# (u'about', u'Feb-02 3'),
# (u'about', u'Mar-03 8'),
# (u'able', u'Apr-04 13'),
# (u'actor', u'Feb-22 3'),
# (u'burger', u'Feb-23 5'),
# (u'burger', u'Mar-08 2'),
# (u'able', u'Dec-15 100')]
fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()
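# Each joined record has the shape (word, (fileB_value, fileA_count)),
# e.g. (u'able', (u'Jan-01 5', 991)); the ordering of collect() is not guaranteed.
# A hedged follow-up sketch (format_joined is illustrative, not part of the assignment):
# flatten each joined record into a printable line.
def format_joined(record):
    word, (date_count, total) = record
    return "%s %s %d" % (word, date_count, total)

fileB_joined_fileA.map(format_joined).take(2)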
# Install IPython
> sudo easy_install ipython==1.2.1
# Launch pyspark with IPython
> PYSPARK_DRIVER_PYTHON=ipython pyspark
# Check Spark version
In [1]: sc.version
# Example Output: u'1.3.0'
# RDD in PySpark
In [2]: integer_RDD = sc.parallelize(range(10), 3)
# Check partitions

# 1. 
# Gather all data on the driver:
In [3]: integer_RDD.collect()
# Example Output: Out: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# 2.
# Maintain the splitting into partitions
In [4]: integer_RDD.glom().collect()
# Example Output: Out: [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
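
# The partition count can also be read directly (a minimal sketch):
integer_RDD.getNumPartitions()
# Example Output: 3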
# 1.
# from local filesystem:
In [5]: text_RDD = sc.textFile("file:///home/cloudera/testfile2")

# 2.
# from HDFS
In [6]: text_RDD = sc.textFile("/user/cloudera/input/testfile1")
text_RDD.take(1) # outputs the first line
# coalesce

In [7]: sc.parallelize(range(10), 4).glom().collect()
# Example Output: [[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

In [8]: sc.parallelize(range(10), 4).coalesce(2).glom().collect()
# Example Output: [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9]]
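
# For comparison, repartition() can both increase and decrease the number of
# partitions, at the cost of a full shuffle (a minimal sketch):
sc.parallelize(range(10), 2).repartition(4).glom().collect()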
# flatMap
def split_words(line):
    return line.split()
words_flat_RDD = text_RDD.flatMap(split_words)
words_flat_RDD.collect()
# filter
def starts_with_a(word):
    return word.lower().startswith("a")
words_flat_RDD.filter(starts_with_a).collect()
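# Equivalent filter written inline with a lambda (a minimal sketch):
words_flat_RDD.filter(lambda word: word.lower().startswith("a")).collect()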
# Transformations of (K, V) pairs
def split_words(line):
    return line.split()

def create_pair(word):
    return (word, 1)

pairs_RDD = text_RDD.flatMap(split_words).map(create_pair)
pairs_RDD.collect()
# Out[]: [(u'A', 1),
# (u'long', 1),
# (u'time', 1),
# (u'ago', 1),
# (u'in', 1),
# (u'a',1),
# (u'galaxy', 1),
# (u'far',1),
# (u'far', 1),
# (u'away', 1)]
def sum_counts(a, b):
    return a + b
wordcounts_RDD = pairs_RDD.reduceByKey(sum_counts)
wordcounts_RDD.collect()
# Out[]:
# [(u'A', 1),
# (u'ago', 1),
# (u'a', 1),
# (u'far', 2),
# (u'long', 1),
# (u'galaxy', 1),
# (u'time', 1),
# (u'in', 1),
# (u'away', 1)]
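# A hedged extra step: rank words by descending count
wordcounts_RDD.sortBy(lambda pair: pair[1], ascending=False).take(3)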
# What is the mean of the "cool" column across all of the dataset?

yelp_df.select("cool").agg({"cool": "avg"}).collect()
# Using again the Yelp dataset, take into consideration only the records with a "review count" of 10 or more.
# What is the average of the "cool" column for venues with 4 "stars"? 
yelp_df_review_df = yelp_df.filter("review_count >= 10")
yelp_df_stars_df = yelp_df_review_df.filter("stars = 4")
yelp_df_stars_df.select("cool").agg({"cool": "avg"}).collect()
# Using again the Yelp dataset, take into consideration only the records with a "review count" of 10 or more and only records for which the venue is still open (see the "open" column).
# What is the average of the "cool" column for venues with 5 "stars"?

yelp_df_review_df = yelp_df.filter("review_count >= 10")
yelp_df_venue_open_df = yelp_df_review_df.filter("open=\"True\"")
yelp_df_five_stars_df = yelp_df_venue_open_df.filter("stars = 5")
yelp_df_five_stars_df.select("cool").agg({"cool": "avg"}).collect()
# Using again the Yelp dataset, take into consideration only the records with a "review count" of 10 or more and only records for which the venue is still open (see the "open" column).
# Count the records for each "state", which state has the 3rd highest number of reviews?
from pyspark.sql.functions import asc, desc

yelp_df_review_df = yelp_df.filter("review_count >= 10")
yelp_df_venue_open_df = yelp_df_review_df.filter("open=\"True\"")
yelp_df_groupby_state_df = yelp_df_venue_open_df.groupBy("state")
yelp_df_groupby_state_df.agg({"review_count":"sum"}).orderBy(desc("SUM(review_count)")).collect()
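# To inspect only the top three states (hedged: the generated aggregate column
# name may differ between Spark versions):
yelp_df_groupby_state_df.agg({"review_count": "sum"}) \
    .orderBy(desc("SUM(review_count)")).show(3)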
# Using again the Yelp dataset, but taking into consideration the complete dataset, what is the maximum number of rows per venue (identified by "business_id")?

yelp_df.groupBy("business_id","review_count").count().orderBy("count").collect()
# Register as a SQL temp table
yelp_df.registerTempTable("yelp")
# Filter
filtered_yelp = sqlCtx.sql("SELECT * FROM yelp WHERE useful >= 1")
filtered_yelp.count()
# Aggregation
sqlCtx.sql("SELECT MAX(useful) as max_useful FROM yelp").collect()
# Join

# Step 1
useful_perc_data.join(
  yelp_df,
  yelp_df.id == useful_perc_data.uid,
  "inner"
).select(useful_perc_data.uid, "useful_perc", "review_count")

# Step 2
useful_perc_data.registerTempTable("useful_perc_data")

# Step 3
sqlCtx.sql(
"""SELECT useful_perc_data.uid, useful_perc, review_count
FROM useful_perc_data INNER JOIN yelp
ON useful_perc_data.uid=yelp.id""")
# Copy hive-site.xml to the Spark conf directory
sudo cp /etc/hive/conf.dist/hive-site.xml /etc/spark/conf/
# Create dataframe from hive table
customers_df = sqlCtx.sql("SELECT * FROM customers")
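# Quick sanity checks on the Hive-backed DataFrame (a minimal sketch):
customers_df.printSchema()
customers_df.show(5)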
# Import yelp
yelp_df = sqlCtx.load(
    source="com.databricks.spark.csv",
    header='true',
    inferSchema='true',
    path='file:///usr/lib/hue/apps/search/examples/collections/solr_configs_yelp_demo/index_data.csv'
)
# Filter
yelp_df.filter(yelp_df.useful >= 1).count()
yelp_df.filter(yelp_df["useful"] >= 1).count()
yelp_df.filter("useful >= 1").count()
# Select
yelp_df.select("useful").agg({"useful": "max"}).collect()
# Take
yelp_df.select("id", "useful").take(5)
# Modify Column
yelp_df.select("id", yelp_df.useful/28*100).show(5)
yelp_df.select("id", (yelp_df.useful/28*100).cast("int")).show(5)
# Save as new dataframe
useful_perc_data = yelp_df.select(
  yelp_df["id"].alias("uid"), 
  (yelp_df.useful/28*100).cast("int").alias("useful_perc")
)
useful_perc_data.columns
# Ordering by column
from pyspark.sql.functions import asc, desc

useful_perc_data = yelp_df.select(
  yelp_df["id"].alias("uid"), 
  (yelp_df.useful/28*100).cast("int").alias("useful_perc")
).orderBy(desc("useful_perc"))

useful_perc_data.show(2)
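
# asc() is the ascending counterpart (a minimal sketch):
useful_perc_data.orderBy(asc("useful_perc")).show(2)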
# Join inputs

useful_perc_data.join(
  yelp_df,
  yelp_df.id == useful_perc_data.uid,
  "inner"
).cache().select(useful_perc_data.uid, "useful_perc", "review_count").show(5)
sarkarChanchal105 commented May 29, 2018

Hi Dimitar,

This question is related to the code: advanced-join-in-spark.py

I have a question for you about this Advanced Join in Spark (Coursera). I am using Python 3, and the automated grader is marking my answer as wrong. I suspect that I am missing something with the output format. I am running my code on a Windows machine on a standalone Spark cluster (version 2.2).

Here is the sample output I am getting.

[('CNO', 100), ('XYZ', 100), ('BOB', 100), ('ABC', 100), ('MAN', 100), ('DEF', 100), ('CAB', 100), ('NOX', 100), ('BAT', 100)]

Any idea what is wrong here?
