Standardized logging for data pipelines that transform pandas dataframes.
A data pipeline takes a dataframe as input, transforms it through several functions, and finally returns a transformed dataframe.
Data pipeline: df --> (function_1) --> ... --> (function_n) --> df_transformed
Such a pipeline benefits from standardized logging: errors or unexpected behavior of individual transformations can be spotted at a glance.
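In pandas, this chain is naturally expressed with DataFrame.pipe. A minimal sketch of the pattern, with hypothetical transformations step_one and step_two (not part of the retail pipeline below):

import pandas as pd

def step_one(df: pd.DataFrame) -> pd.DataFrame:
    # drop rows with missing values
    return df.dropna()

def step_two(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    # keep rows whose value exceeds the threshold
    return df.loc[df['value'] > threshold]

df = pd.DataFrame({'value': [1.0, None, 3.0]})
df_transformed = (df
    .pipe(step_one)
    .pipe(step_two, threshold=2.0)
)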
Standardized logging
2021-12-29 10:27:33,988 - root - INFO - select_top_items --> -510208 records (-94.15%, 541909-31701)
2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
2021-12-29 10:27:34,028 - root - INFO - drop_non_products --> -87 records (-23.26%, 374-287)
2021-12-29 10:27:34,036 - root - INFO - drop_missings_duplicates --> 0 records (0.00%, 287-287)
2021-12-29 10:27:34,037 - root - INFO - preprocessing --> -541622 records (-99.95%, 541909-287)
def logging_transform_df(df_orig: pd.DataFrame, df_edit: pd.DataFrame, step_name: str = 'compare df') -> None:
    "Logging utility to show the row difference between two dataframes"
    N_orig, N_edit = len(df_orig), len(df_edit)
    N_diff = N_orig - N_edit
    # note: an empty df_orig raises ZeroDivisionError, which the wrapper catches
    pct_diff = N_diff / N_orig * 100
    # sign: '-' for removed rows, '+' for added rows
    if N_diff > 0:
        sign = '-'
    elif N_diff < 0:
        sign = '+'
    else:
        sign = ''
    logging.info(f'{step_name} --> {sign}{abs(N_diff)} records ({sign}{abs(pct_diff):.2f}%, {N_orig}-{N_edit})')
def wrap_logging_transform_df(func):
    """Wrapper to compare df before and after transformation"""
    @wraps(func)
    def with_logging(*args, **kwargs):
        try:
            df_orig: pd.DataFrame = args[0]
            df_edit: pd.DataFrame = func(*args, **kwargs)
            # df_orig and df_edit should both be dataframes
            if not isinstance(df_orig, pd.DataFrame):
                raise TypeError(f'{func.__name__} args[0] is not a dataframe, but type: {type(args[0])}. Skip logging.')
            if not isinstance(df_edit, pd.DataFrame):
                raise TypeError(f'{func.__name__} did not return a dataframe, but type: {type(df_edit)}. Skip logging.')
            logging_transform_df(df_orig, df_edit, func.__name__)
            return df_edit
        except Exception as e:
            logging.error(f'{func.__name__} - wrap_logging_transform_df(): {e}')
            return func(*args, **kwargs)
    return with_logging
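A quick way to see both sign paths is to decorate two toy transformations, one that removes rows and one that adds rows. This is a minimal sketch; drop_first_row and double_rows are hypothetical and assume the definitions above plus the pandas/logging setup from the full script below:

@wrap_logging_transform_df
def drop_first_row(df: pd.DataFrame) -> pd.DataFrame:
    # removes one row
    return df.iloc[1:]

@wrap_logging_transform_df
def double_rows(df: pd.DataFrame) -> pd.DataFrame:
    # duplicates every row
    return pd.concat([df, df], ignore_index=True)

df = pd.DataFrame({'value': range(4)})
drop_first_row(df)  # INFO - drop_first_row --> -1 records (-25.00%, 4-3)
double_rows(df)     # INFO - double_rows --> +4 records (+100.00%, 4-8)

Note the design choice in the except branch: if anything in the logging path fails, the wrapper logs an error and falls back to calling the undecorated function, so logging can never break the pipeline. The trade-off is that the transformation runs twice in that failure case.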
Apply the decorator @wrap_logging_transform_df to functions that transform a dataframe as part of a data pipeline, such as aggregate_daily.
@wrap_logging_transform_df
def aggregate_daily(df: pd.DataFrame) -> pd.DataFrame:
    "Sum quantity and revenue per day"
    return (df
        .resample('D', on='invoicedate')
        .agg({'description': 'first', 'revenue': 'sum', 'quantity': 'sum',
              'unitprice': 'first', 'country': 'first', 'stockcode': 'first',
              'invoiceno': 'first'})
        .reset_index()
    )
...
2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
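The same wrapper pattern extends beyond row counts. A hedged sketch of a companion utility that reports column changes; logging_transform_columns is hypothetical and not part of the original pipeline, but could be called from with_logging next to logging_transform_df:

def logging_transform_columns(df_orig: pd.DataFrame, df_edit: pd.DataFrame, step_name: str = 'compare df') -> None:
    "Log columns added or removed by a transformation (hypothetical extension)"
    cols_orig, cols_edit = set(df_orig.columns), set(df_edit.columns)
    added, removed = cols_edit - cols_orig, cols_orig - cols_edit
    if added or removed:
        logging.info(f'{step_name} --> columns +{sorted(added)} -{sorted(removed)}')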
Online retail data: http://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx
| invoiceno | stockcode | description | quantity | invoicedate | unitprice | customerid | country |
|---|---|---|---|---|---|---|---|
| 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850 | United Kingdom |
| 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
| ... | ... | ... | ... | ... | ... | ... | ... |
Load and transform the Online Retail data stored in data/raw/online_retail_small.xlsx:

- drop unnecessary columns
- assign dtypes for efficiency
- select the top_n items
- aggregate transactions for each day
- drop non-products
- create ds and y columns for Facebook Prophet
- drop missings and duplicates
import pandas as pd
import logging
from functools import wraps
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# configuration for local development
filepath = 'data/raw/online_retail_small.xlsx'
exclude_cols = ['customerid']
dtype_mapping = {
    'invoiceno': 'object',
    'quantity': 'uint8',
    'country': 'category'
}
exclude_items = [
    'AMAZON FEE',
    'AMAZON',
    'Manual',
    'POSTAGE',
    'DOTCOM POSTAGE',
    'CRUK Commission',
    'Bank Charges',
    'Discount',
    'SAMPLES',
    'Adjust bad debt',
]
top_n_items = 50
config_preprocessing = {
    'dtype_mapping': dtype_mapping,
    'exclude_cols': exclude_cols,
    'exclude_items': exclude_items,
    'top_n_items': top_n_items
}
def columns_to_lowercase(df: pd.DataFrame) -> pd.DataFrame:
    "Lowercase column names and strip surrounding whitespace"
    df.columns = (df.columns
        .str.lower()
        .str.strip()
    )
    return df
def wrap_logging_transform_df(func):
    """Wrapper to compare df before and after transformation"""
    @wraps(func)
    def with_logging(*args, **kwargs):
        try:
            df_orig: pd.DataFrame = args[0]
            df_edit: pd.DataFrame = func(*args, **kwargs)
            # df_orig and df_edit should both be dataframes
            if not isinstance(df_orig, pd.DataFrame):
                raise TypeError(f'{func.__name__} args[0] is not a dataframe, but type: {type(args[0])}. Skip logging.')
            if not isinstance(df_edit, pd.DataFrame):
                raise TypeError(f'{func.__name__} did not return a dataframe, but type: {type(df_edit)}. Skip logging.')
            logging_transform_df(df_orig, df_edit, func.__name__)
            return df_edit
        except Exception as e:
            logging.error(f'{func.__name__} - wrap_logging_transform_df(): {e}')
            return func(*args, **kwargs)
    return with_logging
def logging_transform_df(df_orig: pd.DataFrame, df_edit: pd.DataFrame, step_name: str = 'compare df') -> None:
    "Logging utility to show the row difference between two dataframes"
    N_orig, N_edit = len(df_orig), len(df_edit)
    N_diff = N_orig - N_edit
    # note: an empty df_orig raises ZeroDivisionError, which the wrapper catches
    pct_diff = N_diff / N_orig * 100
    # sign: '-' for removed rows, '+' for added rows
    if N_diff > 0:
        sign = '-'
    elif N_diff < 0:
        sign = '+'
    else:
        sign = ''
    logging.info(f'{step_name} --> {sign}{abs(N_diff)} records ({sign}{abs(pct_diff):.2f}%, {N_orig}-{N_edit})')
@wrap_logging_transform_df
def drop_missings_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    "Return df without missings and duplicates"
    df_nomiss = df.dropna()
    # logging_transform_df(df, df_nomiss, 'drop_missings')
    df_nodup = df_nomiss.drop_duplicates()
    # logging_transform_df(df_nomiss, df_nodup, 'drop_duplicates')
    return df_nodup.reset_index(drop=True)
@wrap_logging_transform_df
def drop_non_products(df: pd.DataFrame, exclude_items: list) -> pd.DataFrame:
    "Drop administrative items and rows without positive revenue"
    return (df
        .query('description not in @exclude_items')
        .query('revenue > 0')
    )
@wrap_logging_transform_df
def select_top_items(df: pd.DataFrame, top_n_items: int = 50) -> pd.DataFrame:
    "Top n items by revenue"
    top_items = (df
        .groupby('stockcode').agg({'revenue': 'sum'})
        .sort_values('revenue', ascending=False)
        .head(top_n_items)
        .index
        .to_list()
    )
    return (df
        .loc[df.stockcode.isin(top_items)]
        .reset_index(drop=True)
    )
@wrap_logging_transform_df
def aggregate_daily(df: pd.DataFrame) -> pd.DataFrame:
    "Sum quantity and revenue per day"
    return (df
        .resample('D', on='invoicedate')
        .agg({'description': 'first', 'revenue': 'sum', 'quantity': 'sum',
              'unitprice': 'first', 'country': 'first', 'stockcode': 'first',
              'invoiceno': 'first'})
        .reset_index()
    )
def create_prophet_df(df: pd.DataFrame, y: str = 'revenue', ds: str = 'invoicedate') -> pd.DataFrame:
    "Create the ds and y columns expected by Prophet"
    return df.assign(y=lambda x: x[y], ds=lambda x: x[ds])
def get_raw_data(filepath) -> pd.DataFrame:
    "Return data without preprocessing"
    logging.info(f'Reading file: {filepath}')
    return (pd.read_excel(filepath)
        .pipe(columns_to_lowercase)
    )
@wrap_logging_transform_df
def preprocessing(df: pd.DataFrame, dtype_mapping: dict, exclude_cols: list, exclude_items: list, top_n_items: int) -> pd.DataFrame:
    """
    Preprocessing pipeline: drop columns, assign dtypes, compute revenue,
    drop non-products, keep top items, aggregate daily, create ds and y
    cols for Prophet, drop missings and duplicates
    """
    return (df
        .drop(columns=exclude_cols)
        .astype(dtype_mapping)
        .assign(description=lambda x: x['description'].str.strip(),
                revenue=lambda x: x['quantity'] * x['unitprice'])
        .pipe(drop_non_products, exclude_items=exclude_items)
        .pipe(select_top_items, top_n_items=top_n_items)
        .pipe(aggregate_daily)
        .pipe(create_prophet_df)
        .pipe(drop_missings_duplicates)
    )
if __name__ == '__main__':
    df_raw = get_raw_data(filepath)
    df = preprocessing(df_raw, **config_preprocessing)
Inspect the logs:
2021-12-29 10:26:24,115 - root - INFO - Reading file: data/raw/online_retail_small.xlsx
2021-12-29 10:27:33,988 - root - INFO - select_top_items --> -510208 records (-94.15%, 541909-31701)
2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
2021-12-29 10:27:34,028 - root - INFO - drop_non_products --> -87 records (-23.26%, 374-287)
2021-12-29 10:27:34,036 - root - INFO - drop_missings_duplicates --> 0 records (0.00%, 287-287)
2021-12-29 10:27:34,037 - root - INFO - preprocessing --> -541622 records (-99.95%, 541909-287)
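The transformed dataframe now carries the ds and y columns that Prophet expects. A minimal sketch of the handoff, assuming the prophet package is installed (the forecasting step is illustrative and not part of the original pipeline):

from prophet import Prophet  # named 'fbprophet' in releases before v1.0

m = Prophet()
m.fit(df[['ds', 'y']])                        # df from the preprocessing pipeline
future = m.make_future_dataframe(periods=30)  # extend 30 days beyond the history
forecast = m.predict(future)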