pyspark queries
# for records grouped by colA, colB, and colC, return the colD values of groups with more than
# one record (i.e. where colD is not unique per group):
import pyspark.sql.functions as fn

grouped = df.groupBy('colA', 'colB', 'colC').agg(
    fn.collect_list('colD').alias('newColD'),
    fn.count('colD').alias('count'),
).filter(fn.col('count') > 1)
grouped.select(fn.explode('newColD').alias('colDUniques')).show()
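# illustrative sketch (not from the original gist; data and names are made up):
# only the ('a', 1, 'x') group has more than one record, so the exploded output
# contains 'd1' and 'd2' but not 'd3'.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [('a', 1, 'x', 'd1'), ('a', 1, 'x', 'd2'), ('b', 2, 'y', 'd3')],
    ['colA', 'colB', 'colC', 'colD'],
)
toy.groupBy('colA', 'colB', 'colC').agg(
    fn.collect_list('colD').alias('newColD'),
    fn.count('colD').alias('count'),
).filter(fn.col('count') > 1).select(
    fn.explode('newColD').alias('colDUniques')
).show()   # -> 'd1', 'd2'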
# given a subset of columns, return a dataframe where duplicates exist for these columns:
from pyspark.sql import Window

COLS = [
    'col1',
    'col2',
]
w = Window.partitionBy(COLS)
w2 = Window.partitionBy(COLS).orderBy(COLS)
# keep both copies of each 'duplicate' record:
df = df.select("*", fn.count('id').over(w).alias('dupes')).where('dupes > 1').drop('dupes')
# keep only one copy of each record that is a dupe:
df = df.select(
    "*",
    fn.count('id').over(w).alias('dupes'),
    fn.row_number().over(w2).alias('row_num'),
).where(
    '(dupes > 1) AND (row_num = 1)'
).drop('dupes', 'row_num')
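# illustrative sketch (made-up data, not from the original gist): with
# COLS = ['col1', 'col2'], ids 1 and 2 below are duplicates of each other;
# the first variant keeps both of them, the second keeps only one.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dupes_demo = spark.createDataFrame(
    [(1, 'a', 'x'), (2, 'a', 'x'), (3, 'b', 'y')],
    ['id', 'col1', 'col2'],
)
dupes_demo.select(
    "*", fn.count('id').over(w).alias('dupes')
).where('dupes > 1').drop('dupes').show()   # ids 1 and 2
dupes_demo.select(
    "*",
    fn.count('id').over(w).alias('dupes'),
    fn.row_number().over(w2).alias('row_num'),
).where('(dupes > 1) AND (row_num = 1)').drop('dupes', 'row_num').show()   # one of ids 1/2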
# for a subset of columns, what are the non-null counts:
from pyspark.sql.functions import when, count, col

_df.select([count(when(col(c).isNotNull(), c)).alias(c) for c in _df.columns]).show()
# null count per row, summed across all columns:
df.select(sum(col(c).isNull().cast("int") for c in df.columns).alias("number_of_nulls")).show()
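# illustrative sketch (made-up data, not from the original gist): 'b' is null in
# one of three rows, so the non-null counts are a = 3, b = 2 and the per-row
# null counts are 0, 1, 0.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
nulls_demo = spark.createDataFrame([(1, 'x'), (2, None), (3, 'y')], ['a', 'b'])
nulls_demo.select([count(when(col(c).isNotNull(), c)).alias(c) for c in nulls_demo.columns]).show()
nulls_demo.select(sum(col(c).isNull().cast("int") for c in nulls_demo.columns).alias("number_of_nulls")).show()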
# show me these columns w/ two conditionals:
df.filter(fn.when(df.birth_year == '1985', True).otherwise(False)).filter(fn.when(df.age != '32', True).otherwise(False)).select(['age', 'birth_date', 'birth_day', 'birth_month', 'birth_year']).show(50)
# count of a column w/ conditional
df.filter(fn.when(df.birth_year == '1985', True).otherwise(False)).select(['birth_year', 'state_voter_id']).count()
# select a new column built from other columns:
df.select(
    fn.concat(
        df.COLUMN,
        fn.lit('_'),
        fn.regexp_replace(fn.col('COLUMN_II'), r"(\{)|(\})", ""),
    ).alias('new_rnc_id')
)
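# illustrative sketch (made-up data, not from the original gist): braces are
# stripped from COLUMN_II before joining with an underscore, e.g. ('A', '{123}') -> 'A_123'.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
concat_demo = spark.createDataFrame([('A', '{123}')], ['COLUMN', 'COLUMN_II'])
concat_demo.select(
    fn.concat(
        concat_demo.COLUMN,
        fn.lit('_'),
        fn.regexp_replace(fn.col('COLUMN_II'), r"(\{)|(\})", ""),
    ).alias('new_rnc_id')
).show()   # -> 'A_123'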
# group by individual_id, return top 10 value counts
df.groupBy(df['INDIVIDUAL_ID']).count().sort(fn.desc("count")).limit(10).collect()
# distinct col 2 when col 1 is null:
df.where(fn.col('COLUMN').isNull()).select('COLUMN_II').distinct().count()
# when column a is not null, show rows where column a matches column b
# (swap .limit(10).show() for .count() to get the number of matches):
_df.where(
    fn.col('col_a').isNotNull()
).filter(
    fn.when(_df['col_a'] == _df['col_b'], True).otherwise(False)
).select('col_a', 'col_b').limit(10).show()
# percentage of non-null values per column:
df.select(
    [fn.expr('COUNT({c}) * 100 / COUNT(*) AS {c}'.format(c=c))
     for c in df.columns]
).collect()

# sum of each column:
df.select(
    [fn.expr('SUM({c}) AS {c}'.format(c=c))
     for c in df.columns]
).collect()
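# illustrative sketch (made-up data, not from the original gist): 'b' is null in
# one of four rows, so the first query yields a = 100.0, b = 75.0.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pct_demo = spark.createDataFrame([(1, 'x'), (2, 'y'), (3, None), (4, 'z')], ['a', 'b'])
pct_demo.select(
    [fn.expr('COUNT({c}) * 100 / COUNT(*) AS {c}'.format(c=c)) for c in pct_demo.columns]
).show()   # a = 100.0, b = 75.0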
# when column A is null, show me the count of the top 20 values that exist in column B:
_df.where(
    fn.col("column_a").isNull()
).groupBy(
    _df['column_b']
).count().sort(
    fn.desc("count")
).limit(20).show()
# transforms:
from typing import List

import pyspark.sql.functions as fn
from pyspark.sql import DataFrame


def strip_character(df: DataFrame, columns: List[str], symbol: str) -> DataFrame:
    # remove `symbol` from the named string columns, leaving other columns untouched
    strip_symbol = fn.udf(lambda value: value.replace(symbol, "") if value is not None else None)
    return df.select([
        strip_symbol(col).alias(col) if col in columns else col for col in df.columns
    ])
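# illustrative usage (made-up data, not from the original gist): strip '$' from
# the 'price' column while leaving 'item' untouched.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
prices = spark.createDataFrame([('widget', '$1.99'), ('gadget', '$5.00')], ['item', 'price'])
strip_character(prices, ['price'], '$').show()   # price -> '1.99', '5.00'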