Created
March 26, 2025 19:43
-
-
Save raffaem/e7af56ffb425cceb4ccb3579806cecc0 to your computer and use it in GitHub Desktop.
pandas with logging
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# SPDX-FileCopyrightText: 2024-present Raffaele Mancuso <[email protected]> | |
# SPDX-License-Identifier: MIT | |
import pandas | |
import logging | |
class DataFrame(pandas.DataFrame):
    """``pandas.DataFrame`` subclass that reports what common operations did.

    While the ``log`` flag is true, row drops, renames and selections emit a
    short summary through the ``"pandas-log"`` logger.
    """

    # Class-level defaults. Because these names already resolve on the class,
    # pandas stores later assignments as plain attributes even when the frame
    # has a column called "log" (otherwise ``self.log = True`` would be routed
    # to that column and overwrite its data). They also give frames produced
    # by ``__class__`` reassignment a usable flag before ``copy_from`` runs,
    # and make ``turn_on()`` safe to call without a preceding ``turn_off()``.
    log = True
    log_backup = True

    def __init__(self, *args, **kwargs):
        """Build the frame like ``pandas.DataFrame`` and enable logging."""
        logging.info("Our init was called")
        super().__init__(*args, **kwargs)
        self.log = True

    def copy_from(self, df):
        """Adopt the logging flag of *df*."""
        self.log = df.log

    def turn_off(self):
        """Disable logging, remembering the current flag in ``log_backup``."""
        self.log_backup = self.log
        self.log = False

    def turn_on(self):
        """Restore the flag saved by the last ``turn_off()``."""
        self.log = self.log_backup

    @staticmethod
    def _as_label_list(labels):
        """Normalise ``None`` / a single string / an iterable into a list."""
        if labels is None:
            return []
        if isinstance(labels, str):
            return [labels]
        return list(labels)

    def dfmvcols(self, start_cols=None, end_cols=None):
        """Return the frame with some columns moved to the front and/or back.

        Args:
            start_cols: Column label or iterable of labels to place first.
            end_cols: Column label or iterable of labels to place last.

        Labels that are not columns of this frame are ignored; all other
        columns keep their relative order between the two groups.
        """
        start_cols = self._as_label_list(start_cols)
        end_cols = self._as_label_list(end_cols)
        # Make sure columns exist in the dataframe
        start_cols = [x for x in start_cols if x in self.columns]
        end_cols = [x for x in end_cols if x in self.columns]
        # Columns that are not being moved keep their original order
        moved = set(start_cols) | set(end_cols)
        cols = [x for x in self.columns if x not in moved]
        return self[start_cols + cols + end_cols]

    def _drop_rows_logged(self, operation, args, inplace, kwargs):
        """Run a row-dropping pandas method quietly, then log the row delta.

        *operation* is the bound pandas method; it is called with *args*,
        ``inplace=inplace`` and *kwargs*.
        """
        # Legacy call shape: these wrappers used to take ``inplace`` as their
        # first positional parameter, so a leading positional bool still
        # means ``inplace``. Anything else is forwarded to pandas untouched.
        if args and pandas.api.types.is_bool(args[0]):
            inplace, args = bool(args[0]), args[1:]
        was_logging = self.log
        nrow0 = self.shape[0]
        # pandas may go through our __getitem__ internally; keep that silent.
        self.turn_off()
        try:
            out_df = operation(*args, inplace=inplace, **kwargs)
        finally:
            # Restore even when pandas raises, so one failed call does not
            # leave logging disabled on this frame.
            self.log = was_logging
        # With inplace=True pandas returns None and mutates this frame.
        nrow1 = self.shape[0] if inplace else out_df.shape[0]
        if isinstance(out_df, DataFrame):
            # A result built through __getitem__ copied the temporarily
            # disabled flag; give it the caller-visible one.
            out_df.log = was_logging
        if was_logging:
            logging.getLogger("pandas-log").info(f"Dropped {nrow0-nrow1}/{nrow0} rows")
        return out_df

    def drop_duplicates(self, *args, inplace=False, **kwargs):
        """``pandas.DataFrame.drop_duplicates`` that logs how many rows went.

        Positional and keyword arguments are forwarded to pandas, so a
        positional ``subset`` works as in pandas. ``inplace`` is forwarded as
        well; when it is true the frame is modified and ``None`` is returned.
        A leading positional bool is still accepted as ``inplace``.
        """
        return self._drop_rows_logged(super().drop_duplicates, args, inplace, kwargs)

    def dropna(self, *args, inplace=False, **kwargs):
        """``pandas.DataFrame.dropna`` that logs how many rows were dropped.

        Arguments are forwarded to pandas, including ``inplace``; when it is
        true the frame is modified and ``None`` is returned. A leading
        positional bool is still accepted as ``inplace``.
        """
        return self._drop_rows_logged(super().dropna, args, inplace, kwargs)

    def rename(self, *args, **kwargs):
        """``pandas.DataFrame.rename`` that logs which labels changed.

        Returns whatever pandas returns (``None`` when ``inplace=True``).
        """
        def proc_pair(before, after):
            # Labels are compared position by position.
            changed = [
                str(old) + " -> " + str(new)
                for old, new in zip(before, after)
                if old != new
            ]
            return ", ".join(changed)

        start_cols = self.columns
        start_ixs = self.index
        out_df = super().rename(*args, **kwargs)
        if self.log:
            # inplace=True returns None; the renamed labels are then on self.
            renamed = self if out_df is None else out_df
            col_pairs = proc_pair(start_cols, renamed.columns)
            ix_pairs = proc_pair(start_ixs, renamed.index)
            msg = ""
            if col_pairs:
                msg += f"Renamed columns: {col_pairs}; "
            if ix_pairs:
                msg += f"Renamed indexes: {ix_pairs}"
            if msg:  # nothing changed -> nothing worth logging
                logging.getLogger("pandas-log").info(msg)
        return out_df

    def __getitem__(self, key):
        """Select like pandas and log the size of the selection.

        A ``pandas.Series`` result is returned unchanged; any other result is
        re-classed to this ``DataFrame`` type and inherits this frame's
        logging flag.
        """
        out_df = super().__getitem__(key)
        logger = logging.getLogger("pandas-log")
        if isinstance(out_df, pandas.Series):
            if self.log:
                logger.info("Selected a single column")
            return out_df
        # pandas hands back a plain pandas.DataFrame; re-class it so the
        # logging methods stay available on the selection.
        out_df.__class__ = DataFrame
        out_df.copy_from(self)
        if self.log:
            logger.info(
                f"Selected {len(out_df.columns):,d}/{len(self.columns):,d} columns"
                f" and {len(out_df.index):,d}/{len(self.index):,d} indexes"
            )
        return out_df
def concat(dfs, *args, **kwargs):
    """Concatenate like ``pandas.concat`` and log the input and output shapes.

    Args:
        dfs: Iterable (or mapping) of pandas objects, as accepted by
            ``pandas.concat``.
        *args, **kwargs: Forwarded to ``pandas.concat``.

    Returns:
        The concatenated data wrapped in this module's ``DataFrame``.
    """
    from collections.abc import Mapping

    if isinstance(dfs, Mapping):
        # Iterating a mapping yields its keys; the objects are the values.
        pieces = list(dfs.values())
    else:
        # Materialise once: a generator would otherwise be exhausted by
        # pandas.concat before the shapes can be reported.
        dfs = list(dfs)
        pieces = dfs
    out_df = DataFrame(pandas.concat(dfs, *args, **kwargs))

    def fmt_shape(obj):
        # Joining every dimension also copes with 1-D inputs (Series).
        return "(" + " x ".join(f"{n:,d}" for n in obj.shape) + ")"

    # pandas.concat ignores None entries, so they are left out of the summary.
    ls = " + ".join(fmt_shape(df) for df in pieces if df is not None)
    logging.getLogger("pandas-log").info(f"Concatenating {ls} -> ({out_df.shape[0]:,d} x {out_df.shape[1]:,d})")
    return out_df
def merge(left, right, on=None, left_on=None, right_on=None, *args, **kwargs):
    """Merge like ``pandas.merge`` and log how many rows found no partner.

    Args:
        left, right: Frames of this module's ``DataFrame`` type (their
            ``log`` flag and ``turn_off()`` are used).
        on, left_on, right_on: Join keys, forwarded to ``pandas.merge``.
        *args, **kwargs: Forwarded to ``pandas.merge``.

    Returns:
        The merged data as this module's ``DataFrame``, carrying the logging
        flag ``left`` had when the call started.

    A row counts as unmatched when its key combination does not occur in the
    merged result. The per-side count is skipped when the keys are not plain
    columns of both the input and the result (for example index joins).
    """
    logger = logging.getLogger("pandas-log")
    # Keep the flags in locals: left and right may be the same object, and
    # paired turn_off()/turn_on() calls would then overwrite the saved flag
    # and leave logging switched off for good.
    left_flag, right_flag = left.log, right.log
    left.turn_off()
    right.turn_off()
    try:
        outdf = pandas.merge(left, right, *args, on=on, left_on=left_on, right_on=right_on, **kwargs)
        outdf.__class__ = DataFrame
        outdf.log = False  # stay silent while the key columns are inspected
        if on is not None:
            left_on = right_on = on
        elif (
            left_on is None
            and right_on is None
            and not (kwargs.get("left_index") or kwargs.get("right_index"))
        ):
            # No keys given: pandas joins on the columns both frames share.
            left_on = right_on = [c for c in left.columns if c in right.columns]
        logger.info("Merging:")
        for side, frame, keys in (("left", left, left_on), ("right", right, right_on)):
            if keys is None:
                continue
            keys = list(keys) if pandas.api.types.is_list_like(keys) else [keys]
            if not keys or not all(k in frame.columns and k in outdf.columns for k in keys):
                continue
            mask = frame[keys].set_index(keys).index.isin(outdf[keys].set_index(keys).index)
            matched = int(mask.sum())
            logger.info(f"\t{len(mask)-matched:,d}/{len(mask):,d} {side} rows unmatched")
    finally:
        # Restore the inputs even if pandas raised.
        left.log = left_flag
        right.log = right_flag
    # Copying left.log while it was switched off would disable logging on
    # every merged frame; use the flag left had before the call.
    outdf.log = left_flag
    return outdf
def read_excel(file, *args, **kwargs):
    """Load an Excel file via ``pandas.read_excel`` and log its shape.

    All arguments are forwarded to pandas; the result is wrapped in this
    module's ``DataFrame`` before it is returned.
    """
    frame = DataFrame(pandas.read_excel(file, *args, **kwargs))
    n_rows, n_cols = frame.shape[0], frame.shape[1]
    logger = logging.getLogger("pandas-log")
    logger.info(f"Read Excel file '{file}' with shape ({n_rows:,d} x {n_cols:,d})")
    return frame
def read_csv(file, *args, **kwargs):
    """Load a CSV file via ``pandas.read_csv`` and log its shape.

    All arguments are forwarded to pandas; the result is wrapped in this
    module's ``DataFrame`` before it is returned.
    """
    frame = DataFrame(pandas.read_csv(file, *args, **kwargs))
    n_rows, n_cols = frame.shape[0], frame.shape[1]
    logger = logging.getLogger("pandas-log")
    logger.info(f"Read CSV file '{file}' with shape ({n_rows:,d} x {n_cols:,d})")
    return frame
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment