Filtering out failure cases with Pandera
"""
Existing builtin decorators in panderas are ok, but they will raise an exception once a check fails.
If that is not the expected behavior, we can create our own decorators, to filter out corrupted rows to analyse them later without
creating a failure on the whole dataset.
In the following, we will create our own decorator which will filter out rows that did not pass our checks.
"""
import inspect
import logging
import typing
import pandas as pd
import pandera as pa
import wrapt

Schemas = typing.Union[pa.DataFrameSchema, pa.SeriesSchema]

def validate(schema: Schemas, input_df: pd.DataFrame) -> pd.DataFrame:
    try:
        schema.validate(input_df, lazy=True, inplace=True)
    # Handle the case where a single error is raised
    except pa.errors.SchemaError as exc:
        logging.warning("Data validation checks failed")
        logging.warning(exc)
        # Drop the rows with abnormal values and log them; after analysis, if they
        # can be fixed, fix them upstream of the validation.
        input_df = input_df[~input_df.index.isin(exc.failure_cases["index"])]
        return input_df
    # Handle the case where multiple errors are collected (lazy validation)
    except pa.errors.SchemaErrors as exc:
        logging.warning("Data validation checks failed")
        logging.warning(exc)
        logging.warning(exc.failure_cases["index"])
        input_df = input_df[~input_df.index.isin(exc.failure_cases["index"])]
        return input_df
    return input_df
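
# Illustrative sketch (not part of the original gist): used directly, `validate` is
# expected to drop the rows that break a check instead of raising. The demo schema,
# dataframe and variable names below are assumptions made for this example only.
_demo_schema = pa.DataFrameSchema({"value": pa.Column(int, pa.Check.ge(0))})
_demo_df = pd.DataFrame({"value": [1, -2, 3]})
# The row with value == -2 fails the check, so only rows 0 and 2 should remain.
_demo_cleaned = validate(_demo_schema, _demo_df)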


def get_function_argnames(fn: typing.Callable) -> typing.List[str]:
    arg_spec = inspect.getfullargspec(fn).args
    first_arg_is_self = arg_spec[0] == "self"
    is_regular_method = inspect.ismethod(fn) and first_arg_is_self
    if is_regular_method:
        # Don't include the "self" argument
        arg_spec = arg_spec[1:]
    return arg_spec


def validate_input(
    schema: Schemas, obj_getter: typing.Optional[typing.Union[str, int]] = None
) -> typing.Callable:
    @wrapt.decorator
    def _wrapper(
        wrapped_fn: typing.Callable,
        instance: typing.Union[None, typing.Any],
        args: typing.Tuple[typing.Any, ...],
        kwargs: typing.Dict[str, typing.Any],
    ):
        args = list(args)
        if isinstance(obj_getter, int):
            args[obj_getter] = validate(schema, args[obj_getter])
        elif isinstance(obj_getter, str):
            kwargs[obj_getter] = validate(schema, kwargs[obj_getter])
        elif obj_getter is None and args:
            # By default, assume the first positional argument is the dataframe/series
            args[0] = validate(schema, args[0])
        return wrapped_fn(*args, **kwargs)

    return _wrapper


def validate_output(schema: Schemas) -> typing.Callable:
    @wrapt.decorator
    def _wrapper(
        wrapped_fn: typing.Callable,
        instance: typing.Union[None, typing.Any],
        args: typing.Tuple[typing.Any, ...],
        kwargs: typing.Dict[str, typing.Any],
    ):
        result = wrapped_fn(*args, **kwargs)
        return validate(schema, result)

    return _wrapper


df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -20.4],
})

in_schema = pa.DataFrameSchema({
    "column1": pa.Column(int, pa.Check(lambda x: 0 <= x <= 10, element_wise=True)),
    "column2": pa.Column(float, pa.Check(lambda x: x < -1.2)),
})

out_schema = pa.DataFrameSchema({
    "column2": pa.Column(float, pa.Check(lambda x: x > 0, element_wise=True)),
})


# By default, validate_input assumes that the first positional argument is the
# dataframe/series to check.
@validate_input(in_schema)
def preprocessor_2(dataframe):
    dataframe["column3"] = dataframe["column1"] + dataframe["column2"]
    return dataframe


# By default, validate_output assumes that the pandas DataFrame/Series is the only output.
@validate_output(out_schema)
def zero_column_expect_first(df):
    df["column2"] = 0
    df.loc[0, "column2"] = 20
    return df
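

# Usage sketch (not part of the original gist): calling the decorated functions on the
# sample dataframe. The `.copy()` calls are an assumption, to avoid mutating `df` in place.
# All rows of `df` satisfy `in_schema`, so preprocessor_2 is expected to keep every row
# and simply add `column3`.
print(preprocessor_2(df.copy()))
# zero_column_expect_first sets column2 to 0 everywhere except row 0, so out_schema
# (column2 > 0) is expected to filter out every row but index 0.
print(zero_column_expect_first(df.copy()))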