@tilakpatidar
Last active June 1, 2018 10:51
Pyspark testing: pytest fixtures (conftest.py) and example data-validation tests for PySpark jobs.
# coding=utf-8
# conftest.py -- shared pytest fixtures and helpers for the PySpark tests below.
import logging

import findspark

findspark.init()  # must run before any pyspark import so Spark is on sys.path

import pytest
from pandas.util.testing import assert_frame_equal
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext


def quiet_py4j():
    """Turn down Spark's py4j logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_context(request):
    """Fixture for creating a SparkContext shared across the test session.

    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf()
            .setMaster("local[2]")
            .setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc


@pytest.fixture(scope="session")
def hive_context(spark_context):
    """Fixture for creating a HiveContext. A session-scoped fixture lets it be
    reused across all tests in a session.

    Args:
        spark_context: spark_context fixture
    Returns:
        HiveContext for tests
    """
    return HiveContext(spark_context)


@pytest.fixture(scope="session")
def streaming_context(spark_context):
    """Fixture for creating a StreamingContext with a 1-second batch interval."""
    return StreamingContext(spark_context, 1)


def assert_frame_equal_with_sort(results, expected, keycolumns):
    """Compare two pandas DataFrames, ignoring row order and column order."""
    results_sorted = results.sort_values(by=keycolumns).reset_index(drop=True).sort_index(axis=1)
    expected_sorted = expected.sort_values(by=keycolumns).reset_index(drop=True).sort_index(axis=1)
    assert_frame_equal(results_sorted, expected_sorted, check_index_type=False)
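The tests below exercise validations.main from a lib_python package that is not part of this gist. Purely from how the tests call it, the assumed interface looks roughly like the following sketch (the rule keys are taken from the tests; everything else is an assumption, not the real implementation):

# Hypothetical stub of lib_python/validations.py, inferred only from how the
# tests below call it; the real module is not included in this gist.
def main(df, hive_context, spark_context, columns):
    """Split df into (good, bad) Spark DataFrames according to the rules in `columns`.

    Rule keys used by the tests:
      "primary_key":       columns to de-duplicate on (keep the first occurrence)
      "null_filter":       columns that must be non-null
      "valid_product":     [column, master_column, master_df] foreign-key check
      "subset_validation": (start_col, end_col, partition_cols) date-range subset check
      "filter_validation": a SQL filter expression, e.g. "startDate < endDate"

    Rejected rows carry a validation_reason column explaining why they failed.
    """
    raise NotImplementedError  # see the tests for the expected behaviour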
# -*- coding: utf-8 -*-
# Test module for the validations job; uses the fixtures defined in conftest.py above.
import sys
from datetime import datetime

import pandas as pd
import pytest

from conftest import assert_frame_equal_with_sort

sys.path.append('scripts')  # make the module under test importable before importing it
from lib_python import validations

pytestmark = pytest.mark.usefixtures("spark_context", "hive_context")


def date(date_str):
    """Parse a YYYY-MM-DD string into a datetime.date."""
    return datetime.strptime(date_str, "%Y-%m-%d").date()
def test_duplicate_record_is_removed(spark_context, hive_context):
    """Test that duplicate records are removed.

    Case 1: if two records with the same primary key are present, keep
    the first and discard the second.

    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    pk_columns = ["productId", "regionCode", "startDate", "endDate"]
    non_null_columns = ["productId", "regionCode", "startDate"]
    columns = {
        "primary_key": pk_columns,
        "null_filter": non_null_columns
    }
    data_pandas = pd.DataFrame({'productId': ['1', '1', '2', '3', '3'],
                                'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF', 'DE01DEF'],
                                'startDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09'],
                                'endDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09']})
    data_spark = hive_context.createDataFrame(data_pandas)

    good, bad = validations.main(data_spark, hive_context, spark_context, columns)

    expected_results = pd.DataFrame({'productId': ['1', '1', '2', '3'],
                                     'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF'],
                                     'startDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09'],
                                     'endDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09']})
    assert_frame_equal_with_sort(good.toPandas(), expected_results, ['productId'])
def test_report_not_existing_products(spark_context, hive_context):
    """Test that records whose productId is missing from the products master
    table are reported as invalid.

    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    products = hive_context.createDataFrame(pd.DataFrame({'productId': ['1', '2', '3', '4', '5'],
                                                          'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF', 'DE01DEF'],
                                                          'startDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09'],
                                                          'endDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09']}))
    print("Products master table")
    products.show()

    columns = {
        "valid_product": ["productId", "productId", products]
    }
    if010_costs = pd.DataFrame({'productId': ['1', '2', '3', '4', '6'],
                                'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF', 'DE01DEF'],
                                'startDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09'],
                                'endDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09']})
    if010_costs_df = hive_context.createDataFrame(if010_costs)
    print("IF010_COSTS")
    if010_costs_df.show()

    good, bad = validations.main(if010_costs_df, hive_context, spark_context, columns)
    print("GOOD")
    good.show()
    print("BAD")
    bad.show()
    print("Good explain")
    good.explain()
    print("Bad explain")
    bad.explain()

    expected_good_results = pd.DataFrame({'productId': ['1', '2', '3', '4'],
                                          'regionCode': ['AB98ABCD', 'BC99BCDF', 'CD00CDE', 'DE01DEF'],
                                          'startDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09'],
                                          'endDate': ['2018-09-09', '2018-09-09', '2018-09-09', '2018-09-09']})
    expected_bad_results = pd.DataFrame({'productId': ['6'],
                                         'regionCode': ['DE01DEF'],
                                         'startDate': ['2018-09-09'],
                                         'endDate': ['2018-09-09'],
                                         'validation_reason': ['Invalid product']})
    assert_frame_equal_with_sort(good.toPandas(), expected_good_results, ['productId'])
    assert_frame_equal_with_sort(bad.toPandas(), expected_bad_results, ['productId'])
def test_report_if_date_records_exists_within_range(spark_context, hive_context):
    """Test that records are reported when, for the same productId/regionCode,
    one record's [startDate, endDate] range is a subset of another record's range.

    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    columns = {
        "subset_validation": ("startDate", "endDate", ["productId", "regionCode"])
    }
    if010_costs_df = hive_context.createDataFrame(pd.DataFrame([
        {
            'productId': '1',
            'regionCode': 'AB98ABCD',
            'startDate': date('2018-09-09'),
            'endDate': date('2018-09-19')
        },
        {
            'productId': '2',
            'regionCode': 'BC99BCDF',
            'startDate': date('2018-10-09'),
            'endDate': date('2018-10-19')
        },
        {
            'productId': '3',
            'regionCode': 'CD00CDE',
            'startDate': date('2018-11-09'),
            'endDate': date('2018-11-19')
        },
        {
            'productId': '4',
            'regionCode': 'DE01DEF',
            'startDate': date('2018-05-05'),
            'endDate': date('2018-05-08')
        },
        {
            'productId': '4',
            'regionCode': 'DE01DEF',
            'startDate': date('2018-05-01'),
            'endDate': date('2018-05-10')
        }
    ]))
    print("IF010_COSTS")
    if010_costs_df.show()

    good, bad = validations.main(if010_costs_df, hive_context, spark_context, columns)
    print("GOOD")
    good.show()
    print("BAD")
    bad.show()
    print("Good explain")
    good.explain()
    print("Bad explain")
    bad.explain()

    expected_good_results = pd.DataFrame([
        {
            'productId': '1',
            'regionCode': 'AB98ABCD',
            'startDate': date('2018-09-09'),
            'endDate': date('2018-09-19')
        },
        {
            'productId': '2',
            'regionCode': 'BC99BCDF',
            'startDate': date('2018-10-09'),
            'endDate': date('2018-10-19')
        },
        {
            'productId': '3',
            'regionCode': 'CD00CDE',
            'startDate': date('2018-11-09'),
            'endDate': date('2018-11-19')
        }
    ])
    expected_bad_results = pd.DataFrame([
        {
            'productId': '4',
            'regionCode': 'DE01DEF',
            'startDate': date('2018-05-05'),
            'endDate': date('2018-05-08'),
            'validation_reason': 'Subset range already exists for startDate and endDate'
        },
        {
            'productId': '4',
            'regionCode': 'DE01DEF',
            'startDate': date('2018-05-01'),
            'endDate': date('2018-05-10'),
            'validation_reason': 'Subset range already exists for startDate and endDate'
        }
    ])
    assert_frame_equal_with_sort(good.toPandas(), expected_good_results, ['productId', 'regionCode', 'startDate'])
    assert_frame_equal_with_sort(bad.toPandas(), expected_bad_results, ['productId', 'regionCode', 'startDate'])
def test_report_filter_validation(spark_context, hive_context):
    """Test that records failing the filter expression startDate < endDate
    are reported as invalid.

    Args:
        spark_context: test fixture SparkContext
        hive_context: test fixture HiveContext
    """
    columns = {
        "filter_validation": "startDate < endDate"
    }
    if010_costs_df = hive_context.createDataFrame(pd.DataFrame([
        {
            'productId': '1',
            'regionCode': 'AB98ABCD',
            'startDate': date('2018-09-09'),
            'endDate': date('2018-09-19')
        },
        {
            'productId': '2',
            'regionCode': 'BC99BCDF',
            'startDate': date('2018-10-09'),
            'endDate': date('2018-10-19')
        },
        {
            'productId': '3',
            'regionCode': 'CD00CDE',
            'startDate': date('2018-11-09'),
            'endDate': date('2018-11-19')
        },
        {
            'productId': '4',
            'regionCode': 'DE01DEF',
            'startDate': date('2018-05-05'),
            'endDate': date('2018-05-04')
        }
    ]))
    print("IF010_COSTS")
    if010_costs_df.show()

    good, bad = validations.main(if010_costs_df, hive_context, spark_context, columns)
    print("GOOD")
    good.show()
    print("BAD")
    bad.show()
    print("Good explain")
    good.explain()
    print("Bad explain")
    bad.explain()

    expected_good_results = pd.DataFrame([
        {
            'productId': '1',
            'regionCode': 'AB98ABCD',
            'startDate': date('2018-09-09'),
            'endDate': date('2018-09-19')
        },
        {
            'productId': '2',
            'regionCode': 'BC99BCDF',
            'startDate': date('2018-10-09'),
            'endDate': date('2018-10-19')
        },
        {
            'productId': '3',
            'regionCode': 'CD00CDE',
            'startDate': date('2018-11-09'),
            'endDate': date('2018-11-19')
        }
    ])
    expected_bad_results = pd.DataFrame([
        {
            'productId': '4',
            'regionCode': 'DE01DEF',
            'startDate': date('2018-05-05'),
            'endDate': date('2018-05-04'),
            'validation_reason': 'Unsatisfied condition startDate < endDate'
        }
    ])
    assert_frame_equal_with_sort(good.toPandas(), expected_good_results, ['productId', 'regionCode', 'startDate'])
    assert_frame_equal_with_sort(bad.toPandas(), expected_bad_results, ['productId', 'regionCode', 'startDate'])
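For reference, the assert_frame_equal_with_sort helper from conftest.py can also be exercised on its own with plain pandas (no Spark needed); a minimal sketch with made-up data:

# Standalone usage of the conftest.py helper: the two frames differ in row and
# column order but compare equal once sorted by the key column.
import pandas as pd
from conftest import assert_frame_equal_with_sort

left = pd.DataFrame({'productId': ['2', '1'], 'regionCode': ['BC99BCDF', 'AB98ABCD']})
right = pd.DataFrame({'regionCode': ['AB98ABCD', 'BC99BCDF'], 'productId': ['1', '2']})
assert_frame_equal_with_sort(left, right, ['productId'])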