print(spark)
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
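# `spark` (the SparkSession) and `sc` (the SparkContext) are assumed to be
# predefined here, as in a pyspark shell or notebook session. In a standalone
# script you would create them first, e.g.:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.getOrCreate()
# sc = spark.sparkContext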
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
from datetime import datetime
from pyspark.sql.functions import skewness, kurtosis
from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile
# 'udf' stands for 'user-defined function'. It is a wrapper for functions you
# write that lets Spark apply them to PySpark DataFrame columns; a small
# sketch follows the imports below.
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql import DataFrame
from pyspark.sql import Row
from functools import reduce
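# A minimal sketch of udf in action (the names `add_one` and `plus_one` are
# illustrative, not part of the examples below): wrap a plain Python function
# together with its return type, then apply it to a column.
add_one = udf(lambda x: x + 1, IntegerType())
# Example usage, on any DataFrame with an integer column named 'count':
# df.withColumn('plus_one', add_one(col('count'))).show()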
row = Row("name", "pet", "count")
df1 = sc.parallelize([
    row("Sue", "cat", 16),
    row("Kim", "dog", 1),
    row("Bob", "fish", 5)
]).toDF()
df2 = sc.parallelize([
    row("Fred", "cat", 2),
    row("Kate", "ant", 179),
    row("Marc", "lizard", 5)
]).toDF()
df3 = sc.parallelize([
    row("Sarah", "shark", 3),
    row("Jason", "kids", 2),
    row("Scott", "squirrel", 1)
]).toDF()
df1.show()
df2.show()
df3.show()
# +----+----+-----+
# |name| pet|count|
# +----+----+-----+
# | Sue| cat| 16|
# | Kim| dog| 1|
# | Bob|fish| 5|
# +----+----+-----+
#
# +----+------+-----+
# |name| pet|count|
# +----+------+-----+
# |Fred| cat| 2|
# |Kate| ant| 179|
# |Marc|lizard| 5|
# +----+------+-----+
#
# +-----+--------+-----+
# | name| pet|count|
# +-----+--------+-----+
# |Sarah| shark| 3|
# |Jason| kids| 2|
# |Scott|squirrel| 1|
# +-----+--------+-----+
# If we just want to stack two of them, we can use unionAll
# (in Spark 2.x, unionAll is a deprecated alias for union; both behave the same)
df_union = df1.unionAll(df2)
df_union.show()
# +----+------+-----+
# |name| pet|count|
# +----+------+-----+
# | Sue| cat| 16|
# | Kim| dog| 1|
# | Bob| fish| 5|
# |Fred| cat| 2|
# |Kate| ant| 179|
# |Marc|lizard| 5|
# +----+------+-----+
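# Note: unionAll stacks rows purely by column position, not by name. If the
# column order might differ between inputs, unionByName (available since
# Spark 2.3) matches columns by name instead, which is a safer choice:
df_union_by_name = df1.unionByName(df2)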
def union_many(*dfs):
    # This function can have as many DataFrames as you want passed into it.
    # The asterisk before the name `dfs` tells Python that `dfs` will be a
    # tuple containing all of the arguments we pass into union_many when it
    # is called.
    return reduce(DataFrame.unionAll, dfs)
df_union = union_many(df1, df2, df3)
df_union.show()
# +-----+--------+-----+
# | name| pet|count|
# +-----+--------+-----+
# | Sue| cat| 16|
# | Kim| dog| 1|
# | Bob| fish| 5|
# | Fred| cat| 2|
# | Kate| ant| 179|
# | Marc| lizard| 5|
# |Sarah| shark| 3|
# |Jason| kids| 2|
# |Scott|squirrel| 1|
# +-----+--------+-----+
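# Unlike SQL's UNION, unionAll keeps duplicate rows. Chain distinct() after
# the union if each row should appear only once:
df_union_distinct = union_many(df1, df2, df3).distinct()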
row1 = Row("name", "pet", "count")
row2 = Row("name", "pet2", "count2")
df1 = sc.parallelize([
    row1("Sue", "cat", 16),
    row1("Kim", "dog", 1),
    row1("Bob", "fish", 5),
    row1("Libuse", "horse", 1)
]).toDF()
df2 = sc.parallelize([
    row2("Sue", "eagle", 2),
    row2("Kim", "ant", 179),
    row2("Bob", "lizard", 5),
    row2("Ferdinand", "bees", 23)
]).toDF()
df1.show()
df2.show()
# +------+-----+-----+
# | name| pet|count|
# +------+-----+-----+
# | Sue| cat| 16|
# | Kim| dog| 1|
# | Bob| fish| 5|
# |Libuse|horse| 1|
# +------+-----+-----+
#
# +---------+------+------+
# | name| pet2|count2|
# +---------+------+------+
# | Sue| eagle| 2|
# | Kim| ant| 179|
# | Bob|lizard| 5|
# |Ferdinand| bees| 23|
# +---------+------+------+
df1.join(df2, 'name', how='inner').show()
# +----+----+-----+------+------+
# |name| pet|count| pet2|count2|
# +----+----+-----+------+------+
# | Sue| cat| 16| eagle| 2|
# | Bob|fish| 5|lizard| 5|
# | Kim| dog| 1| ant| 179|
# +----+----+-----+------+------+
df1.join(df2, 'name', how='outer').show()
# +---------+-----+-----+------+------+
# | name| pet|count| pet2|count2|
# +---------+-----+-----+------+------+
# | Sue| cat| 16| eagle| 2|
# |Ferdinand| null| null| bees| 23|
# | Bob| fish| 5|lizard| 5|
# | Kim| dog| 1| ant| 179|
# | Libuse|horse| 1| null| null|
# +---------+-----+-----+------+------+
df1.join(df2, 'name', how='left').show()
# +------+-----+-----+------+------+
# | name| pet|count| pet2|count2|
# +------+-----+-----+------+------+
# | Sue| cat| 16| eagle| 2|
# | Bob| fish| 5|lizard| 5|
# | Kim| dog| 1| ant| 179|
# |Libuse|horse| 1| null| null|
# +------+-----+-----+------+------+
df1.join(df2, 'name', how='right').show()
# +---------+----+-----+------+------+
# | name| pet|count| pet2|count2|
# +---------+----+-----+------+------+
# | Sue| cat| 16| eagle| 2|
# |Ferdinand|null| null| bees| 23|
# | Bob|fish| 5|lizard| 5|
# | Kim| dog| 1| ant| 179|
# +---------+----+-----+------+------+
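# Beyond inner/outer/left/right, Spark also supports filtering joins:
# 'left_semi' keeps only rows of df1 that have a match in df2 (without adding
# df2's columns), and 'left_anti' keeps only rows of df1 with no match in df2.
df1.join(df2, 'name', how='left_anti').show()
# (expected: only the Libuse row, since she has no match in df2)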
# -----------------------------------------------------------------------------------------------------------
# What if I have something like this?
# -----------------------------------------------------------------------------------------------------------
a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c'], ['p4', 'd'], ['p5', 'e']]).toDF(["other", "b_id"])
c = a.join(b, a.a_id == b.b_id, how='right')
a.show()
b.show()
c.show()
# +----+-----+
# |a_id|extra|
# +----+-----+
# | a| foo|
# | b| hem|
# | c| haw|
# +----+-----+
#
# +-----+----+
# |other|b_id|
# +-----+----+
# | p1| a|
# | p2| b|
# | p3| c|
# | p4| d|
# | p5| e|
# +-----+----+
#
# +----+-----+-----+----+
# |a_id|extra|other|b_id|
# +----+-----+-----+----+
# |null| null| p5| e|
# |null| null| p4| d|
# | c| haw| p3| c|
# | b| hem| p2| b|
# | a| foo| p1| a|
# +----+-----+-----+----+
# -----------------------------------------------------------------------------------------------------------
# To avoid carrying both ID columns through the join, rename b's key to match the one in DataFrame a.
# https://stackoverflow.com/questions/38798567/pyspark-rename-more-than-one-column-using-withcolumnrenamed
# -----------------------------------------------------------------------------------------------------------
def rename_columns(df, columns):
    if isinstance(columns, dict):
        for old_name, new_name in columns.items():
            df = df.withColumnRenamed(old_name, new_name)
        return df
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")
b = rename_columns(b, {'b_id': 'a_id'})
b.show()
# +-----+----+
# |other|a_id|
# +-----+----+
# | p1| a|
# | p2| b|
# | p3| c|
# | p4| d|
# | p5| e|
# +-----+----+
a.join(b, 'a_id', how='right').show()
# +----+-----+-----+
# |a_id|extra|other|
# +----+-----+-----+
# | e| null| p5|
# | d| null| p4|
# | c| haw| p3|
# | b| hem| p2|
# | a| foo| p1|
# +----+-----+-----+
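# An alternative sketch that avoids renaming: had we kept b's original `b_id`
# column, we could join on an explicit condition and then drop the redundant
# key afterwards, since DataFrame.drop accepts a Column reference that can
# target b's copy specifically:
# a.join(b, a.a_id == b.b_id, how='right').drop(b.b_id).show()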