@princeppy
Created November 29, 2024 20:02
PySpark Snippets

%%pyspark

spark                 # SparkSession
spark.sql             # run SQL queries (successor to SQLContext)
spark.catalog         # catalog / metastore interface (successor to HiveContext)
spark.streams         # Structured Streaming query manager
spark.sparkContext    # SparkContext
spark._sc             # SparkContext (internal attribute)
sc                    # SparkContext (notebook shorthand)
%%pyspark

# Sample DataFrame
data = [
    (1, "100", 1.23, ["1","2",'3'],["3"]),
    (2, "200", 4.56, ["2","3"],["1"]),
    (3, "300", 7.89, [],["2","3"])
]
columns = ['ID', 'Amount', 'Price', "Tags","Category"]
df = spark.createDataFrame(data, columns)
df.show()
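
If the columns should have explicit types rather than the ones inferred from the tuples, a StructType schema can be passed to createDataFrame; a minimal sketch for the same sample data:

%%pyspark

from pyspark.sql import types as T

# Explicit schema for the sample data above
schema = T.StructType([
    T.StructField("ID", T.IntegerType(), True),
    T.StructField("Amount", T.StringType(), True),
    T.StructField("Price", T.DoubleType(), True),
    T.StructField("Tags", T.ArrayType(T.StringType()), True),
    T.StructField("Category", T.ArrayType(T.StringType()), True),
])
df = spark.createDataFrame(data, schema)
df.printSchema()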

Reading Files

%%pyspark

# Reading a CSV
df = spark.read.csv("filename.csv")

df = spark.\
    read.csv(
        csv_file_path,
        header=False, 
        inferSchema=True, 
        sep='|',
        nullValue=''
    )
df = spark.\
    read.format('csv').\
    options(
        header=True, 
        inferSchema=True,
        nullValue=''
    ).\
    load(csv_file_path)


# Reading a CSV with header
df = spark.read.csv("filename.csv", header=True)

# Reading a CSV using the load method
df = spark.read.format("csv").load("filename.csv")

# Reading a CSV using the load method with header
df = spark.read.format("csv").\
                option("header", "true").\
                load("filename.csv")

# The same goes for different formats
df = spark.read.format("<file format>"). \
                load("filename.<format>")
%%pyspark

df.dtypes
df.columns
df.rdd.getNumPartitions() # check the number of partitions
df.schema # schema details - Python object
df.printSchema() # schema details - Human-readable tree structure
df.describe().show() # basic summary statistics
df.summary().show() # extended set of descriptive statistics
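
For a quick look at the rows themselves (keep the samples small, since these bring data back to the driver):

%%pyspark

df.show(5, truncate=False)   # first 5 rows without truncating long values
df.head(3)                   # first 3 rows as a list of Row objects
df.limit(5).toPandas()       # small sample as a pandas DataFrame on the driver
df.count()                   # number of rows (triggers a job)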

Column operations

%%pyspark

from pyspark.sql import types as T
from pyspark.sql.functions import col, expr, column, lit, regexp_replace, isnull

# Selecting a single column
df.select(col("Amount")).show()

# Select multiple columns from your DataFrame
df.select("Amount", "Price").show()

# Using the `expr()` function:
df.selectExpr("Amount as newAmount", "Price * 2").show()
df.withColumn("newPrice", expr("Price * 2")).show()

# Using the `column()` function:
df.select(column("Amount")).show()

# Adding a column with constant value
df.withColumn('status', lit('PASS')).show()

# Replace '100' with '10' in the Amount column
df.withColumn('Amount', regexp_replace('Amount', '100', '10')).show()


# Select a column and add 1 to every entry
df.select(df.Price + 1).show()
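
Casting is another common column operation; a small sketch using the sample DataFrame, where Amount is stored as a string:

%%pyspark

# Cast the string Amount column to an integer (the two forms are equivalent)
df.withColumn("Amount", col("Amount").cast(T.IntegerType())).printSchema()
df.withColumn("Amount", col("Amount").cast("int")).dtypes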

Renaming

%%pyspark

df.withColumnRenamed("Price", "newPrice").show()

df1 = df
old_names = ['ID', 'Amount', 'Price', "Tags","Category"]
new_names = ['_ID', '_Amount', '_Price', "_Tags","_Category"]
for old, new in zip(old_names, new_names):
    df1 = df1.withColumnRenamed(old, new)
df1.show()
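
When every column is renamed, toDF() or a select with aliases does the same thing in one statement:

%%pyspark

df1 = df.toDF(*new_names)    # rename all columns at once
df1.show()

df1 = df.select([col(old).alias(new) for old, new in zip(old_names, new_names)])
df1.show()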

Filtering rows

%%pyspark

df.where(col("Price") > 1).show()

# filtering with multiple conditions
df.filter((col("Price") > 1) & (col("Amount") == 100)).show()
df.where((col("Price") > 1) | (col("Amount") == 100)).show()
print(df.where(df['Price'].isNull()).count())  # count rows where Price is null
df = df.fillna(0, subset=['Price'])            # replace nulls in Price with 0
df.where(df['Price'].isNull()).show()          # now empty: the nulls in 'Price' have been filled

# filter on a nested struct column (hypothetical 'address' struct)
# df.where(col("address.city") == "New York")
# df.where(col("address").getField("city") == "New York")

# filtering on array columns
from pyspark.sql.functions import array, array_contains, array_intersect, array_union, array_except, size
df.filter(array_contains(col("Tags"), "1")).show()                                # Tags contains "1"
df.filter(size(array_intersect(col("Tags"), col("Category"))) > 0).show()         # Tags and Category overlap
df.filter(array_contains(array_union(col("Tags"), col("Category")), "1")).show()  # "1" in either column
df.filter(size(array_except(col("Tags"), col("Category"))) > 0).show()            # Tags has elements not in Category
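
The Column helper methods cover a few more common filters; a short sketch on the sample DataFrame (isin, between and startswith are standard Column methods):

%%pyspark

df.filter(col("ID").isin(1, 3)).show()            # membership
df.filter(col("Price").between(1.0, 5.0)).show()  # inclusive range
df.filter(col("Amount").startswith("1")).show()   # string prefix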
%%pyspark

def count_nulls(df):
    null_counts = []                    # list of (column name, null count) tuples
    for cname, ctype in df.dtypes:      # iterate over (name, type) pairs, e.g. ('ID', 'bigint')
        if ctype != 'string':           # skip string columns here; drop this check if they may contain nulls
            nulls = df.where(df[cname].isNull()).count()
            null_counts.append((cname, nulls))
    return null_counts

null_counts = count_nulls(df)
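
The loop above launches one Spark job per column; the same counts can be computed in a single pass with a conditional aggregation (a sketch, covering every column including strings):

%%pyspark

from pyspark.sql import functions as F

# One job: count, per column, the rows where the value is null
df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns
]).show()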
%%pyspark

## Array operations 

from pyspark.sql import functions as F, types as T

# Column Array - F.array(*cols)
df = df.withColumn('full_name', F.array('fname', 'lname'))

# Empty Array
df = df.withColumn('empty_array_column', F.array([]))

# Get element at index
df = df.withColumn('first_element', F.col("my_array").getItem(0))

# Array Size/Length
df = df.withColumn('array_length', F.size('my_array'))

# Flatten Array
df = df.withColumn('flattened', F.flatten('my_array'))

# Unique/Distinct
df = df.withColumn('unique_elements', F.array_distinct('my_array'))

# Map over & transform array elements (F.transform takes a Python lambda in Spark 3.1+)
# ["This", "is", "very", "verbose"] -> [4, 2, 4, 7]
df = df.withColumn("len_col", F.transform("array_col", lambda x: F.length(x)))

# Explode & collect
from pyspark.sql.functions import explode, length, collect_list
df = df.\
    withColumn("col_temp", explode("array_col")).\
    withColumn("len_col_temp", length("col_temp")).\
    groupBy("unique_id").\
    agg(collect_list("len_col_temp").alias("len_col"))
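
Note that explode() drops rows whose array is empty or null (ID 3 in the sample DataFrame has an empty Tags array); explode_outer() keeps them with a null element:

%%pyspark

from pyspark.sql.functions import explode_outer
df.withColumn("tag", explode_outer("Tags")).show()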
%%pyspark

## Aggregate operations
df = df.\
    groupBy('gender').\
    agg(F.max('age').alias('max_age_by_gender'))

df = df.\
    groupBy('age').\
    agg(F.collect_set('name').alias('person_names'))
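
Several aggregates can go into one agg() call; gender and age here are the same placeholder columns used above:

%%pyspark

df.\
    groupBy('gender').\
    agg(
        F.count(F.lit(1)).alias('n'),
        F.avg('age').alias('avg_age'),
        F.min('age').alias('min_age'),
        F.max('age').alias('max_age')
    ).show()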
    
Writing Files

%%pyspark

# Writing each partition of this DataFrame into a different .csv part file under 'csv_path'
df.write.format('csv').\
        option("header", "true").\
        save('csv_path')

# Writing all the data into 1 partition
df.coalesce(1).write.format('csv').\
        option("header", "true").\
        save('csv_path')
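
Other formats and write modes follow the same pattern; the output paths and the partition column below are placeholders:

%%pyspark

# Overwrite any existing output and partition the files on disk by a column
df.write.mode("overwrite").partitionBy("ID").parquet("parquet_path")

# Append to an existing CSV output directory
df.write.mode("append").option("header", "true").csv("csv_path")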