Skip to content

Instantly share code, notes, and snippets.

@saiqulhaq
Forked from sanzoghenzo/compare.py
Created November 25, 2021 01:59
Show Gist options
  • Save saiqulhaq/b51b2f8f387b0c989f77bbbef111c764 to your computer and use it in GitHub Desktop.
Save saiqulhaq/b51b2f8f387b0c989f77bbbef111c764 to your computer and use it in GitHub Desktop.
Compare Excel sheets with Pandas
"""
Compare two Excel sheets
Inspired by https://pbpython.com/excel-diff-pandas-update.html
For the documentation, download this file and type:
python compare.py --help
"""
import argparse
import pandas as pd
import numpy as np
def report_diff(x):
"""Function to use with groupby.apply to highlight value changes."""
return x[0] if x[0] == x[1] or pd.isna(x).all() else f'{x[0]} ---> {x[1]}'
def strip(x):
"""Function to use with applymap to strip whitespaces from a dataframe."""
return x.strip() if isinstance(x, str) else x
def diff_pd(old_df, new_df, idx_col):
"""
Identify differences between two pandas DataFrames using a key column.
Key column is assumed to have a unique row identifier, i.e. no duplicates.
Args:
old_df (pd.DataFrame): first dataframe
new_df (pd.DataFrame): second dataframe
idx_col (str|list(str)): column name(s) of the index,
needs to be present in both DataFrames
"""
# setting the column name as index for fast operations
old_df = old_df.set_index(idx_col)
new_df = new_df.set_index(idx_col)
# get the added and removed rows
old_keys = old_df.index
new_keys = new_df.index
if isinstance(old_keys, pd.MultiIndex):
removed_keys = old_keys.difference(new_keys)
added_keys = new_keys.difference(old_keys)
else:
removed_keys = np.setdiff1d(old_keys, new_keys)
added_keys = np.setdiff1d(new_keys, old_keys)
# populate the output data with non empty dataframes
out_data = {}
removed = old_df.loc[removed_keys]
if not removed.empty:
out_data["removed"] = removed
added = new_df.loc[added_keys]
if not added.empty:
out_data["added"] = added
# focusing on common data of both dataframes
common_keys = np.intersect1d(old_keys, new_keys, assume_unique=True)
common_columns = np.intersect1d(
old_df.columns, new_df.columns, assume_unique=True
)
new_common = new_df.loc[common_keys, common_columns].applymap(strip)
old_common = old_df.loc[common_keys, common_columns].applymap(strip)
# get the changed rows keys by dropping identical rows
# (indexes are ignored, so we'll reset them)
common_data = pd.concat(
[old_common.reset_index(), new_common.reset_index()], sort=True
)
changed_keys = common_data.drop_duplicates(keep=False)[idx_col]
if isinstance(changed_keys, pd.Series):
changed_keys = changed_keys.unique()
else:
changed_keys = changed_keys.drop_duplicates().set_index(idx_col).index
# combining the changed rows via multi level columns
df_all_changes = pd.concat(
[old_common.loc[changed_keys], new_common.loc[changed_keys]],
axis='columns',
keys=['old', 'new']
).swaplevel(axis='columns')
# using report_diff to merge the changes in a single cell with "-->"
df_changed = df_all_changes.groupby(level=0, axis=1).apply(
lambda frame: frame.apply(report_diff, axis=1))
# add changed dataframe to output data only if non empty
if not df_changed.empty:
out_data['changed'] = df_changed
return out_data
def compare_excel(
path1, path2, out_path, sheet_name, index_col_name, **kwargs
):
old_df = pd.read_excel(path1, sheet_name=sheet_name, **kwargs)
new_df = pd.read_excel(path2, sheet_name=sheet_name, **kwargs)
diff = diff_pd(old_df, new_df, index_col_name)
if diff:
with pd.ExcelWriter(out_path) as writer:
for sname, data in diff.items():
data.to_excel(writer, sheet_name=sname)
print(f"Differences saved in {out_path}")
else:
print("No differences spotted")
def build_parser():
cfg = argparse.ArgumentParser(
description="Compares two Excel sheets and outputs the differences "
"to a separate Excel file."
)
cfg.add_argument("path1", help="Fist Excel file")
cfg.add_argument("path2", help="Second Excel file")
cfg.add_argument("sheetname", help="Name of the sheet to compare.")
cfg.add_argument(
"key_column",
help="Name of the column(s) with unique row identifier. It has to be "
"the actual text of the first row, not the excel notation."
"Use multiple times to create a composite index.",
nargs="+",
)
cfg.add_argument("-o", "--output-path", default="compared.xlsx",
help="Path of the comparison results")
cfg.add_argument("--skiprows", help='Excel row containing the table headers',
type=int, action='append', default=None)
return cfg
def main():
cfg = build_parser()
opt = cfg.parse_args()
compare_excel(opt.path1, opt.path2, opt.output_path, opt.sheetname,
opt.key_column, skiprows=opt.skiprows)
if __name__ == '__main__':
main()
"""Excel compare test suite."""
import io
import pandas as pd
import compare
def test_parser():
cfg = compare.build_parser()
opt = cfg.parse_args(["test1.xlsx", "test2.xlsx", "Sheet 1", "Col1", "Col2", "-o", "output.xlsx"])
assert opt.path1 == "test1.xlsx"
assert opt.path2 == "test2.xlsx"
assert opt.output_path == "output.xlsx"
assert opt.sheetname == "Sheet 1"
assert opt.key_column == ["Col1", "Col2"]
assert opt.skiprows is None
def build_excel_stream(df, sheetname):
"""Create an excel workbook as a file-like object."""
output = io.BytesIO()
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
df.to_excel(writer, sheet_name=sheetname, index=False)
return output
def sample_xlsx(df_1, df_2):
xlsx_1 = build_excel_stream(df_1, "Sheet1")
xlsx_2 = build_excel_stream(df_2, "Sheet1")
return xlsx_1, xlsx_2
def sample_dfs():
df_1 = pd.DataFrame({
"ID": [123456, 654321, 543219, 432198, 765432],
"Name": ["Lemonade", "Cola", "Orange", "Fruit Punch", "Tobacco"],
"Flavour Description": ["Fuzzy", "Fuzzy", "Fuzzy", "Fuzzy", "Smoky"],
})
df_2 = pd.DataFrame({
"ID": [123456, 654321, 543219, 432198, 876543],
"Name": ["Lemonade", "Cola", "Orange", "Fruit Punch", "Soda"],
"Flavour Description": ["Fuzzy", "Bubbly", "Fuzzy", "Fuzzy", "Sugary"],
})
return df_1, df_2
def run_assertion(diff):
changed = diff["changed"]
assert len(changed) == 1
assert changed.iloc[0]["Flavour Description"] == "Fuzzy ---> Bubbly"
added = diff["added"]
assert len(added) == 1
assert added.iloc[0]["Flavour Description"] == "Sugary"
removed = diff["removed"]
assert len(removed) == 1
assert removed.iloc[0]["Flavour Description"] == "Smoky"
print("OK.")
def test_single_index():
df_1, df_2 = sample_dfs()
diff = compare.diff_pd(df_1, df_2, ["ID"])
run_assertion(diff)
def test_single_index_excel():
xlsx_1, xlsx_2 = sample_xlsx(*sample_dfs())
diff_io = io.BytesIO()
compare.compare_excel(xlsx_1, xlsx_2, diff_io, "Sheet1", "ID")
diff = pd.read_excel(diff_io, sheet_name=None)
run_assertion(diff)
def sample_multiindex_dfs():
df_1 = pd.DataFrame({
"ID": [123456, 123456, 654321, 543219, 432198, 765432],
"Name": ["Lemonade", "Lemonade", "Cola", "Orange", "Fruit Punch", "Tobacco"],
"Flavour ID": [1, 2, None, None, None, None],
"Flavour Description": ["Fuzzy", "Fuzzy", "Fuzzy", "Fuzzy", "Fuzzy", "Smoky"],
})
df_2 = pd.DataFrame({
"ID": [123456, 123456, 654321, 543219, 432198, 876543],
"Name": ["Lemonade", "Lemonade", "Cola", "Orange", "Fruit Punch", "Soda"],
"Flavour ID": [1, 2, None, None, None, None],
"Flavour Description": ["Fuzzy", "Bubbly", "Fuzzy", "Fuzzy", "Fuzzy", "Sugary"],
})
return df_1, df_2
def test_multiindex():
df_1, df_2 = sample_multiindex_dfs()
diff = compare.diff_pd(df_1, df_2, ["ID", "Flavour ID"])
run_assertion(diff)
def test_multiindex_excel():
xlsx_1, xlsx_2 = sample_xlsx(*sample_multiindex_dfs())
diff_io = io.BytesIO()
compare.compare_excel(xlsx_1, xlsx_2, diff_io, "Sheet1", ["ID", "Flavour ID"])
diff = pd.read_excel(diff_io, sheet_name=None)
run_assertion(diff)
def test_no_diffs():
df_1, _ = sample_multiindex_dfs()
diff = compare.diff_pd(df_1, df_1, ["ID", "Flavour ID"])
assert not diff
print("OK.")
if __name__ == '__main__':
test_multiindex()
test_multiindex_excel()
test_single_index()
test_single_index_excel()
test_parser()
test_no_diff()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment