Skip to content

Instantly share code, notes, and snippets.

@datavudeja
Forked from adrianonvls/pandas_utils.py
Created February 4, 2026 16:03
Show Gist options
  • Select an option

  • Save datavudeja/5c0369c1ee488aa1e21c48b528569392 to your computer and use it in GitHub Desktop.

Select an option

Save datavudeja/5c0369c1ee488aa1e21c48b528569392 to your computer and use it in GitHub Desktop.
Pandas util functions
import pandas as pd
from IPython.display import display
def compare_dfs_cols_and_types(df_x, df_y, df_x_name="x", df_y_name="y"):
"""Function to compare two DataFrames, checking column
names and types, prints the differences (if they exists)
and return a DataFrame with the NaNs sinalizing the mismatches.
:param df_x: First Dataframe
:type df_x: pd.DataFrame
:param df_y: Second DataFrame
:type df_y: pd.DataFrame
:param df_x_name: Name of the first table
to display on result, defaults to "x"
:type df_x_name: str, optional
:param df_y_name: Name of the second table
to display on result, defaults to "y"
:type df_y_name: str, optional
:return: Columns and types of the DataFrames merged.
:rtype: pd.DataFrame
"""
def generate_dtypes_df(df, col_names):
"""Generate a dtypes DataFrame.
:param df: DataFrame to extract the dtypes
:type df: pd.DataFrame
:param col_names: Two column names to replace in DataFrame.
:type col_names: list
:return: dtypes DataFrame
:rtype: pd.DataFrame
"""
dtypes = df.dtypes.apply(str)
dtypes = dtypes.reset_index()
dtypes.columns = col_names
dtypes = dtypes.sort_values(by=col_names[0]).reset_index(drop=True)
return dtypes
names_x = [f"{df_x_name}_cols", f"{df_x_name}_types"]
names_y = [f"{df_y_name}_cols", f"{df_y_name}_types"]
x = generate_dtypes_df(df_x, names_x)
y = generate_dtypes_df(df_y, names_y)
# merge by column names
compare = x.merge(
right=y,
left_on=names_x[0],
right_on=names_y[0],
how="outer"
)
# reorder columns
compare = compare[[names_x[0], names_y[0], names_x[1], names_y[1]]]
compare["is_equal"] = (
# x_cols == y_cols
(compare[names_x[0]] == compare[names_y[0]])
&
# x_types == y_types
(compare[names_x[1]] == compare[names_y[1]])
)
if compare["is_equal"].all():
print("The DataFrames are identical.")
elif not compare["is_equal"].any():
print("The Dataframes are completely different.")
else:
print("The DataFrames are different because of the columns:")
display(compare[compare["is_equal"] == False])
return compare
def filter_columns(df, col_list):
try:
df = df[col_list]
except KeyError as e:
e = str(e)
e = e[e.find("[") + 1 : e.find("]")].replace("'", "")
col_list.remove(e)
df = filter_columns(df, col_list)
return df
from pyspark.sql.types import *
def pandas_to_spark(pandas_df):
"""Create a spark dataframe from pandas dataframe
Args:
pandas_df (pd.DataFrame): pandas dataframe to use
Returns:
df: Returns a spark dataframe.
"""
# Monkey patch to avoid the issue with iteritems
# AttributeError: 'DataFrame' object has no attribute 'iteritems'
pd.DataFrame.iteritems = pd.DataFrame.items
def equivalent_type(_type):
_type = str(_type).lower()
if "int" in _type:
if int(re.sub("[^\d]+", "", _type)) > 32:
return LongType()
else:
return IntegerType()
elif "float" in _type:
if int(re.sub("[^\d]+", "", _type)) > 32:
return DoubleType()
else:
return FloatType()
elif "datetime" in _type:
return TimestampType()
elif "bool" in _type:
return BooleanType()
elif "byte" in _type:
return ByteType()
else:
return StringType()
def define_structure(column, format_type):
try:
_type = equivalent_type(format_type)
except Exception as e:
print(e)
_type = StringType()
return StructField(column, _type)
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = [
define_structure(column, _type) for column, _type in zip(columns, types)
]
p_schema = StructType(struct_list)
return spark.createDataFrame(pandas_df, p_schema)
path = "./"
all_files = glob.glob(os.path.join(path, "*23.csv"))
df = pd.concat((read_files(f) for f in all_files), ignore_index=True)
df = df.sort_values(by=["trunc_day"])
def remove_outliers(df, columns, n_std=3):
for col in columns:
print(f"Working on column: {col}")
mean = df[col].mean()
sd = df[col].std()
df = df[(df[col] <= mean + (n_std * sd))]
return df
def get_period(start, end, freq="MS"):
"""Get a list of dates between start and end parameters,
with first and last day of months. Date format: "%Y-%m-%d",
time is optional Datetime format: "%Y-%m-%d %H:%M:%S"
final_period list structure = [
first day of first month 00:00:00,
last day of first month 23:59:59,
first day of second month 00:00:00,
last day of second month 23:59:59,
]
If the period is less than one month, returns the start
and end dates.
Args:
start (str): start date
end (str): end date
freq (str): frequence of dates.
Defaults to "MS"
https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases
Returns:
list: list of datetime
Ex: get_period("2021-01-01", "2022-03-31")
return [
"2021-01-01 00:00:00",
"2021-01-31 23:59:59",
"2021-02-01 00:00:00",
"2021-02-28 23:59:59",
"2021-03-01 00:00:00",
"2022-03-31 23:59:59"
]
Ex: get_period("2022-12-01", "2022-12-07")
return [
"2021-12-01 00:00:00",
"2021-12-07 23:59:59",
]
"""
def split_datetime_str(datetime_str):
split_str = datetime_str.split(" ")
return (
(split_str[0], split_str[1]) if len(split_str) > 1 else (split_str[0], "")
)
def parse_datetime(datestring, format="%Y-%m-%d %H:%M:%S"):
if datestring:
return datetime.strptime(datestring, format)
else:
return None
if start > end:
raise Exception(f"{start=} date is greater than {end=}")
start_date, start_time = split_datetime_str(start)
end_date, end_time = split_datetime_str(end)
period = list(pd.date_range(start_date, end_date, freq=freq))
final_period = []
if len(period) > 1:
# day of start
if start_date[8:10] != "01":
period[0] = parse_datetime(start, format="%Y-%m-%d")
for i in range(len(period)):
try:
final_period.append(period[i])
final_period.append(period[i + 1] - relativedelta(seconds=1))
# end of period list
except IndexError:
final_period.append(parse_datetime(f"{end_date} 23:59:59"))
# period is smaller than the frequency
else:
final_period = [
parse_datetime(f"{start_date} 00:00:00"),
parse_datetime(f"{end_date} 23:59:59"),
]
# time was passed on start param
if start_time:
final_period[0] = parse_datetime(f"{start_date} {start_time}")
# time was passed on end param
if end_time:
final_period[-1] = parse_datetime(f"{end_date} {end_time}")
return final_period
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment