print(spark)
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
from datetime import datetime
from pyspark.sql.functions import skewness, kurtosis
from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile
# 'udf' stands for 'user defined function', and is simply a wrapper for functions
# you write and want to apply to a column of a PySpark dataframe. It should become
# clearer when we use it below.
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql import DataFrame
from pyspark.sql import Row
from functools import reduce
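# -------------------------------------------------------------------------------
# A minimal udf sketch (an added aside, not part of the original flow): wrap a
# plain Python function with udf() plus a return type, then apply it to a column.
# The `squared` name and the tiny dataframe here are made up for illustration.
# -------------------------------------------------------------------------------
squared = udf(lambda x: x * x, IntegerType())
df_udf = sc.parallelize([Row(num=1), Row(num=2), Row(num=3)]).toDF()
df_udf.withColumn("num_squared", squared(col("num"))).show()
# +---+-----------+
# |num|num_squared|
# +---+-----------+
# |  1|          1|
# |  2|          4|
# |  3|          9|
# +---+-----------+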
row = Row("name", "pet", "count")
df1 = sc.parallelize([
    row("Sue", "cat", 16),
    row("Kim", "dog", 1),
    row("Bob", "fish", 5)
]).toDF()
df2 = sc.parallelize([
    row("Fred", "cat", 2),
    row("Kate", "ant", 179),
    row("Marc", "lizard", 5)
]).toDF()
df3 = sc.parallelize([
    row("Sarah", "shark", 3),
    row("Jason", "kids", 2),
    row("Scott", "squirrel", 1)
]).toDF()
df1.show()
df2.show()
df3.show()
# +----+----+-----+
# |name| pet|count|
# +----+----+-----+
# | Sue| cat|   16|
# | Kim| dog|    1|
# | Bob|fish|    5|
# +----+----+-----+
#
# +----+------+-----+
# |name|   pet|count|
# +----+------+-----+
# |Fred|   cat|    2|
# |Kate|   ant|  179|
# |Marc|lizard|    5|
# +----+------+-----+
#
# +-----+--------+-----+
# | name|     pet|count|
# +-----+--------+-----+
# |Sarah|   shark|    3|
# |Jason|    kids|    2|
# |Scott|squirrel|    1|
# +-----+--------+-----+
# If we just want to stack two of them, we can use unionAll
# (since Spark 2.0, union is the preferred name for the same operation)
df_union = df1.unionAll(df2)
df_union.show()
# +----+------+-----+
# |name|   pet|count|
# +----+------+-----+
# | Sue|   cat|   16|
# | Kim|   dog|    1|
# | Bob|  fish|    5|
# |Fred|   cat|    2|
# |Kate|   ant|  179|
# |Marc|lizard|    5|
# +----+------+-----+
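# -------------------------------------------------------------------------------
# Caveat (an added note): union/unionAll line dataframes up by column *position*,
# not by name. If the column order differs between dataframes, unionByName
# (assuming Spark 2.3+) matches columns by name instead:
# -------------------------------------------------------------------------------
df1.unionByName(df2.select("count", "name", "pet")).show()
# same rows as df_union above, even though df2's columns were reordered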
def union_many(*dfs):
    # this function can have as many dataframes as you want passed into it;
    # the asterisk before the name `dfs` tells Python to collect all of the
    # positional arguments passed to union_many into a tuple named `dfs`
    return reduce(DataFrame.unionAll, dfs)
df_union = union_many(df1, df2, df3)
df_union.show()
# +-----+--------+-----+
# | name|     pet|count|
# +-----+--------+-----+
# |  Sue|     cat|   16|
# |  Kim|     dog|    1|
# |  Bob|    fish|    5|
# | Fred|     cat|    2|
# | Kate|     ant|  179|
# | Marc|  lizard|    5|
# |Sarah|   shark|    3|
# |Jason|    kids|    2|
# |Scott|squirrel|    1|
# +-----+--------+-----+
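# -------------------------------------------------------------------------------
# Caveat (an added note): Spark's union/unionAll behaves like SQL's UNION ALL,
# i.e. duplicate rows are kept. Chain .distinct() for SQL UNION semantics;
# a quick sketch:
# -------------------------------------------------------------------------------
print(union_many(df1, df2, df1).distinct().count())
# 6, because df1 was stacked twice but its rows are counted only once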
row1 = Row("name", "pet", "count")
row2 = Row("name", "pet2", "count2")
df1 = sc.parallelize([
    row1("Sue", "cat", 16),
    row1("Kim", "dog", 1),
    row1("Bob", "fish", 5),
    row1("Libuse", "horse", 1)
]).toDF()
df2 = sc.parallelize([
    row2("Sue", "eagle", 2),
    row2("Kim", "ant", 179),
    row2("Bob", "lizard", 5),
    row2("Ferdinand", "bees", 23)
]).toDF()
df1.show()
df2.show()
# +------+-----+-----+
# |  name|  pet|count|
# +------+-----+-----+
# |   Sue|  cat|   16|
# |   Kim|  dog|    1|
# |   Bob| fish|    5|
# |Libuse|horse|    1|
# +------+-----+-----+
#
# +---------+------+------+
# |     name|  pet2|count2|
# +---------+------+------+
# |      Sue| eagle|     2|
# |      Kim|   ant|   179|
# |      Bob|lizard|     5|
# |Ferdinand|  bees|    23|
# +---------+------+------+
df1.join(df2, 'name', how='inner').show()
# +----+----+-----+------+------+
# |name| pet|count|  pet2|count2|
# +----+----+-----+------+------+
# | Sue| cat|   16| eagle|     2|
# | Bob|fish|    5|lizard|     5|
# | Kim| dog|    1|   ant|   179|
# +----+----+-----+------+------+
df1.join(df2, 'name', how='outer').show()
# +---------+-----+-----+------+------+
# |     name|  pet|count|  pet2|count2|
# +---------+-----+-----+------+------+
# |      Sue|  cat|   16| eagle|     2|
# |Ferdinand| null| null|  bees|    23|
# |      Bob| fish|    5|lizard|     5|
# |      Kim|  dog|    1|   ant|   179|
# |   Libuse|horse|    1|  null|  null|
# +---------+-----+-----+------+------+
df1.join(df2, 'name', how='left').show()
# +------+-----+-----+------+------+
# |  name|  pet|count|  pet2|count2|
# +------+-----+-----+------+------+
# |   Sue|  cat|   16| eagle|     2|
# |   Bob| fish|    5|lizard|     5|
# |   Kim|  dog|    1|   ant|   179|
# |Libuse|horse|    1|  null|  null|
# +------+-----+-----+------+------+
df1.join(df2, 'name', how='right').show()
# +---------+----+-----+------+------+
# |     name| pet|count|  pet2|count2|
# +---------+----+-----+------+------+
# |      Sue| cat|   16| eagle|     2|
# |Ferdinand|null| null|  bees|    23|
# |      Bob|fish|    5|lizard|     5|
# |      Kim| dog|    1|   ant|   179|
# +---------+----+-----+------+------+
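# -------------------------------------------------------------------------------
# Two related join types, sketched as an added aside: 'left_semi' keeps the rows
# of df1 that have a match in df2 (without bringing in df2's columns), while
# 'left_anti' keeps the rows of df1 that have no match in df2.
# -------------------------------------------------------------------------------
df1.join(df2, 'name', how='left_semi').show()
# +----+----+-----+
# |name| pet|count|
# +----+----+-----+
# | Sue| cat|   16|
# | Bob|fish|    5|
# | Kim| dog|    1|
# +----+----+-----+
df1.join(df2, 'name', how='left_anti').show()
# +------+-----+-----+
# |  name|  pet|count|
# +------+-----+-----+
# |Libuse|horse|    1|
# +------+-----+-----+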
# -----------------------------------------------------------------------------------------------------------
# What if I have something like this?
# -----------------------------------------------------------------------------------------------------------
a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c'], ['p4', 'd'], ['p5', 'e']]).toDF(["other", "b_id"])
c = a.join(b, a.a_id == b.b_id, how='right')
a.show()
b.show()
c.show()
# +----+-----+
# |a_id|extra|
# +----+-----+
# |   a|  foo|
# |   b|  hem|
# |   c|  haw|
# +----+-----+
#
# +-----+----+
# |other|b_id|
# +-----+----+
# |   p1|   a|
# |   p2|   b|
# |   p3|   c|
# |   p4|   d|
# |   p5|   e|
# +-----+----+
#
# +----+-----+-----+----+
# |a_id|extra|other|b_id|
# +----+-----+-----+----+
# |null| null|   p5|   e|
# |null| null|   p4|   d|
# |   c|  haw|   p3|   c|
# |   b|  hem|   p2|   b|
# |   a|  foo|   p1|   a|
# +----+-----+-----+----+
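# -------------------------------------------------------------------------------
# One fix, sketched as an added aside: keep the explicit join condition and drop
# a's copy of the key afterwards, keeping b_id, which is populated for every row
# of the right join. (Renaming the column, shown next, is another option.)
# -------------------------------------------------------------------------------
a.join(b, a.a_id == b.b_id, how='right').drop(a.a_id).show()
# +-----+-----+----+
# |extra|other|b_id|
# +-----+-----+----+
# | null|   p5|   e|
# | null|   p4|   d|
# |  haw|   p3|   c|
# |  hem|   p2|   b|
# |  foo|   p1|   a|
# +-----+-----+----+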
# -----------------------------------------------------------------------------------------------------------
# To avoid carrying two ID columns in the join output, we can instead rename the key
# in b to match the key column in SparkDataFrame a.
# https://stackoverflow.com/questions/38798567/pyspark-rename-more-than-one-column-using-withcolumnrenamed
# -----------------------------------------------------------------------------------------------------------
def rename_columns(df, columns):
    if isinstance(columns, dict):
        for old_name, new_name in columns.items():
            df = df.withColumnRenamed(old_name, new_name)
        return df
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")
b = rename_columns(b, {'b_id': 'a_id'})
b.show()
# +-----+----+
# |other|a_id|
# +-----+----+
# |   p1|   a|
# |   p2|   b|
# |   p3|   c|
# |   p4|   d|
# |   p5|   e|
# +-----+----+
a.join(b, 'a_id', how='right').show()
# +----+-----+-----+
# |a_id|extra|other|
# +----+-----+-----+
# |   e| null|   p5|
# |   d| null|   p4|
# |   c|  haw|   p3|
# |   b|  hem|   p2|
# |   a|  foo|   p1|
# +----+-----+-----+