@korkridake
Last active October 3, 2022 11:25
Important PySpark functions to work with dataframes
print(sc)
print(spark)
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
import math
import re
import pandas as pd
import numpy as np
import datetime
from pyspark.sql.functions import skewness, kurtosis, var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile, udf, col, desc
from pyspark.sql.functions import split, explode, substring, upper, trim, lit, length, regexp_replace, when, concat, coalesce, countDistinct, expr
# 'udf' stands for 'user-defined function': a wrapper around a plain Python function that
# lets Spark apply it to the values of a DataFrame column. It is used in an example
# further below ("Apply a plain Python function to every value in COL1").
from pyspark.sql.types import IntegerType, StringType, DateType, StructField, StructType
from pyspark.sql import DataFrame, Row
from functools import reduce
# -------------------------------------------------------------------------------
# Please refer to: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html for full reference.
# -------------------------------------------------------------------------------
# How to select columns?
# -------------------------------------------------------------------------------
# Assume that you have pre-loaded the dataframe called 'df'
# Please replace 'COL1' and 'COL2' with your own column names
df.select(['COL1', 'COL2'])
# -------------------------------------------------------------------------------
# How to trim whitespace in the feature?
# How to uppercase the elements in the feature?
# -------------------------------------------------------------------------------
df.withColumn('COL1', trim(upper(df['COL1'])))
# -------------------------------------------------------------------------------
# How to filter out rows where the feature holds the string 'NaN'?
# (for rows with true nulls, use df.where(df['COL1'].isNotNull()) instead)
# -------------------------------------------------------------------------------
df.where(df['COL1'] != 'NaN')
# -------------------------------------------------------------------------------
# How to drop missing rows from the dataframe?
# dropna(): returns a new DataFrame omitting rows with null values.
# -------------------------------------------------------------------------------
df.dropna()
# -------------------------------------------------------------------------------
# split: splits str around a pattern (the pattern is a regular expression). In this case, by ';'
# explode: returns a new row for each element in the given array or map.
# toDF: returns a new DataFrame with the specified column names
# -------------------------------------------------------------------------------
df.select(df['COL1'], explode(split(df['COL2'], ';'))).toDF(*['COL1', 'COL2'])
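# -------------------------------------------------------------------------------
# A minimal, self-contained sketch of split + explode on a toy dataframe
# (the toy data and the name 'demo_df' are illustrative assumptions, not part of the original gist)
# -------------------------------------------------------------------------------
demo_df = spark.createDataFrame([('a', 'x;y;z')], ['COL1', 'COL2'])
demo_df.select(demo_df['COL1'], explode(split(demo_df['COL2'], ';'))).toDF('COL1', 'COL2').show()
# yields three rows: ('a', 'x'), ('a', 'y'), ('a', 'z')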
# -------------------------------------------------------------------------------
# lit: creates a Column of literal value.
# -------------------------------------------------------------------------------
df.withColumn('COL1', lit(1))
# -------------------------------------------------------------------------------
# Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and
# DataFrameNaFunctions.drop() are aliases of each other.
# Parameters:
# how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
# thresh – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
# subset – optional list of column names to consider.
# -------------------------------------------------------------------------------
df.dropna(subset=['COL1'], how='any', thresh=1)
df = df.dropna(subset=['COL1', 'COL2'], how='all')  # note: PySpark's dropna has no 'inplace' option; reassign the result instead
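# -------------------------------------------------------------------------------
# A small sketch of how thresh interacts with how, on toy data
# (the toy rows and the name 'demo_na_df' are illustrative assumptions)
# -------------------------------------------------------------------------------
demo_na_df = spark.createDataFrame([(1, None), (None, 'a'), (None, None)], ['COL1', 'COL2'])
demo_na_df.dropna(thresh=1).show()   # keeps the first two rows: each has at least one non-null value
demo_na_df.dropna(how='any').show()  # drops every row containing any null, so nothing survives here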
# -------------------------------------------------------------------------------
# Return df column names and data types
# -------------------------------------------------------------------------------
df.dtypes
# -------------------------------------------------------------------------------
# Return the first n rows (return the first 2 rows in the below example)
# -------------------------------------------------------------------------------
df.take(2)
# --------------------------------------------------------------------------
# Choose all columns except prospect_id and cast data type to 'float'
# --------------------------------------------------------------------------
### Method 1
### Source: https://stackoverflow.com/questions/40478018/pyspark-dataframe-convert-multiple-columns-to-float
### --------------------------------------------------------------------------
cols = df.columns[1:]  # all columns except the first one (prospect_id)
for col_name in cols:
    print('cast data type to float for: ', col_name)
    df = df.withColumn(col_name, col(col_name).cast('float'))
### --------------------------------------------------------------------------
### Method 2
### Source: https://stackoverflow.com/questions/40478018/pyspark-dataframe-convert-multiple-columns-to-float
### --------------------------------------------------------------------------
df = df.select(*(col(c).cast("float").alias(c) for c in df.columns)) # cast to float for all columns!
df = df.withColumn('COL1', df['COL1'].astype('string')) # cast a single column (astype is an alias of cast; here to string)
# -------------------------------------------------------------------------------
### Apply a plain Python function to every value in COL1 via a udf
### The strip() method removes whitespace from the beginning and the end
### The upper() method returns the string in upper case
# -------------------------------------------------------------------------------
strip_upper = udf(lambda x: x.upper().strip() if x is not None else None, StringType())
df = df.withColumn('COL1', strip_upper(df['COL1']))
# a built-in alternative without a udf: df.withColumn('COL1', trim(upper(df['COL1'])))
# -------------------------------------------------------------------------------
### Drop duplicates in COL1, COL2, and COL3
# -------------------------------------------------------------------------------
df = df.drop_duplicates(['COL1', 'COL2', 'COL3'])
# -------------------------------------------------------------------------------
### Create another column by mutating COL1 and COL2
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', concat(coalesce(col('COL1'), lit('')), lit('_'), coalesce(col('COL2'), lit(''))))
# -------------------------------------------------------------------------------
### Create another column by applying a regular-expression replacement to COL1
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', regexp_replace(df['COL1'],'REG_PATTERN','')) # For example, the pattern is [^A-Za-z0-9]+
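# -------------------------------------------------------------------------------
### A toy sketch of the example pattern above (the sample value is an illustrative assumption)
# -------------------------------------------------------------------------------
spark.range(1).select(regexp_replace(lit('a-b c_1!'), '[^A-Za-z0-9]+', '').alias('cleaned')).show()
# -> 'abc1'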
# -------------------------------------------------------------------------------
### Joining in PySpark (by 4 Columns)
# -------------------------------------------------------------------------------
df1.join(df2, [col('COL1_df1') == col('COL1_df2'),
               col('COL2_df1') == col('COL2_df2'),
               col('COL3_df1') == col('COL3_df2'),
               col('COL4_df1') == col('COL4_df2')],
         how='inner')
# -------------------------------------------------------------------------------
### Joining in PySpark (by a single column)
# -------------------------------------------------------------------------------
df1.join(df2, [col('COL1_df1') == col('COL1_df2')], how='inner')
# -------------------------------------------------------------------------------
### Ifelse in PySpark
#### If the condition is satisfied, then 1, otherwise 0
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', when(col('COL1') < col('COL2'), 1).otherwise(0))
# -------------------------------------------------------------------------------
### Filter in PySpark
### In this example, keep only rows where COL1 > 29
# -------------------------------------------------------------------------------
df = df.filter(df['COL1'] > 29)
# -------------------------------------------------------------------------------
### Data Dimensionality in PySpark
# -------------------------------------------------------------------------------
print('-' * 150)
print('the number of rows: ',df.count())
print('the number of columns: ', len(df.columns))
print('-' * 150)
# -------------------------------------------------------------------------------
### Number of Unique Rows in COL1
# -------------------------------------------------------------------------------
df.select('COL1').distinct().count()
@korkridake (Author)

Disclosure statement: [NAME] does not work for, or receive funding from, any company or organization that would benefit from this article. The views expressed here are personal and are not endorsed by any university or company.

@korkridake (Author)

How to show full column content in a Spark Dataframe?
https://stackoverflow.com/questions/33742895/how-to-show-full-column-content-in-a-spark-dataframe

results.show(20, False) will not truncate.
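A minimal sketch of the same call on the dataframe used in the gist, with keyword arguments (df here is an assumption standing in for whatever dataframe you are inspecting):

df.show(n=20, truncate=False)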

@korkridake (Author)

# -------------------------------------------------------------------------------
# Convert the list into the SparkDataFrame 
# -------------------------------------------------------------------------------
list_of_column_names_spark = spark.createDataFrame(list_of_column_names, StringType())
list_of_column_names_spark = list_of_column_names_spark.selectExpr("value as col_name")
display(list_of_column_names_spark)  # display() is the Databricks notebook helper; use .show() elsewhere
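Presumably list_of_column_names is a plain Python list of strings; one way to build it from the dataframe used in the gist (my assumption, it is not defined above):

list_of_column_names = df.columns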

@hariumesh3

Hi, is there any way to add double quotes to a string from a dataframe - basically to make a valid JSON string?

+------------------------+---------+-------------------------------------------------------------------------------------------+
|time |IP |JSON |
+------------------------+---------+-------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}|
+------------------------+---------+-------------------------------------------------------------------------------------------+

Which function can be used here to convert the "Keys" value (-9serverkey) into a proper JSON string, i.e. it should become
"Keys":"-9serverkey"

I tried this way:

df.withColumn("JSON", F.regexp_replace(F.col("JSON"), r'"Keys":([-][^,]+)', '"Keys":"$1"'))

but it's not converting it to a string.

I'd appreciate it if you could let us know which function can be applied for this.
Thanks
