Pyspark testing: a pytest conftest with session-scoped Spark fixtures, plus example tests for a data-validation module.
conftest.py:
# coding=utf-8
import logging

import findspark

findspark.init()  # must run before any pyspark import so Spark is on sys.path

import pytest
from pandas.util.testing import assert_frame_equal
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext


def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_context(request):
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf()
            .setMaster("local[2]")
            .setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc


@pytest.fixture(scope="session")
def hive_context(spark_context):
    """ fixture for creating a Hive Context. Creating a fixture enables it
    to be reused across all tests in a session
    Args:
        spark_context: spark_context fixture
    Returns:
        HiveContext for tests
    """
    return HiveContext(spark_context)


@pytest.fixture(scope="session")
def streaming_context(spark_context):
    """ fixture for creating a StreamingContext with a 1-second batch interval """
    return StreamingContext(spark_context, 1)


def assert_frame_equal_with_sort(results, expected, keycolumns):
    """ compare two pandas DataFrames ignoring row and column order,
    since Spark makes no ordering guarantees on collected results """
    results_sorted = results.sort_values(by=keycolumns).reset_index(drop=True).sort_index(axis=1)
    expected_sorted = expected.sort_values(by=keycolumns).reset_index(drop=True).sort_index(axis=1)
    assert_frame_equal(results_sorted, expected_sorted, check_index_type=False)
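For example, these two frames differ only in row and column order, so the helper accepts them as equal (a minimal standalone sketch using only the helper above; the frames are made up for illustration):

import pandas as pd

left = pd.DataFrame({'productId': ['2', '1'], 'price': [20.0, 10.0]})
right = pd.DataFrame({'price': [10.0, 20.0], 'productId': ['1', '2']})

# Rows are aligned on productId and columns sorted by name before comparing,
# so this assertion passes despite the differing order.
assert_frame_equal_with_sort(left, right, ['productId'])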
The test module below exercises the validations library against these fixtures (imported via conftest):
# -*- coding: utf-8 -*-
import sys
from datetime import datetime

import pandas as pd
import pytest

from conftest import assert_frame_equal_with_sort

sys.path.append('scripts')  # make the scripts directory importable first
from lib_python import validations

pytestmark = pytest.mark.usefixtures("spark_context", "hive_context")


def date(date_str):
    """ parse a YYYY-MM-DD string into a datetime.date """
    return datetime.strptime(date_str, "%Y-%m-%d").date()


def test_duplicate_record_is_removed(spark_context, hive_context):
    """ test that duplicate records are removed.
    case 1: if two records with the same primary key are present, keep
    the first and discard the second.
    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    pk_columns = ["productId", "regionCode", "startDate", "endDate"]
    non_null_columns = ["productId", "regionCode", "startDate"]
    columns = {
        "primary_key": pk_columns,
        "null_filter": non_null_columns
    }
    data_pandas = pd.DataFrame({
        'productId': ['1', '1', '2', '3', '3'],
        'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF', 'DE01DEF'],
        'startDate': ['2018-09-09'] * 5,
        'endDate': ['2018-09-09'] * 5})
    data_spark = hive_context.createDataFrame(data_pandas)
    good, bad = validations.main(data_spark, hive_context, spark_context, columns)
    expected_results = pd.DataFrame({
        'productId': ['1', '1', '2', '3'],
        'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF'],
        'startDate': ['2018-09-09'] * 4,
        'endDate': ['2018-09-09'] * 4})
    assert_frame_equal_with_sort(good.toPandas(), expected_results, ['productId'])
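

# NOTE: lib_python.validations is not included in this gist, so the sketch
# below is an assumption about what the "primary_key" and "null_filter"
# rules do, expressed with stock PySpark DataFrame calls, not the author's
# actual implementation:
#
#     deduped = data_spark.dropDuplicates(pk_columns)
#     non_null = deduped.dropna(subset=non_null_columns)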


def test_report_not_existing_products(spark_context, hive_context):
    """ test that records referencing products absent from the products
    master table are reported as invalid.
    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    products = hive_context.createDataFrame(pd.DataFrame({
        'productId': ['1', '2', '3', '4', '5'],
        'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF', 'DE01DEF'],
        'startDate': ['2018-09-09'] * 5,
        'endDate': ['2018-09-09'] * 5}))
    print("Products master table")
    products.show()
    columns = {
        "valid_product": ["productId", "productId", products]
    }
    if010_costs = pd.DataFrame({
        'productId': ['1', '2', '3', '4', '6'],
        'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF', 'DE01DEF'],
        'startDate': ['2018-09-09'] * 5,
        'endDate': ['2018-09-09'] * 5})
    if010_costs_df = hive_context.createDataFrame(if010_costs)
    print("IF010_COSTS")
    if010_costs_df.show()
    good, bad = validations.main(if010_costs_df, hive_context, spark_context, columns)
    print("GOOD")
    good.show()
    print("BAD")
    bad.show()
    print("Good explain")
    good.explain()
    print("Bad explain")
    bad.explain()
    expected_good_results = pd.DataFrame({
        'productId': ['1', '2', '3', '4'],
        'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF'],
        'startDate': ['2018-09-09'] * 4,
        'endDate': ['2018-09-09'] * 4})
    expected_bad_results = pd.DataFrame({
        'productId': ['6'],
        'regionCode': ['DE01DEF'],
        'startDate': ['2018-09-09'],
        'endDate': ['2018-09-09'],
        'validation_reason': ['Invalid product']})
    assert_frame_equal_with_sort(good.toPandas(), expected_good_results, ['productId'])
    assert_frame_equal_with_sort(bad.toPandas(), expected_bad_results, ['productId'])
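

# NOTE: an illustrative sketch of the "valid_product" rule (an assumption,
# not the implementation in lib_python.validations): keep rows whose
# productId exists in the master table, route the rest to "bad" with a
# reason column.
#
#     from pyspark.sql.functions import lit
#     master_ids = products.select("productId").distinct()
#     good = if010_costs_df.join(master_ids, "productId", "leftsemi")
#     bad = (if010_costs_df.join(master_ids, "productId", "leftanti")
#            .withColumn("validation_reason", lit("Invalid product")))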


def test_report_if_date_records_exists_within_range(spark_context, hive_context):
    """ test that records are reported when, for the same key columns, one
    record's date range falls inside another's.
    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    columns = {
        "subset_validation": ("startDate", "endDate", ["productId", "regionCode"])
    }
    if010_costs_df = hive_context.createDataFrame(pd.DataFrame([
        {'productId': '1', 'regionCode': 'AB98ABCD',
         'startDate': date('2018-09-09'), 'endDate': date('2018-09-19')},
        {'productId': '2', 'regionCode': 'BC99BCDF',
         'startDate': date('2018-10-09'), 'endDate': date('2018-10-19')},
        {'productId': '3', 'regionCode': 'CD00CDE',
         'startDate': date('2018-11-09'), 'endDate': date('2018-11-19')},
        {'productId': '4', 'regionCode': 'DE01DEF',
         'startDate': date('2018-05-05'), 'endDate': date('2018-05-08')},
        {'productId': '4', 'regionCode': 'DE01DEF',
         'startDate': date('2018-05-01'), 'endDate': date('2018-05-10')}
    ]))
print "IF010_COSTS" | |
if010_costs_df.show() | |
good, bad = validations.main(if010_costs_df, hive_context, spark_context, columns) | |
print "GOOD" | |
good.show() | |
print "BAD" | |
bad.show() | |
print "Good explain" | |
good.explain() | |
print "Bad explain" | |
bad.explain() | |
    expected_good_results = pd.DataFrame([
        {'productId': '1', 'regionCode': 'AB98ABCD',
         'startDate': date('2018-09-09'), 'endDate': date('2018-09-19')},
        {'productId': '2', 'regionCode': 'BC99BCDF',
         'startDate': date('2018-10-09'), 'endDate': date('2018-10-19')},
        {'productId': '3', 'regionCode': 'CD00CDE',
         'startDate': date('2018-11-09'), 'endDate': date('2018-11-19')}
    ])
    expected_bad_results = pd.DataFrame([
        {'productId': '4', 'regionCode': 'DE01DEF',
         'startDate': date('2018-05-05'), 'endDate': date('2018-05-08'),
         'validation_reason': 'Subset range already exists for startDate and endDate'},
        {'productId': '4', 'regionCode': 'DE01DEF',
         'startDate': date('2018-05-01'), 'endDate': date('2018-05-10'),
         'validation_reason': 'Subset range already exists for startDate and endDate'}
    ])
    assert_frame_equal_with_sort(good.toPandas(), expected_good_results, ['productId', 'regionCode', 'startDate'])
    assert_frame_equal_with_sort(bad.toPandas(), expected_bad_results, ['productId', 'regionCode', 'startDate'])
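

# NOTE: an illustrative sketch of how "subset_validation" might detect
# nested ranges (an assumption; the real logic is in lib_python.validations):
# self-join on the key columns and keep pairs where one row's
# [startDate, endDate] sits inside the other's. In the test above, both
# rows of such a pair are reported.
#
#     from pyspark.sql.functions import col
#     a, b = if010_costs_df.alias("a"), if010_costs_df.alias("b")
#     nested = (a.join(b, ["productId", "regionCode"])
#               .filter((col("a.startDate") >= col("b.startDate")) &
#                       (col("a.endDate") <= col("b.endDate")) &
#                       ((col("a.startDate") != col("b.startDate")) |
#                        (col("a.endDate") != col("b.endDate")))))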


def test_report_filter_validation(spark_context, hive_context):
    """ test that rows failing a SQL filter condition are reported.
    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    columns = {
        "filter_validation": "startDate < endDate"
    }
    if010_costs_df = hive_context.createDataFrame(pd.DataFrame([
        {'productId': '1', 'regionCode': 'AB98ABCD',
         'startDate': date('2018-09-09'), 'endDate': date('2018-09-19')},
        {'productId': '2', 'regionCode': 'BC99BCDF',
         'startDate': date('2018-10-09'), 'endDate': date('2018-10-19')},
        {'productId': '3', 'regionCode': 'CD00CDE',
         'startDate': date('2018-11-09'), 'endDate': date('2018-11-19')},
        {'productId': '4', 'regionCode': 'DE01DEF',
         'startDate': date('2018-05-05'), 'endDate': date('2018-05-04')}
    ]))
print "IF010_COSTS" | |
if010_costs_df.show() | |
good, bad = validations.main(if010_costs_df, hive_context, spark_context, columns) | |
print "GOOD" | |
good.show() | |
print "BAD" | |
bad.show() | |
print "Good explain" | |
good.explain() | |
print "Bad explain" | |
bad.explain() | |
    expected_good_results = pd.DataFrame([
        {'productId': '1', 'regionCode': 'AB98ABCD',
         'startDate': date('2018-09-09'), 'endDate': date('2018-09-19')},
        {'productId': '2', 'regionCode': 'BC99BCDF',
         'startDate': date('2018-10-09'), 'endDate': date('2018-10-19')},
        {'productId': '3', 'regionCode': 'CD00CDE',
         'startDate': date('2018-11-09'), 'endDate': date('2018-11-19')}
    ])
    expected_bad_results = pd.DataFrame([
        {'productId': '4', 'regionCode': 'DE01DEF',
         'startDate': date('2018-05-05'), 'endDate': date('2018-05-04'),
         'validation_reason': 'Unsatisfied condition startDate < endDate'}
    ])
    assert_frame_equal_with_sort(good.toPandas(), expected_good_results, ['productId', 'regionCode', 'startDate'])
    assert_frame_equal_with_sort(bad.toPandas(), expected_bad_results, ['productId', 'regionCode', 'startDate'])
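The "filter_validation" rule corresponds to a plain SQL predicate evaluated per row. A minimal sketch of how such a good/bad split could be written (an assumption about lib_python.validations, which is not part of this gist; split_by_condition is a hypothetical helper):

from pyspark.sql.functions import expr, lit

def split_by_condition(df, condition):
    # good: rows satisfying the SQL boolean expression string
    good = df.filter(condition)
    # bad: the complement, tagged with the reason string the tests expect
    bad = (df.filter(~expr(condition))
           .withColumn("validation_reason",
                       lit("Unsatisfied condition " + condition)))
    return good, bad

# usage: good, bad = split_by_condition(if010_costs_df, "startDate < endDate")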