Filtering out failure cases with Pandera
""" | |
Existing builtin decorators in panderas are ok, but they will raise an exception once a check fails. | |
If that is not the expected behavior, we can create our own decorators, to filter out corrupted rows to analyse them later without | |
creating a failure on the whole dataset. | |
In the following, we will create our own decorator which will filter out rows that did not pass our checks. | |
""" | |
import inspect
import logging
import typing

import pandas as pd
import pandera as pa
import wrapt

Schemas = typing.Union[pa.schemas.DataFrameSchema, pa.schemas.SeriesSchema]

def validate(schema: Schemas, input_df: pd.DataFrame) -> pd.DataFrame:
    try:
        schema.validate(input_df, lazy=True, inplace=True)
    # Handle the case of a single check failure
    except pa.errors.SchemaError as exc:
        logging.warning("Data validation checks failed")
        logging.warning(exc)
        # Drop the rows with abnormal values and log them; after analysis,
        # anything fixable should be fixed upstream, before validation.
        input_df = input_df[~input_df.index.isin(exc.failure_cases["index"])]
        return input_df
    # Handle the case of multiple check failures (raised when validating with lazy=True)
    except pa.errors.SchemaErrors as exc:
        logging.warning("Data validation checks failed")
        logging.warning(exc)
        logging.warning(exc.failure_cases["index"])
        input_df = input_df[~input_df.index.isin(exc.failure_cases["index"])]
        return input_df
    return input_df

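
# Minimal, illustrative sketch (added for clarity; the schema and frame below are
# made up and not part of the original gist): `validate` logs and drops the rows
# that fail the checks instead of raising.
_demo_schema = pa.DataFrameSchema(
    {"value": pa.Column(int, pa.Check(lambda x: x >= 0, element_wise=True))}
)
_demo_df = pd.DataFrame({"value": [1, -5, 3]})
# The row holding -5 is reported as a failure case and filtered out,
# so only the rows with 1 and 3 remain.
_demo_clean = validate(_demo_schema, _demo_df)
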
def get_function_argnames(fn: typing.Callable) -> typing.List[str]:
    arg_spec = inspect.getfullargspec(fn).args
    first_arg_is_self = arg_spec[0] == "self"
    is_regular_method = inspect.ismethod(fn) and first_arg_is_self
    if is_regular_method:
        # Don't include the "self" / "cls" argument
        arg_spec = arg_spec[1:]
    return arg_spec

def validate_input(schema: Schemas, obj_getter: typing.Optional[typing.Union[str, int]] = None) -> typing.Callable:
    @wrapt.decorator
    def _wrapper(
        wrapped_fn: typing.Callable,
        instance: typing.Union[None, typing.Any],
        args: typing.Tuple[typing.Any, ...],
        kwargs: typing.Dict[str, typing.Any],
    ):
        args = list(args)
        if isinstance(obj_getter, int):
            args[obj_getter] = validate(schema, args[obj_getter])
        elif isinstance(obj_getter, str):
            kwargs[obj_getter] = validate(schema, kwargs[obj_getter])
        elif obj_getter is None and args:
            args[0] = validate(schema, args[0])
        return wrapped_fn(*args, **kwargs)

    return _wrapper

def validate_output(schema: Schemas) -> typing.Callable:
    @wrapt.decorator
    def _wrapper(
        wrapped_fn: typing.Callable,
        instance: typing.Union[None, typing.Any],
        args: typing.Tuple[typing.Any, ...],
        kwargs: typing.Dict[str, typing.Any],
    ):
        result = wrapped_fn(*args, **kwargs)
        return validate(schema, result)

    return _wrapper

df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -20.4],
})

in_schema = pa.DataFrameSchema({
    "column1": pa.Column(int, pa.Check(lambda x: 0 <= x <= 10, element_wise=True)),
    "column2": pa.Column(float, pa.Check(lambda x: x < -1.2)),
})

out_schema = pa.DataFrameSchema({
    "column2": pa.Column(float, pa.Check(lambda x: x > 0, element_wise=True)),
})

# Like pandera's check_input, validate_input assumes by default that the first
# argument is the dataframe/series to check.
@validate_input(in_schema)
def preprocessor_2(dataframe):
    dataframe["column3"] = dataframe["column1"] + dataframe["column2"]
    return dataframe

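
# Illustrative sketch (these functions are not in the original gist): obj_getter
# selects which argument validate_input checks. An int picks a positional argument
# by index; a str picks a keyword argument by name (the caller must then pass it
# as a keyword).
@validate_input(in_schema, obj_getter=1)
def label_then_process(label, dataframe):
    dataframe["label"] = label
    return dataframe

@validate_input(in_schema, obj_getter="dataframe")
def process_named(label, dataframe=None):
    dataframe["label"] = label
    return dataframe
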
# validate_output assumes the pandas DataFrame/Series is the function's return value.
@validate_output(out_schema)
def zero_column_expect_first(df):
    # Zero out column2 as floats (so the dtype check still passes), keeping only
    # the first row positive; the output check then filters out the other rows.
    df["column2"] = 0.0
    df.loc[0, "column2"] = 20.0
    return df

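
# Illustrative usage (added for demonstration, not part of the original gist).
# The sample df satisfies in_schema, so preprocessor_2 keeps every row and adds
# column3. zero_column_expect_first leaves column2 positive only in the first row,
# so the output validation drops every other row instead of raising.
if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    print(preprocessor_2(df.copy()))
    print(zero_column_expect_first(df.copy()))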