%%pyspark
spark                 # SparkSession (entry point)
spark.sql             # run SQL queries (replaces the old SQLContext)
spark.catalog         # catalog / metastore interface (replaces the old HiveContext)
spark.streams         # Structured Streaming query manager
spark.sparkContext    # SparkContext
spark._sc             # SparkContext (internal alias)
sc                    # SparkContext (pre-defined notebook variable)
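# `spark` and `sc` are pre-created in notebook environments like this one; a minimal sketch of
# building a session yourself (e.g. in a standalone script), where the app name is just a placeholder:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my_app").getOrCreate()
sc = spark.sparkContext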
%%pyspark
# Sample DataFrame
data = [
(1, "100", 1.23, ["1","2",'3'],["3"]),
(2, "200", 4.56, ["2","3"],["1"]),
(3, "300", 7.89, [],["2","3"])
]
columns = ['ID', 'Amount', 'Price', "Tags","Category"]
df = spark.createDataFrame(data, columns)
df.show()
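# If you want explicit types instead of letting Spark infer them, a sketch using a hand-built schema
# that mirrors the sample data above (StructType/StructField come from pyspark.sql.types):
from pyspark.sql import types as T
schema = T.StructType([
    T.StructField("ID", T.IntegerType(), True),
    T.StructField("Amount", T.StringType(), True),
    T.StructField("Price", T.DoubleType(), True),
    T.StructField("Tags", T.ArrayType(T.StringType()), True),
    T.StructField("Category", T.ArrayType(T.StringType()), True),
])
df_typed = spark.createDataFrame(data, schema)
df_typed.printSchema()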
%%pyspark
# Reading a CSV
df = spark.read.csv("filename.csv")
df = spark.\
read.csv(
csv_file_path,
header=False,
inferSchema=True,
sep='|',
nullValue=''
)
df = spark.\
read.format('csv').\
options(
header=True,
inferSchema=True,
nullValue=''
).\
load(csv_file_path)
# Reading a CSV with header
df = spark.read.csv("filename.csv", header=True)
# Reading a CSV using the load method
df = spark.read.format("csv").load("filename.csv")
# Reading a CSV using the load method with header
df = spark.read.format("csv").\
option("header", "true").\
load("filename.csv")
# The same goes for different formats
df = spark.read.format("<file format>"). \
load("filename.<format>")
%%pyspark
df.dtypes
df.columns
df.rdd.getNumPartitions() # check the number of partitions
df.schema # schema details - Python object
df.printSchema() # schema details - Human-readable tree structure
df.describe().show() # basic summary statistics
df.summary().show() # extended set of descriptive statistics
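# summary() also accepts specific statistics by name, and df.schema can be inspected programmatically;
# a small sketch of both:
df.summary("count", "min", "25%", "75%", "max").show()
for field in df.schema.fields:
    print(field.name, field.dataType, field.nullable)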
%%pyspark
from pyspark.sql import types as T
from pyspark.sql.functions import col, expr, column, lit, regexp_replace, isnull
# Selecting a single column
df.select(col("Amount")).show()
# Select multiple columns from your DataFrame
df.select("Amount", "Price").show()
# Using the `expr()` function:
df.selectExpr("Amount as newAmount", "Price * 2").show()
df.withColumn("newPrice", expr("Price * 2")).show()
# Using the `column()` function:
df.select(column("Amount")).show()
# Adding a column with constant value
df.withColumn('status', lit('PASS')).show()
#replace '100' with '10' in Amount column
df.withColumn('Amount', regexp_replace('Amount', '100', '10')).show()
# Select a column and add 1 to every entry
df.select(df.Price + 1).show()
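# 'Amount' was created as a string above; a sketch of converting column types with cast(),
# which accepts both a DataType object (T is the pyspark.sql.types alias imported above) and a type name string:
df.withColumn("Amount", col("Amount").cast(T.IntegerType())).\
    withColumn("ID", col("ID").cast("string")).\
    printSchema()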
%%pyspark
df.withColumnRenamed("Price", "newPrice").show()
df1 = df
old_names = ['ID', 'Amount', 'Price', "Tags","Category"]
new_names = ['_ID', '_Amount', '_Price', "_Tags","_Category"]
for old, new in zip(old_names, new_names):
df1 = df1.withColumnRenamed(old, new)
df1.show()
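# The same bulk rename in a single call with toDF(), which replaces all column names positionally:
df2 = df.toDF(*new_names)
df2.show()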
%%pyspark
df.where(col("Price") > 1).show()
# filtering with multiple conditions
df.filter((col("Price") > 1) & (col("Amount") == "100")).show()
df.where((col("Price") > 1) | (col("Amount") == "100")).show()
print(df.where(df['Price'].isNull()).count())
df = df.fillna(0, subset=['Price'])
df.where(df['Price'].isNull()).show() # should now return no rows, since nulls in 'Price' were replaced with 0
# filter on nested struct column
# df.where((col("address.city") == "New York"))
# df.where(col("address").getField("city") == "New York")
# filtering on array columns
from pyspark.sql.functions import array, array_contains, array_intersect, array_union, array_except, size
df.filter(array_contains(col("Tags"), "1")).show()
df.filter(size(array_intersect(col("Tags"), col("Category"))) > 0).show()
df.filter(array_contains(array_union(col("Tags"), col("Category")), "1")).show()
df.filter(size(array_except(col("Tags"), col("Category"))) > 0).show() # rows where Tags has elements not in Category
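# A few other common filters; isin(), between() and contains() are standard Column methods:
df.filter(col("ID").isin(1, 3)).show()            # value is in a list
df.filter(col("Price").between(1, 5)).show()      # inclusive range check
df.filter(col("Amount").contains("00")).show()    # substring match on a string column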
%%pyspark
def count_nulls(df):
    null_counts = []  # list to hold (column name, null count) tuples
    for cname, ctype in df.dtypes:  # iterate over the (name, type) pairs seen above, e.g. ('ID', 'bigint')
        if ctype != 'string':  # only check non-string columns here; note that string columns can still contain nulls
            nulls = df.where(df[cname].isNull()).count()
            null_counts.append((cname, nulls))  # record (column name, null count)
    return null_counts
null_counts = count_nulls(df)
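# An alternative sketch that counts nulls for every column in a single pass over the data,
# using a conditional aggregation instead of one count() job per column:
from pyspark.sql import functions as F
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()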
%%pyspark
## Array operations
from pyspark.sql import functions as F, types as T
# Column Array - F.array(*cols)
df = df.withColumn('full_name', F.array('fname', 'lname'))
# Empty Array
df = df.withColumn('empty_array_column', F.array([]))
# Get element at index
df = df.withColumn('first_element', F.col("my_array").getItem(0))
# Array Size/Length
df = df.withColumn('array_length', F.size('my_array'))
# Flatten Array
df = df.withColumn('flattened', F.flatten('my_array'))
# Unique/Distinct
df = df.withColumn('unique_elements', F.array_distinct('my_array'))
# Map over & transform array elements
# ["This", "is", "very", "verbose"] -> [4, 2, 4, 7]
df = df.withColumn("len_col", F.transform("array_col", lambda x: F.length(x)))  # requires Spark 3.1+ for the Python lambda form
# Explode & collect
from pyspark.sql.functions import explode, length, collect_list
df = df.\
withColumn("col_temp", explode("array_col")).\
withColumn("len_col_temp", length("col_temp")).\
groupBy("unique_id").\
agg(collect_list("len_col_temp").alias("len_col"))
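# A few more array helpers, using the same placeholder column names as above;
# sort_array, array_join and posexplode are all built-in functions:
df = df.withColumn('sorted_array', F.sort_array('my_array'))       # sort elements ascending
df = df.withColumn('joined_array', F.array_join('my_array', ','))  # array -> delimited string
df.select('unique_id', F.posexplode('my_array')).show()            # explode with element position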
%%pyspark
## Aggregate operations
df = df.\
groupBy('gender').\
agg(F.max('age').alias('max_age_by_gender'))
df = df.\
groupBy('age').\
agg(F.collect_set('name').alias('person_names'))
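# Multiple aggregations can be computed in one pass; same placeholder columns ('gender', 'age', 'name') as above:
df = df.\
    groupBy('gender').\
    agg(
        F.count(F.lit(1)).alias('n_rows'),
        F.avg('age').alias('avg_age'),
        F.countDistinct('name').alias('distinct_names')
    )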
%%pyspark
# Writing each partition of this DataFrame into a different .csv file
df.write.format('csv').\
option("header", "true").\
save('csv_path')
# Writing all the data into 1 partition
df.coalesce(1).write.format('csv').\
option("header", "true").\
save('csv_path')
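# Writes fail if the target path already exists unless a save mode is set; a sketch with overwrite,
# plus the equivalent Parquet write (paths and the partition column are placeholders):
df.write.mode('overwrite').\
    option("header", "true").\
    csv('csv_path')
df.write.mode('overwrite').\
    partitionBy('partition_col').\
    parquet('parquet_path')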