Skip to content

Instantly share code, notes, and snippets.

@smedegaard
Last active June 2, 2023 18:29
Show Gist options
  • Save smedegaard/802eda2cbdff72cf2e794d325ac35ddd to your computer and use it in GitHub Desktop.
Save smedegaard/802eda2cbdff72cf2e794d325ac35ddd to your computer and use it in GitHub Desktop.
"""
Custom ColumnPairMapExpectation that checks a `cvr` value is set for company customers.
It is based on the Great Expectations template for custom ColumnPairMapExpectations.
For detailed instructions on how to use that template, please see:
https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/how_to_create_custom_column_pair_map_expectations
"""
from typing import Optional
from great_expectations.compatibility import pyspark
from great_expectations.compatibility.pyspark import functions as F
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.exceptions import InvalidExpectationConfigurationError
from great_expectations.execution_engine import (
SparkDFExecutionEngine,
)
from great_expectations.expectations.expectation import ColumnPairMapExpectation
from great_expectations.expectations.metrics.map_metric_provider import (
ColumnPairMapMetricProvider,
column_pair_condition_partial,
)
# This class defines a Metric to support your Expectation.
# For most ColumnPairMapExpectations, the main business logic for calculation will live in this class.
class ColumnPairValuesCvrSetForCompanies(ColumnPairMapMetricProvider):
    """Row-level condition metric backing the CVR-for-companies Expectation.

    A row satisfies the condition when ``column_A`` is different from
    ``target_value`` (the rule does not apply to that row) or when
    ``column_B`` is non-null (the rule applies and is fulfilled).
    """

    # This is the id string that will be used to reference your metric.
    condition_metric_name = "column_pair_values.cvr_set_for_companies"
    # These point your metric at the provided keys to facilitate calculation.
    # ``target_value`` is a parameter of the condition, not part of the data
    # domain, so it is declared as a value key; that is what makes it arrive
    # in ``_spark`` as a keyword argument.
    condition_domain_keys = ("column_A", "column_B")
    condition_value_keys = ("target_value",)

    # This method defines the business logic for evaluating your metric when using a SparkDFExecutionEngine
    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
    def _spark(
        cls,
        column_A: pyspark.Column,
        column_B: pyspark.Column,
        target_value,
        **kwargs,
    ):
        """Build the Spark column expression that marks rows as expected.

        Args:
            column_A: Column holding the customer type.
            column_B: Column holding the CVR value.
            target_value: Customer type for which ``column_B`` must be set.
            **kwargs: Additional keyword arguments supplied by the metric
                framework; they are not used here.

        Returns:
            A boolean Spark ``Column`` that is true for rows where
            ``column_A`` does not equal ``target_value`` or ``column_B`` is
            not null.
        """
        # eqNullSafe never yields NULL, so a missing customer type is treated
        # as "not the target type" instead of producing an unknown result.
        is_target_type = column_A.eqNullSafe(target_value)
        # Logical implication: is_target_type -> column_B is set.
        return (~is_target_type) | column_B.isNotNull()
# This class defines the Expectation itself
class ExpectCvrToBeSetIfCustomerIsCompany(ColumnPairMapExpectation):
    """
    If the `customerType` is `Company`, the `cvr` should not be `None`

    Args:
        column_A: Name of the column holding the customer type.
        column_B: Name of the column holding the `cvr` value.
        target_value: Customer type for which `cvr` must be set
            (defaults to "Company").
        mostly: Fraction of rows that must satisfy the rule (defaults to 1).
    """

    # These examples will be shown in the public gallery.
    # They will also be executed as unit tests for your Expectation.
    examples = [
        {
            "data": {
                "customer_type_data": [
                    "Company",
                    "Private",
                ],
                "good_cvr_data": ["12345678", None],
                "bad_cvr_data": [None, "12345678"],
            },
            "tests": [
                {
                    "title": "basic_positive_test",
                    "exact_match_out": True,
                    "in": {
                        "column_A": "customer_type_data",
                        "column_B": "good_cvr_data",
                        "target_value": "Company",
                    },
                    "out": {"success": True},
                },
                {
                    "title": "basic_negative_test",
                    "exact_match_out": True,
                    "in": {
                        "column_A": "customer_type_data",
                        "column_B": "bad_cvr_data",
                        "target_value": "Company",
                    },
                    "out": {"success": False},
                },
            ],
            "test_backends": [
                {
                    "backend": "spark",
                    "dialects": None,
                },
            ],
        }
    ]

    # This is the id string of the Metric used by this Expectation.
    # For most Expectations, it will be the same as the `condition_metric_name` defined in your Metric class above.
    map_metric = "column_pair_values.cvr_set_for_companies"

    # Names of the arguments in positional order. This must be a tuple of
    # names: a bare string would be indexed character by character.
    args_keys = ("column_A", "column_B", "target_value")

    # This is a list of parameter names that can affect whether the Expectation evaluates to True or False
    success_keys = ("column_A", "column_B", "mostly", "target_value")

    # This dictionary contains default values for any parameters that should have default values.
    # Assigning it here replaces (rather than extends) the dictionary inherited
    # from the base class, so `mostly` is given an explicit default as well.
    default_kwarg_values = {
        "mostly": 1,
        "target_value": "Company",
    }

    def validate_configuration(
        self, configuration: Optional[ExpectationConfiguration]
    ) -> None:
        """Validate the configuration by delegating to the parent class.

        Args:
            configuration: An optional Expectation Configuration entry. When
                it is falsy, the configuration stored on the instance is
                looked up instead.

        Returns:
            None. Any error raised by the parent class's validation, or by
            looking up the instance configuration, propagates to the caller.
        """
        super().validate_configuration(configuration)
        configuration = configuration or self.configuration
        # No checks beyond the parent's are performed yet. Additional checks
        # on `configuration.kwargs` belong here and should raise
        # InvalidExpectationConfigurationError on failure.

    # This object contains metadata for display in the public Gallery
    library_metadata = {
        "tags": [],  # Tags for this Expectation in the Gallery
        "contributors": [  # Github handles for all contributors to this Expectation.
            "@your_name_here",  # Don't forget to add your github handle here!
        ],
    }
if __name__ == "__main__":
    # When executed as a script, print the diagnostic checklist for the Expectation.
    expectation = ExpectCvrToBeSetIfCustomerIsCompany()
    expectation.print_diagnostic_checklist()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment