@korkridake
Created November 23, 2018 08:17
spark
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
from datetime import datetime
from pyspark.sql.functions import skewness, kurtosis
from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql import Row
from pyspark.sql.functions import col
#'udf' stands for 'user-defined function'. it is a wrapper for an ordinary Python function
#you write, telling Spark how to apply that function to every value in a pySpark dataframe
#column. this should become clearer when we use it below
from pyspark.sql.functions import udf
row = Row("date", "name", "production")
# Parallelized collections are created by calling SparkContext's parallelize method on an existing iterable or collection in your driver program.
# The elements of the collection are copied to form a distributed dataset that can be operated on in parallel,
# e.g. by calling reduce(lambda a, b: a + b) on it to add up the elements of a list.
# One important parameter for parallelized collections is the number of partitions to cut the dataset into. Spark will run one task for each partition.
# Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster.
# However, you can also set it manually by passing it as a second argument to parallelize, as we do with sc.parallelize(data, 10) below.
# See more: https://spark.apache.org/docs/2.1.1/programming-guide.html
df = sc.parallelize([
    row("08/01/2014", "Kim", 5),
    row("08/02/2014", "Kim", 14),
    row("08/01/2014", "Bob", 6),
    row("08/02/2014", "Bob", 3),
    row("08/01/2014", "Sue", 0),
    row("08/02/2014", "Sue", 22),
    row("08/01/2014", "Dan", 4),
    row("08/02/2014", "Dan", 4),
    row("08/01/2014", "Joe", 37),
    row("09/01/2014", "Kim", 6),
    row("09/02/2014", "Kim", 6),
    row("09/01/2014", "Bob", 4),
    row("09/02/2014", "Bob", 20),
    row("09/01/2014", "Sue", 11),
    row("09/02/2014", "Sue", 2),
    row("09/01/2014", "Dan", 1),
    row("09/02/2014", "Dan", 3),
    row("09/02/2014", "Joe", 29)
], 10).toDF()
df.show()
# +----------+----+----------+
# |      date|name|production|
# +----------+----+----------+
# |08/01/2014| Kim|         5|
# |08/02/2014| Kim|        14|
# |08/01/2014| Bob|         6|
# |08/02/2014| Bob|         3|
# |08/01/2014| Sue|         0|
# |08/02/2014| Sue|        22|
# |08/01/2014| Dan|         4|
# |08/02/2014| Dan|         4|
# |08/01/2014| Joe|        37|
# |09/01/2014| Kim|         6|
# |09/02/2014| Kim|         6|
# |09/01/2014| Bob|         4|
# |09/02/2014| Bob|        20|
# |09/01/2014| Sue|        11|
# |09/02/2014| Sue|         2|
# |09/01/2014| Dan|         1|
# |09/02/2014| Dan|         3|
# |09/02/2014| Joe|        29|
# +----------+----+----------+
df.dtypes
# Out[14]: [('date', 'string'), ('name', 'string'), ('production', 'bigint')]
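# A quick check of the partitioning discussed above: df should still carry the 10 partitions
# we asked sc.parallelize for. (This is a sketch; Spark may repartition the data in later
# operations, so treat the number as informational rather than guaranteed.)
df.rdd.getNumPartitions()
# expected: 10 (the second argument we passed to parallelize)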
#we define our own function that knows how to split apart a MM/DD/YYYY string and return a
#MM/YYYY string. everything in here is standard Python, and not specific to pySpark
def split_date(whole_date):
    #this try-except handler provides some minimal fault tolerance in case one of our date
    #strings is malformed, as we might find with real-world data. if it fails to split the
    #date into three parts it just returns 'error', which we could later subset the data on
    #to see what went wrong
    try:
        mo, day, yr = whole_date.split('/')
    except ValueError:
        return 'error'
    #lastly we return the month and year strings joined together
    return mo + '/' + yr
#this is where we wrap the function we wrote above in the udf wrapper
udf_split_date = udf(split_date)
#here we create a new dataframe by calling the original dataframe and specifying the new
#column. unlike with Pandas or R, pySpark dataframes are immutable, so we cannot simply assign
#to a new column on the original dataframe
df_new = df.withColumn('month_year', udf_split_date('date'))
df_new.show()
# +----------+----+----------+----------+
# |      date|name|production|month_year|
# +----------+----+----------+----------+
# |08/01/2014| Kim|         5|   08/2014|
# |08/02/2014| Kim|        14|   08/2014|
# |08/01/2014| Bob|         6|   08/2014|
# |08/02/2014| Bob|         3|   08/2014|
# |08/01/2014| Sue|         0|   08/2014|
# |08/02/2014| Sue|        22|   08/2014|
# |08/01/2014| Dan|         4|   08/2014|
# |08/02/2014| Dan|         4|   08/2014|
# |08/01/2014| Joe|        37|   08/2014|
# |09/01/2014| Kim|         6|   09/2014|
# |09/02/2014| Kim|         6|   09/2014|
# |09/01/2014| Bob|         4|   09/2014|
# |09/02/2014| Bob|        20|   09/2014|
# |09/01/2014| Sue|        11|   09/2014|
# |09/02/2014| Sue|         2|   09/2014|
# |09/01/2014| Dan|         1|   09/2014|
# |09/02/2014| Dan|         3|   09/2014|
# |09/02/2014| Joe|        29|   09/2014|
# +----------+----+----------+----------+
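# For a simple string manipulation like this you do not strictly need a Python udf; the same
# column can be built from Spark's built-in column functions, which keeps the work inside the
# JVM and avoids Python serialization overhead. A minimal sketch, assuming the date strings
# are always well-formed MM/DD/YYYY ('df_builtin' is just an illustrative name; note it lacks
# the 'error' fallback that split_date provides):
from pyspark.sql.functions import concat, substring, lit
df_builtin = df.withColumn('month_year',
                           concat(substring('date', 1, 2), lit('/'), substring('date', 7, 4)))
df_builtin.show(3)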
print(udf_split_date)
# <function split_date at 0x7fabfb1bfbf8>
# How to drop a column in PySpark
df_new = df_new.drop('date')
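# An equivalent approach is to select every column except the one you want to remove; this can
# be handier when dropping several columns at once (shown here only as a commented sketch,
# since 'date' has already been dropped above):
# df_new = df_new.select([c for c in df_new.columns if c != 'date'])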
# GroupBy Operation in PySpark (Similar to Pandas)
df_agg = df_new.groupBy('month_year', 'name').agg({'production' : 'sum'})
df_agg.show()
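# The dictionary form of agg() names the result column 'sum(production)'. For a friendlier
# name, or to compute several aggregates at once, the functions module works the same way.
# A sketch ('total_production' and 'sd_production' are just illustrative names):
from pyspark.sql import functions as F
df_agg2 = df_new.groupBy('month_year', 'name').agg(
    F.sum('production').alias('total_production'),
    F.stddev('production').alias('sd_production'))
df_agg2.show()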
# Apply datetime.strptime to every value in the 'date' column via a udf, producing a proper
# DateType column called 'new_date'. Note the lowercase '%m' (month); an uppercase '%M' would
# be parsed as minutes and every date would silently collapse to January.
dateFormat = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df_d = df.withColumn('new_date', dateFormat(col('date')))
df_d.dtypes
# Out[32]:
# [('date', 'string'),
# ('name', 'string'),
# ('production', 'bigint'),
# ('new_date', 'date')]
df_d.show()
# +----------+----+----------+----------+
# |      date|name|production|  new_date|
# +----------+----+----------+----------+
# |08/01/2014| Kim|         5|2014-08-01|
# |08/02/2014| Kim|        14|2014-08-02|
# |08/01/2014| Bob|         6|2014-08-01|
# |08/02/2014| Bob|         3|2014-08-02|
# |08/01/2014| Sue|         0|2014-08-01|
# |08/02/2014| Sue|        22|2014-08-02|
# |08/01/2014| Dan|         4|2014-08-01|
# |08/02/2014| Dan|         4|2014-08-02|
# |08/01/2014| Joe|        37|2014-08-01|
# |09/01/2014| Kim|         6|2014-09-01|
# |09/02/2014| Kim|         6|2014-09-02|
# |09/01/2014| Bob|         4|2014-09-01|
# |09/02/2014| Bob|        20|2014-09-02|
# |09/01/2014| Sue|        11|2014-09-01|
# |09/02/2014| Sue|         2|2014-09-02|
# |09/01/2014| Dan|         1|2014-09-01|
# |09/02/2014| Dan|         3|2014-09-02|
# |09/02/2014| Joe|        29|2014-09-02|
# +----------+----+----------+----------+
df_d.select('new_date').take(3)
# [Row(new_date=datetime.date(2014, 8, 1)),
#  Row(new_date=datetime.date(2014, 8, 2)),
#  Row(new_date=datetime.date(2014, 8, 1))]
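# As with month_year above, the strptime udf can usually be replaced by a built-in function.
# On Spark 2.2+ (an assumption; this gist links to the 2.1.1 docs), to_date accepts a format
# string directly, so the conversion stays inside the JVM:
from pyspark.sql.functions import to_date
df_d2 = df.withColumn('new_date', to_date(col('date'), 'MM/dd/yyyy'))
df_d2.dtypes
# [('date', 'string'), ('name', 'string'), ('production', 'bigint'), ('new_date', 'date')]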