Diff two datasets with pandas
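"""Compare the per-dataset CSV output of a Spark ETL pipeline with the output
of a pandas ETL pipeline and report any rows present in only one of them."""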
import pandas as pd
import os
import glob
import logging
pandas_etl_data_dir = '/etl-diff/panda-output'
spark_etl_data_dir = '/etl-diff/spark-output'
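# Expected layout (inferred from the code below):
#   <spark_etl_data_dir>/<dataset>/...   one directory of CSV part files per dataset
#   <pandas_etl_data_dir>/<dataset>.csv  one CSV file per dataset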
def get_logger(name):
    """Return a named logger configured to log at INFO level."""
    logger = logging.getLogger(name)
    log_format = '%(asctime)s %(levelname)s %(name)s: %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    logger.setLevel(logging.INFO)
    return logger
def get_files(filename_pattern, root_dir):
    """Yield the paths under root_dir that match filename_pattern."""
    return glob.iglob(os.path.join(root_dir, '') + filename_pattern, recursive=True)
def loadDF(f):
    """Read a CSV into a dataframe; an empty file yields an empty dataframe."""
    logger.debug(f'Loading dataframe from {f}...')
    try:
        return pd.read_csv(f)
    except pd.errors.EmptyDataError:
        logger.debug(f'Ignoring the empty file {f}')
        return pd.DataFrame()
def diffData(data_file):
    """Compare one dataset across the Spark and pandas ETL outputs."""
    # Spark writes each dataset as a directory of part files; load them all
    # and concatenate into a single dataframe.
    files = get_files(data_file + '/*', spark_etl_data_dir)
    logger.debug('Loading Spark ETL CSVs into a dataframe')
    smallDfs = map(loadDF, files)
    spark_df = pd.concat(smallDfs, ignore_index=True, sort=False)
    # The pandas pipeline writes each dataset as a single CSV file.
    logger.debug('Loading pandas ETL CSV into a dataframe')
    pandas_df = pd.read_csv(os.path.join(pandas_etl_data_dir, data_file + '.csv'))
    # with pd.option_context('display.max_rows', None, 'display.max_columns', None):  # more options can be specified also
    #     pandaRow = pandas_df.loc[pandas_df['TransactionID'] == 2987137]
    #     sparkRow = spark_df.loc[spark_df['TransactionID'] == 2987137]
    #     logger.info(pandaRow.compare(sparkRow))
    # del pandas_df['TransactionAmt']
    # del spark_df['TransactionAmt']
    logger.debug(f'Pandas df is {len(pandas_df)} lines')
    logger.debug(f'Spark df is {len(spark_df)} lines')
    logger.debug('Diffing the data generated by the two pipelines...')
    # An outer merge on all common columns with indicator=True tags every row
    # with its origin; rows not tagged 'both' exist in only one output.
    merged = spark_df.merge(pandas_df, indicator=True, how='outer')
    diffed = merged[merged['_merge'] != 'both']
    if len(diffed) > 0:
        logger.info(f'Found differing rows in the dataset {data_file}:\n\t{diffed}')
    else:
        logger.info(f'The dataset "{data_file}" is identical.')
logger = get_logger(__name__)
# Every top-level entry in the Spark output dir is treated as a dataset,
# except the 'features' and 'test' outputs, which are skipped.
datasets = list(filter(lambda x: x != 'features' and x != 'test',
                       map(os.path.basename, get_files('*', spark_etl_data_dir))))
for data in datasets:
    logger.info(f'Comparing the dataset {data}...')
    diffData(data)
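For reference, here is a minimal, self-contained sketch of the diff technique the script relies on: an outer merge on all common columns with indicator=True adds a _merge column tagging each row as both, left_only, or right_only, so any row tagged something other than both exists in only one frame. The frames and column names below are illustrative, not taken from the gist.

import pandas as pd

left = pd.DataFrame({'id': [1, 2, 3], 'amount': [10.0, 20.0, 30.0]})
right = pd.DataFrame({'id': [1, 2, 4], 'amount': [10.0, 20.0, 40.0]})

# The outer merge keeps every row from both frames; indicator=True records
# whether a row appeared in the left frame, the right frame, or both.
merged = left.merge(right, indicator=True, how='outer')
diff = merged[merged['_merge'] != 'both']
print(diff)
# id=3 is tagged 'left_only' and id=4 is tagged 'right_only'.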