Diff two datasets with pandas
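For context, the directory layout the script expects looks roughly like this (the dataset name is a placeholder):

    /etl-diff/spark-output/<dataset>/part-*.csv   # one directory of part files per dataset (Spark pipeline)
    /etl-diff/panda-output/<dataset>.csv          # one CSV per dataset (pandas pipeline)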
import pandas as pd
import os
import glob
import logging

pandas_etl_data_dir = '/etl-diff/panda-output'
spark_etl_data_dir = '/etl-diff/spark-output'
def get_logger(name):
    logger = logging.getLogger(name)
    log_format = '%(asctime)s %(levelname)s %(name)s: %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    logger.setLevel(logging.INFO)
    return logger
def get_files(filename_pattern, root_dir):
    # Iterate over paths under root_dir that match the glob pattern.
    return glob.iglob(os.path.join(root_dir, filename_pattern), recursive=True)
def loadDF(f):
    logger.debug(f'Loading dataframe from {f}...')
    try:
        return pd.read_csv(f)
    except pd.errors.EmptyDataError:
        # Spark may emit empty part files; treat them as empty dataframes.
        logger.debug(f'Ignoring the empty file {f}')
        return pd.DataFrame()
def diffData(data_file):
    # The Spark pipeline writes each dataset as a directory of part CSVs;
    # load them all and concatenate into one dataframe.
    files = get_files(data_file + '/*', spark_etl_data_dir)
    logger.debug('Loading Spark ETL CSVs into a dataframe')
    small_dfs = map(loadDF, files)
    spark_df = pd.concat(small_dfs, ignore_index=True, sort=False)
    # The pandas pipeline writes each dataset as a single CSV file.
    logger.debug('Loading pandas ETL CSV into a dataframe')
    pandas_df = pd.read_csv(os.path.join(pandas_etl_data_dir, data_file + '.csv'))
    # with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    #     panda_row = pandas_df.loc[pandas_df['TransactionID'] == 2987137]
    #     spark_row = spark_df.loc[spark_df['TransactionID'] == 2987137]
    #     logger.info(panda_row.compare(spark_row))
    # del pandas_df['TransactionAmt']
    # del spark_df['TransactionAmt']
    logger.debug(f'Pandas df has {len(pandas_df)} rows')
    logger.debug(f'Spark df has {len(spark_df)} rows')
    logger.debug('Diffing the data generated by the two pipelines...')
    # An outer merge on all shared columns with indicator=True marks each row
    # as 'both', 'left_only' or 'right_only'; anything not 'both' is a diff.
    merged = spark_df.merge(pandas_df, indicator=True, how='outer')
    diffed = merged[merged['_merge'] != 'both']
    if len(diffed) > 0:
        logger.info(f'Found differing rows in dataset {data_file}:\n\t{diffed}')
    else:
        logger.info(f'The dataset "{data_file}" is identical.')
logger = get_logger(__name__)
# Every entry under the Spark output dir is a dataset; skip 'features' and 'test'.
datasets = list(filter(lambda x: x != 'features' and x != 'test',
                       map(os.path.basename, get_files('*', spark_etl_data_dir))))
for data in datasets:
    logger.info(f'Comparing the dataset {data}...')
    diffData(data)
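For reference, here is a minimal, self-contained sketch of the indicator-merge technique the script relies on; the toy column names ('TransactionID', 'amount') are made up for illustration:

import pandas as pd

left = pd.DataFrame({'TransactionID': [1, 2, 3], 'amount': [10.0, 20.0, 30.0]})
right = pd.DataFrame({'TransactionID': [1, 2, 4], 'amount': [10.0, 20.0, 40.0]})

# An outer merge on all shared columns with indicator=True adds a '_merge'
# column valued 'both', 'left_only', or 'right_only' for every row.
merged = left.merge(right, indicator=True, how='outer')
print(merged[merged['_merge'] != 'both'])
# Prints the symmetric difference: TransactionID 3 (left_only) and 4 (right_only).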