spark
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
from datetime import datetime
from pyspark.sql.functions import skewness, kurtosis
from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql import Row
from pyspark.sql.functions import col
# 'udf' stands for 'user-defined function'; it wraps a plain Python function you write
# so that Spark knows how to apply it to each value of a DataFrame column. This should
# become clearer when we use it below.
from pyspark.sql.functions import udf
row = Row("date", "name", "production")
# Parallelized collections are created by calling SparkContext's parallelize method on an existing iterable or collection in your driver program.
# The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
# Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list.
# One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster.
# Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster.
# However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
# A small reduce/partition example along these lines follows the DataFrame output below.
# See more: https://spark.apache.org/docs/2.1.1/programming-guide.html
df = sc.parallelize([
    row("08/01/2014", "Kim", 5),
    row("08/02/2014", "Kim", 14),
    row("08/01/2014", "Bob", 6),
    row("08/02/2014", "Bob", 3),
    row("08/01/2014", "Sue", 0),
    row("08/02/2014", "Sue", 22),
    row("08/01/2014", "Dan", 4),
    row("08/02/2014", "Dan", 4),
    row("08/01/2014", "Joe", 37),
    row("09/01/2014", "Kim", 6),
    row("09/02/2014", "Kim", 6),
    row("09/01/2014", "Bob", 4),
    row("09/02/2014", "Bob", 20),
    row("09/01/2014", "Sue", 11),
    row("09/02/2014", "Sue", 2),
    row("09/01/2014", "Dan", 1),
    row("09/02/2014", "Dan", 3),
    row("09/02/2014", "Joe", 29)
], 10).toDF()
df.show()
# +----------+----+----------+
# |      date|name|production|
# +----------+----+----------+
# |08/01/2014| Kim|         5|
# |08/02/2014| Kim|        14|
# |08/01/2014| Bob|         6|
# |08/02/2014| Bob|         3|
# |08/01/2014| Sue|         0|
# |08/02/2014| Sue|        22|
# |08/01/2014| Dan|         4|
# |08/02/2014| Dan|         4|
# |08/01/2014| Joe|        37|
# |09/01/2014| Kim|         6|
# |09/02/2014| Kim|         6|
# |09/01/2014| Bob|         4|
# |09/02/2014| Bob|        20|
# |09/01/2014| Sue|        11|
# |09/02/2014| Sue|         2|
# |09/01/2014| Dan|         1|
# |09/02/2014| Dan|         3|
# |09/02/2014| Joe|        29|
# +----------+----+----------+
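# A small, illustrative sketch of the reduce/partition behaviour described in the
# parallelize comments above (the names 'data' and 'distData' follow the Spark
# programming guide and are not part of this dataset).
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, 10)
print(distData.reduce(lambda a, b: a + b))
# 15
print(distData.getNumPartitions())
# 10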
df.dtypes
# Out[14]: [('date', 'string'), ('name', 'string'), ('production', 'bigint')]
# we define our own function that knows how to split apart a MM/DD/YYYY string and return a
# MM/YYYY string. Everything in here is standard Python, and not specific to PySpark.
def split_date(whole_date):
    # this try-except handler provides some minimal fault tolerance in case one of our date
    # strings is malformed, as we might find with real-world data. If it fails to split the
    # date into three parts it just returns 'error', which we could later subset the data on
    # to see what went wrong
    try:
        mo, day, yr = whole_date.split('/')
    except ValueError:
        return 'error'
    # lastly we return the month and year strings joined together
    return mo + '/' + yr
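# quick sanity check of split_date in plain Python before handing it to Spark;
# the second call exercises the 'error' fallback for a malformed date string
print(split_date('08/01/2014'))
# 08/2014
print(split_date('08-01-2014'))
# error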
# this is where we wrap the function we wrote above in the udf wrapper
udf_split_date = udf(split_date)
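# udf() assumes a StringType return type by default; here is the same wrapper with the
# return type spelled out explicitly, using the StringType imported above
# (the name 'udf_split_date_explicit' is just illustrative)
udf_split_date_explicit = udf(split_date, StringType())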
# here we create a new dataframe by calling the original dataframe and specifying the new
# column. Unlike with Pandas or R, PySpark dataframes are immutable, so we cannot simply assign
# to a new column on the original dataframe
df_new = df.withColumn('month_year', udf_split_date('date'))
df_new.show()
# +----------+----+----------+----------+
# |      date|name|production|month_year|
# +----------+----+----------+----------+
# |08/01/2014| Kim|         5|   08/2014|
# |08/02/2014| Kim|        14|   08/2014|
# |08/01/2014| Bob|         6|   08/2014|
# |08/02/2014| Bob|         3|   08/2014|
# |08/01/2014| Sue|         0|   08/2014|
# |08/02/2014| Sue|        22|   08/2014|
# |08/01/2014| Dan|         4|   08/2014|
# |08/02/2014| Dan|         4|   08/2014|
# |08/01/2014| Joe|        37|   08/2014|
# |09/01/2014| Kim|         6|   09/2014|
# |09/02/2014| Kim|         6|   09/2014|
# |09/01/2014| Bob|         4|   09/2014|
# |09/02/2014| Bob|        20|   09/2014|
# |09/01/2014| Sue|        11|   09/2014|
# |09/02/2014| Sue|         2|   09/2014|
# |09/01/2014| Dan|         1|   09/2014|
# |09/02/2014| Dan|         3|   09/2014|
# |09/02/2014| Joe|        29|   09/2014|
# +----------+----+----------+----------+
print(udf_split_date)
# <function split_date at 0x7fabfb1bfbf8>
# How to drop a column in PySpark
df_new = df_new.drop('date')
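# The same result can be reached by selecting the columns to keep rather than naming
# the one to drop; a small sketch of that pattern against the original df
# (the name 'df_no_date' is just illustrative)
df_no_date = df.select([c for c in df.columns if c != 'date'])
df_no_date.columns
# ['name', 'production']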
# GroupBy operation in PySpark (similar to Pandas)
df_agg = df_new.groupBy('month_year', 'name').agg({'production': 'sum'})
df_agg.show()
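# An equivalent sketch using pyspark.sql.functions.sum, which lets us alias the
# aggregate column to something friendlier than the default 'sum(production)'
# (the names 'sum_', 'df_agg_named', and 'total_production' are just illustrative)
from pyspark.sql.functions import sum as sum_
df_agg_named = df_new.groupBy('month_year', 'name').agg(sum_('production').alias('total_production'))
df_agg_named.show()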
# Apply datetime.strptime to every value in the 'date' column and store the result in a new 'new_date' column
dateFormat = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df_d = df.withColumn('new_date', dateFormat(col('date')))
df_d.dtypes
# Out[32]:
# [('date', 'string'),
#  ('name', 'string'),
#  ('production', 'bigint'),
#  ('new_date', 'date')]
df_d.show()
# +----------+----+----------+----------+
# |      date|name|production|  new_date|
# +----------+----+----------+----------+
# |08/01/2014| Kim|         5|2014-08-01|
# |08/02/2014| Kim|        14|2014-08-02|
# |08/01/2014| Bob|         6|2014-08-01|
# |08/02/2014| Bob|         3|2014-08-02|
# |08/01/2014| Sue|         0|2014-08-01|
# |08/02/2014| Sue|        22|2014-08-02|
# |08/01/2014| Dan|         4|2014-08-01|
# |08/02/2014| Dan|         4|2014-08-02|
# |08/01/2014| Joe|        37|2014-08-01|
# |09/01/2014| Kim|         6|2014-09-01|
# |09/02/2014| Kim|         6|2014-09-02|
# |09/01/2014| Bob|         4|2014-09-01|
# |09/02/2014| Bob|        20|2014-09-02|
# |09/01/2014| Sue|        11|2014-09-01|
# |09/02/2014| Sue|         2|2014-09-02|
# |09/01/2014| Dan|         1|2014-09-01|
# |09/02/2014| Dan|         3|2014-09-02|
# |09/02/2014| Joe|        29|2014-09-02|
# +----------+----+----------+----------+
df_d.select('new_date').take(3)
# [Row(new_date=datetime.date(2014, 8, 1)),
#  Row(new_date=datetime.date(2014, 8, 2)),
#  Row(new_date=datetime.date(2014, 8, 1))]
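# An alternative sketch that skips the Python UDF entirely by using the built-in
# to_date function with a Java-style date pattern; this assumes Spark 2.2+, where
# to_date accepts an explicit format argument (the name 'df_builtin' is illustrative)
from pyspark.sql.functions import to_date
df_builtin = df.withColumn('new_date', to_date(col('date'), 'MM/dd/yyyy'))
df_builtin.select('new_date').take(3)
# [Row(new_date=datetime.date(2014, 8, 1)),
#  Row(new_date=datetime.date(2014, 8, 2)),
#  Row(new_date=datetime.date(2014, 8, 1))]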