pyspark queries
# for records grouped by colA, colB, and colC, list the colD values in groups that contain more than one record:
import pyspark.sql.functions as fn

grouped = df.groupBy('colA', 'colB', 'colC').agg(
    fn.collect_list('colD').alias('newColD'),
    fn.count('colD').alias('count'),
).filter(fn.col('count') > 1)
grouped.select(fn.explode('newColD').alias('colDUniques')).show()
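# (sketch, same column names as above, not from the original gist) collect_set drops
# duplicates within each group, so groups where colD repeats have fewer distinct
# values than records:
df.groupBy('colA', 'colB', 'colC').agg(
    fn.collect_set('colD').alias('uniqueColD'),
    fn.count('colD').alias('count'),
).filter(fn.size('uniqueColD') < fn.col('count')).show()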
# given a subset of columns, return a dataframe where duplicates exist for these columns:
from pyspark.sql import Window

COLS = [
    'col1',
    'col2',
]
w = Window.partitionBy(COLS)
w2 = Window.partitionBy(COLS).orderBy(COLS)
# keep every copy of the records that are duplicated on these columns:
df = df.select("*", fn.count('id').over(w).alias('dupes')).where('dupes > 1').drop('dupes')
# keep only one copy of each record that is duplicated on these columns:
df = df.select(
    "*",
    fn.count('id').over(w).alias('dupes'),
    fn.row_number().over(w2).alias('row_num')
).where(
    '(dupes > 1) AND (row_num = 1)'
).drop('dupes', 'row_num')
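# (sketch, not part of the original queries) if any single copy per group will do,
# dropDuplicates over the same column subset is a simpler alternative:
df_deduped = df.dropDuplicates(COLS)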
# for a subset of columns, what are the not-null counts:
from pyspark.sql.functions import isnan, when, count, col

_df.select([count(when(col(c).isNotNull(), c)).alias(c) for c in _df.columns]).show()
# per-row count of nulls across all columns:
df.select(sum(col(c).isNull().cast("int") for c in df.columns).alias("number_of_nulls")).show()
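# (sketch, assuming the same df) per-column null counts, one output column per input column:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()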
# show me these columns w/ two conditionals
df.filter(fn.when(df.birth_year == '1985', True).otherwise(False)).filter(fn.when(df.age != '32', True).otherwise(False)).select(['age', 'birth_date', 'birth_day', 'birth_month', 'birth_year']).show(50)
# count of a column w/ conditional
df.filter(fn.when(df.birth_year == '1985', True).otherwise(False)).select(['birth_year', 'state_voter_id']).count()
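# (sketch) the same filters can be written as plain boolean conditions, without when/otherwise:
df.filter((df.birth_year == '1985') & (df.age != '32')).select(['age', 'birth_date', 'birth_day', 'birth_month', 'birth_year']).show(50)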
# select a new column that is made up from other columns
df.select(
    fn.concat(
        df.COLUMN,
        fn.lit('_'),
        fn.regexp_replace(fn.col('COLUMN_II'), r"(\{)|(\})", "")
    ).alias('new_rnc_id')
)
# group by individual_id, return top 10 value counts
df.groupBy(df['INDIVIDUAL_ID']).count().sort(fn.desc("count")).limit(10).collect()
# count of distinct col 2 values when col 1 is null:
df.where(fn.col('COLUMN').isNull()).select('COLUMN_II').distinct().count()
# when column a is not null, how many values in column a match the values in column b:
_df.where(
    fn.col('col_a').isNotNull()
).filter(
    fn.when(_df['col_a'] == _df['col_b'], True).otherwise(False)
).select('col_a', 'col_b').limit(10).show()
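# (sketch) to get the actual matching count rather than a 10-row sample:
_df.where(fn.col('col_a').isNotNull()).filter(_df['col_a'] == _df['col_b']).count()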
# percentage of non-null values per column, and sum per column:
df.select(
    [fn.expr('COUNT({c}) * 100 / COUNT(*) AS {c}'.format(c=c))
     for c in df.columns]
).collect()
df.select(
    [fn.expr('SUM({c}) AS {c}'.format(c=c))
     for c in df.columns]
).collect()
# when column A is null, show me the count of the top 20 values that exist in column B:
_df.where(
    fn.col("column_a").isNull()
).groupBy(
    _df['column_b']
).count().sort(
    fn.desc("count")
).limit(20).show()
# transforms:
from typing import List

import pyspark.sql.functions as fn
from pyspark.sql import DataFrame

def strip_character(df: DataFrame, columns: List[str], symbol: str) -> DataFrame:
    # strip a symbol from every value in the given columns, leaving other columns untouched
    strip_symbol = fn.udf(lambda value: value.replace(symbol, "") if value is not None else None)
    return df.select([
        strip_symbol(col).alias(col) if col in columns else col for col in df.columns
    ])
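# (sketch, hypothetical column names) usage: strip '$' from two amount columns
cleaned = strip_character(df, columns=['amount_raised', 'amount_spent'], symbol='$')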