Created
August 24, 2025 21:38
-
-
Save fuhoi/c30b11667d02ab477ed6d2286d3b6459 to your computer and use it in GitHub Desktop.
pd_merge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import hashlib | |
from datetime import datetime | |
def detect_dataframe_changes(source_df: pd.DataFrame, target_df: pd.DataFrame, primary_key: str, ignored_columns: list):
    """
    Detect inserted, updated, and deleted rows between two DataFrames.

    Rows are matched on ``primary_key``. A row present in both frames counts
    as updated when the hash of its comparison columns (every source column
    that is neither ignored nor the primary key) differs between the frames.
    Neither input frame is modified.

    Args:
        source_df (pd.DataFrame): The source DataFrame (the desired state).
        target_df (pd.DataFrame): The target DataFrame (the current state).
        primary_key (str): The name of the primary key column.
        ignored_columns (list): Columns to ignore for change detection.

    Returns:
        tuple: A tuple containing:
            - inserted_rows (pd.DataFrame): Source rows whose key is not in the
              target, stamped with InsertedAt/InsertedBy.
            - updated_rows (pd.DataFrame): Source rows whose comparison columns
              changed, stamped with UpdatedAt/UpdatedBy and carrying the
              target's InsertedAt/InsertedBy when the target has them.
            - deleted_rows (pd.DataFrame): Target rows whose key is not in the
              source.
            - final_target_df (pd.DataFrame): Unchanged target rows followed by
              the inserted and updated rows, with a fresh integer index.

    Raises:
        KeyError: If ``primary_key`` or a comparison column is missing from
            one of the frames.
    """
    # Local import: the module itself only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    # Columns to consider for hashing (relevant columns for comparison)
    comparison_columns = [
        col for col in source_df.columns
        if col not in ignored_columns and col != primary_key
    ]

    def row_hashes(df):
        """Return one uint64 hash per row of ``df`` over the comparison columns."""
        if not comparison_columns:
            # Nothing to compare: every row hashes the same, so nothing is "updated".
            return pd.Series(0, index=df.index, dtype="uint64").to_numpy()
        # Vectorised hashing also works for empty frames, where a row-wise
        # ``DataFrame.apply`` would not return a Series.
        # NOTE(review): the hash is dtype-sensitive, so source and target are
        # presumably expected to share column dtypes - confirm with callers.
        return pd.util.hash_pandas_object(df[comparison_columns], index=False).to_numpy()

    # Keep the hashes in small side frames instead of adding a temporary
    # column to the callers' DataFrames.
    source_keys = source_df[[primary_key]].assign(_source_hash=row_hashes(source_df))
    target_keys = target_df[[primary_key]].assign(_target_hash=row_hashes(target_df))

    # Identify deleted rows (in target, missing from source)
    deleted_rows = target_df[~target_df[primary_key].isin(source_df[primary_key])].copy()

    # Identify inserted rows (in source, missing from target)
    inserted_rows = source_df[~source_df[primary_key].isin(target_df[primary_key])].copy()

    # Naive UTC timestamp, equivalent to the deprecated ``datetime.utcnow()``.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    if not inserted_rows.empty:
        inserted_rows['InsertedAt'] = now
        inserted_rows['InsertedBy'] = 'SystemProcess'
        inserted_rows['UpdatedAt'] = pd.NaT  # Set to Not a Time for new inserts
        inserted_rows['UpdatedBy'] = None  # Set to None for new inserts

    # Identify updates: keys present in both frames whose hashes differ
    common_keys = pd.merge(source_keys, target_keys, on=primary_key)
    changed = common_keys['_source_hash'] != common_keys['_target_hash']
    updated_keys = common_keys.loc[changed, primary_key]
    updated_rows = source_df[source_df[primary_key].isin(updated_keys)].copy()

    if not updated_rows.empty:
        updated_rows['UpdatedAt'] = now
        updated_rows['UpdatedBy'] = 'SystemProcess'
        # Preserve original InsertedAt/By for updated rows if they exist in target_df
        audit_columns = [col for col in ('InsertedAt', 'InsertedBy') if col in target_df.columns]
        if audit_columns:
            existing_target_info = target_df.loc[
                target_df[primary_key].isin(updated_rows[primary_key]),
                [primary_key] + audit_columns,
            ]
            updated_rows = pd.merge(
                updated_rows.drop(columns=audit_columns, errors='ignore'),
                existing_target_info,
                on=primary_key,
                how='left',
            )

    # Build the new target: keep rows that were neither deleted nor updated,
    # then append the inserted and updated rows.
    is_deleted = target_df[primary_key].isin(deleted_rows[primary_key])
    is_updated = target_df[primary_key].isin(updated_rows[primary_key])
    kept_rows = target_df[~(is_deleted | is_updated)]

    # Skip empty pieces so they cannot influence the resulting dtypes.
    pieces = [frame for frame in (kept_rows, inserted_rows, updated_rows) if not frame.empty]
    if pieces:
        final_target_df = pd.concat(pieces, ignore_index=True)
    else:
        final_target_df = kept_rows.reset_index(drop=True)

    return inserted_rows, updated_rows, deleted_rows, final_target_df
# --- Sample Data Generation ---
# Column -> dtype mapping applied to both sample frames.
schema = {
    'UserId': int,
    'UserName': str,
    'DisplayName': str,
    'BirthYear': int,
    'InsertedAt': 'datetime64[ns]',
    'InsertedBy': str,
    'UpdatedAt': 'datetime64[ns]',
    'UpdatedBy': str,
}

# Initial data for target_df
data_target = [
    {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Bob Johnson', 'BirthYear': 1985, 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': 1992, 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    {'UserId': 4, 'UserName': 'david', 'DisplayName': 'David Lee', 'BirthYear': 1988, 'InsertedAt': datetime(2023, 2, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
]
target_df = pd.DataFrame(data_target).astype(schema)

# Source data with inserts, updates, and deletes
# ('charlie' is absent from the source, so it is the deleted row).
data_source = [
    # Existing user, no change
    {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # Existing user, updated DisplayName and BirthYear
    {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Robert Johnson', 'BirthYear': 1986, 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # New user (insert)
    {'UserId': 5, 'UserName': 'eve', 'DisplayName': 'Eve Adams', 'BirthYear': 1995, 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # Existing user, updated DisplayName only (BirthYear is 1988 in both frames)
    {'UserId': 4, 'UserName': 'david', 'DisplayName': 'Dave Lee', 'BirthYear': 1988, 'InsertedAt': datetime(2023, 2, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # Another new user (insert)
    {'UserId': 6, 'UserName': 'frank', 'DisplayName': 'Frank White', 'BirthYear': 1991, 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
]
source_df = pd.DataFrame(data_source).astype(schema)

# --- Configuration ---
# Rows are matched on UserName; UserId and the audit columns do not count as changes.
primary_key_col = 'UserName'
ignored_cols = ['UserId', 'InsertedAt', 'InsertedBy', 'UpdatedAt', 'UpdatedBy']

# Run the demo only when executed as a script, so that importing this module
# (e.g. from the test suite) does not trigger detection and printing.
if __name__ == "__main__":
    # --- Detect Changes ---
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    # --- Output Results ---
    print("--- Change Detection Results ---")
    print(f"Inserted Rows Count: {len(inserted)}")
    print(f"Updated Rows Count: {len(updated)}")
    print(f"Deleted Rows Count: {len(deleted)}")
    print("\nInserted Rows:")
    print(inserted)
    print("\nUpdated Rows:")
    print(updated)
    print("\nDeleted Rows:")
    print(deleted)
    print("\nFinal Target DataFrame:")
    print(final_target_df)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import pytest | |
from datetime import datetime, timedelta | |
from pd_merge_1 import detect_dataframe_changes | |
@pytest.fixture
def common_schema():
    """Column -> dtype mapping applied to every test DataFrame via ``astype``."""
    return {
        'UserId': int,
        'UserName': str,
        # Several tests put None in DisplayName and the audit "By" columns;
        # ``object`` keeps the missing value instead of casting it to text.
        'DisplayName': object,
        # Nullable integer: a plain ``int`` column cannot hold the missing
        # BirthYear values some tests construct.
        'BirthYear': 'Int64',
        'EnrollmentDate': 'datetime64[ns]',
        'LastLogin': 'datetime64[ns]',
        'DurationOnline': 'timedelta64[ns]',
        'InsertedAt': 'datetime64[ns]',
        'InsertedBy': object,
        'UpdatedAt': 'datetime64[ns]',
        'UpdatedBy': object,
    }
@pytest.fixture
def initial_data():
    """Three baseline user records (alice, bob, charlie) shared by the tests."""
    alice = {
        'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990,
        'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0),
        'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin',
        'UpdatedAt': pd.NaT, 'UpdatedBy': None,
    }
    bob = {
        'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Bob Johnson', 'BirthYear': 1985,
        'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0),
        'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin',
        'UpdatedAt': pd.NaT, 'UpdatedBy': None,
    }
    charlie = {
        'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': 1992,
        'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': datetime(2023, 1, 10, 12, 0, 0),
        'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin',
        'UpdatedAt': pd.NaT, 'UpdatedBy': None,
    }
    return [alice, bob, charlie]
@pytest.fixture
def primary_key_col():
    """Name of the column the tests use to match source rows to target rows."""
    key_column = 'UserName'
    return key_column
@pytest.fixture
def ignored_cols():
    """Audit columns the tests exclude from change detection."""
    audit_columns = ['InsertedAt', 'InsertedBy', 'UpdatedAt', 'UpdatedBy']
    return audit_columns
def test_no_changes(common_schema, initial_data, primary_key_col, ignored_cols):
    """Identical source and target frames yield no inserts, updates, or deletes."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    source_df = pd.DataFrame(initial_data).astype(common_schema)

    result = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    inserted, updated, deleted, final_target_df = result

    for change_set in (inserted, updated, deleted):
        assert len(change_set) == 0

    # Drop the helper hash column on both sides in case it is present.
    actual = final_target_df.drop(columns=['__hash__'], errors='ignore')
    expected = target_df.drop(columns=['__hash__'], errors='ignore')
    pd.testing.assert_frame_equal(actual, expected)
def test_inserts_only(common_schema, initial_data, primary_key_col, ignored_cols):
    """A source row whose key is absent from the target is reported as an insert."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    new_row_data = {
        'UserId': 4, 'UserName': 'diana', 'DisplayName': 'Diana Prince', 'BirthYear': 1995,
        'EnrollmentDate': datetime(2022, 4, 1), 'LastLogin': datetime(2023, 1, 15, 9, 0, 0),
        'DurationOnline': timedelta(hours=3), 'InsertedAt': pd.NaT, 'InsertedBy': None,
        'UpdatedAt': pd.NaT, 'UpdatedBy': None
    }
    source_df = pd.concat([target_df, pd.DataFrame([new_row_data]).astype(common_schema)], ignore_index=True)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 1
    assert len(updated) == 0
    assert len(deleted) == 0

    # Assert the new row is in the final_target_df
    assert 'diana' in final_target_df[primary_key_col].values

    # Assert InsertedAt and InsertedBy are set for the new row.
    # ``.iloc[0]`` extracts the scalar; the bare ``.iloc`` indexer object
    # would never compare equal to the expected value.
    inserted_row = final_target_df[final_target_df[primary_key_col] == 'diana']
    assert pd.notna(inserted_row['InsertedAt'].iloc[0])
    assert inserted_row['InsertedBy'].iloc[0] == 'SystemProcess'
    assert pd.isna(inserted_row['UpdatedAt'].iloc[0])
    # ``pd.isna`` accepts None, NaN and pd.NA alike.
    assert pd.isna(inserted_row['UpdatedBy'].iloc[0])

    # Assert the total count of rows
    assert len(final_target_df) == len(initial_data) + 1
def test_updates_only(common_schema, initial_data, primary_key_col, ignored_cols):
    """A changed non-ignored column on an existing key is reported as an update."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)

    # Create a modified version of an existing row (Alice). Copy the row dict,
    # not the list, so the fixture data is left untouched.
    updated_alice_data = initial_data[0].copy()
    updated_alice_data['DisplayName'] = 'Alice Wonderland'  # Change a non-ignored column
    updated_alice_data['BirthYear'] = 1991  # Change another non-ignored column

    # Create source_df with the updated row and other unchanged rows
    source_df_data = [
        updated_alice_data,
        initial_data[1],  # Bob
        initial_data[2],  # Charlie
    ]
    source_df = pd.DataFrame(source_df_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 0
    assert len(updated) == 1
    assert len(deleted) == 0

    # Assert the updated row is in the final_target_df with new values
    updated_row_in_final = final_target_df[final_target_df[primary_key_col] == 'alice']
    assert updated_row_in_final['DisplayName'].iloc[0] == 'Alice Wonderland'
    assert updated_row_in_final['BirthYear'].iloc[0] == 1991

    # Assert UpdatedAt and UpdatedBy are set for the updated row
    assert pd.notna(updated_row_in_final['UpdatedAt'].iloc[0])
    assert updated_row_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'

    # Assert InsertedAt and InsertedBy are preserved
    assert pd.notna(updated_row_in_final['InsertedAt'].iloc[0])
    assert updated_row_in_final['InsertedBy'].iloc[0] == 'Admin'

    # Assert the total count of rows remains the same
    assert len(final_target_df) == len(initial_data)
def test_deletes_only(common_schema, initial_data, primary_key_col, ignored_cols):
    """A target row whose key is absent from the source is reported as a delete."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)

    # Source DataFrame has one less row (Charlie is deleted)
    source_df_data = [
        initial_data[0],  # Alice
        initial_data[1],  # Bob
    ]
    source_df = pd.DataFrame(source_df_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 0
    assert len(updated) == 0
    assert len(deleted) == 1

    # Assert the deleted row is not in the final_target_df
    assert 'charlie' not in final_target_df[primary_key_col].values

    # Assert the total count of rows is reduced by one
    assert len(final_target_df) == len(initial_data) - 1
def test_mixed_changes(common_schema, initial_data, primary_key_col, ignored_cols):
    """One update, one delete and two inserts in a single run are all detected."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)  # Alice, Bob, Charlie

    # Alice: Updated (copy the row dict so the fixture data is not mutated)
    updated_alice_data = initial_data[0].copy()
    updated_alice_data['DisplayName'] = 'Alice X'
    updated_alice_data['BirthYear'] = 1989

    # Bob: No Change (will be implicitly handled by not being in updated/deleted)
    # Charlie: Deleted (not included in source_df)

    # Diana: Inserted
    new_diana_data = {
        'UserId': 4, 'UserName': 'diana', 'DisplayName': 'Diana Prince', 'BirthYear': 1995,
        'EnrollmentDate': datetime(2022, 4, 1), 'LastLogin': datetime(2023, 1, 15, 9, 0, 0),
        'DurationOnline': timedelta(hours=3), 'InsertedAt': pd.NaT, 'InsertedBy': None,
        'UpdatedAt': pd.NaT, 'UpdatedBy': None
    }
    # Eve: Inserted
    new_eve_data = {
        'UserId': 5, 'UserName': 'eve', 'DisplayName': 'Eve Adams', 'BirthYear': 1998,
        'EnrollmentDate': datetime(2022, 5, 1), 'LastLogin': datetime(2023, 1, 20, 14, 0, 0),
        'DurationOnline': timedelta(hours=2), 'InsertedAt': pd.NaT, 'InsertedBy': None,
        'UpdatedAt': pd.NaT, 'UpdatedBy': None
    }

    source_df_data = [
        updated_alice_data,
        initial_data[1],  # Bob (unchanged)
        new_diana_data,
        new_eve_data,
    ]
    source_df = pd.DataFrame(source_df_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 2  # Diana, Eve
    assert len(updated) == 1  # Alice
    assert len(deleted) == 1  # Charlie

    # Assert final_target_df contains Alice (updated), Bob (unchanged), Diana (inserted), Eve (inserted)
    expected_user_names = {'alice', 'bob', 'diana', 'eve'}
    assert set(final_target_df[primary_key_col].values) == expected_user_names
    assert len(final_target_df) == len(initial_data) - 1 + 2  # 3 - 1 (deleted) + 2 (inserted) = 4

    # Assert Alice's updated values
    alice_in_final = final_target_df[final_target_df[primary_key_col] == 'alice']
    assert alice_in_final['DisplayName'].iloc[0] == 'Alice X'
    assert alice_in_final['BirthYear'].iloc[0] == 1989
    assert pd.notna(alice_in_final['UpdatedAt'].iloc[0])
    assert alice_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    assert pd.notna(alice_in_final['InsertedAt'].iloc[0])
    assert alice_in_final['InsertedBy'].iloc[0] == 'Admin'

    # Assert the inserted rows (Diana, Eve) carry insert metadata only
    for new_user in ('diana', 'eve'):
        new_in_final = final_target_df[final_target_df[primary_key_col] == new_user]
        assert pd.notna(new_in_final['InsertedAt'].iloc[0])
        assert new_in_final['InsertedBy'].iloc[0] == 'SystemProcess'
        assert pd.isna(new_in_final['UpdatedAt'].iloc[0])
        # ``pd.isna`` accepts None, NaN and pd.NA alike.
        assert pd.isna(new_in_final['UpdatedBy'].iloc[0])

    # Assert Bob's unchanged values
    bob_in_final = final_target_df[final_target_df[primary_key_col] == 'bob']
    assert bob_in_final['DisplayName'].iloc[0] == 'Bob Johnson'
    assert bob_in_final['BirthYear'].iloc[0] == 1985
    assert pd.isna(bob_in_final['UpdatedAt'].iloc[0])
    assert pd.isna(bob_in_final['UpdatedBy'].iloc[0])
    assert pd.notna(bob_in_final['InsertedAt'].iloc[0])
    assert bob_in_final['InsertedBy'].iloc[0] == 'Admin'
def test_empty_source_dataframe(common_schema, initial_data, primary_key_col, ignored_cols):
    """With an empty source, every target row is reported as deleted."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    empty_source = pd.DataFrame(columns=common_schema.keys()).astype(common_schema)

    result = detect_dataframe_changes(empty_source, target_df, primary_key_col, ignored_cols)
    inserted, updated, deleted, final_target_df = result

    assert len(inserted) == 0
    assert len(updated) == 0
    # All rows in target should be deleted
    assert len(deleted) == len(initial_data)

    # Nothing is left in the rebuilt target
    assert final_target_df.empty
def test_empty_target_dataframe(common_schema, initial_data, primary_key_col, ignored_cols):
    """With an empty target, every source row is reported as inserted."""
    empty_target = pd.DataFrame(columns=common_schema.keys()).astype(common_schema)
    source_df = pd.DataFrame(initial_data).astype(common_schema)

    result = detect_dataframe_changes(source_df, empty_target, primary_key_col, ignored_cols)
    inserted, updated, deleted, final_target_df = result

    # All source rows should be inserted
    assert len(inserted) == len(initial_data)
    assert len(updated) == 0
    assert len(deleted) == 0

    # Apart from audit/helper columns, the rebuilt target equals the source
    excluded = ['InsertedAt', 'InsertedBy', 'UpdatedAt', 'UpdatedBy', '__hash__']
    actual = final_target_df.drop(columns=excluded, errors='ignore')
    expected = source_df.drop(columns=excluded, errors='ignore')
    pd.testing.assert_frame_equal(actual, expected)

    # Check InsertedAt/By for all rows
    assert final_target_df['InsertedBy'].eq('SystemProcess').all()
    assert final_target_df['UpdatedAt'].isna().all()
    assert final_target_df['UpdatedBy'].isna().all()
def test_source_with_null_rows(common_schema, initial_data, primary_key_col, ignored_cols):
    """Null values in source rows are handled for both inserted and updated rows."""
    # NOTE(review): Frank's BirthYear is None, so this test presumably needs a
    # nullable dtype for BirthYear in common_schema - confirm.
    target_df = pd.DataFrame(initial_data).astype(common_schema)

    # Create a source_df with a new row containing nulls and an updated row with nulls
    source_data_with_nulls = [
        # Alice - updated with a null in DisplayName
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': None, 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Bob - no change
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Bob Johnson', 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # New user Frank with some nulls
        {'UserId': 4, 'UserName': 'frank', 'DisplayName': 'Frank', 'BirthYear': None, 'EnrollmentDate': datetime(2022, 6, 1), 'LastLogin': None, 'DurationOnline': timedelta(hours=1), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    source_df = pd.DataFrame(source_data_with_nulls).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 1  # Frank
    assert len(updated) == 1  # Alice
    assert len(deleted) == 1  # Charlie

    # Assert Frank is inserted with nulls handled
    frank_in_final = final_target_df[final_target_df[primary_key_col] == 'frank']
    assert frank_in_final['DisplayName'].iloc[0] == 'Frank'
    assert pd.isna(frank_in_final['BirthYear'].iloc[0])
    assert pd.isna(frank_in_final['LastLogin'].iloc[0])
    assert pd.notna(frank_in_final['InsertedAt'].iloc[0])
    assert frank_in_final['InsertedBy'].iloc[0] == 'SystemProcess'

    # Assert Alice is updated with null DisplayName
    alice_in_final = final_target_df[final_target_df[primary_key_col] == 'alice']
    assert pd.isna(alice_in_final['DisplayName'].iloc[0])
    assert alice_in_final['BirthYear'].iloc[0] == 1990  # BirthYear should be unchanged
    assert pd.notna(alice_in_final['UpdatedAt'].iloc[0])
    assert alice_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    assert pd.notna(alice_in_final['InsertedAt'].iloc[0])
    assert alice_in_final['InsertedBy'].iloc[0] == 'Admin'

    # Assert Charlie is deleted
    assert 'charlie' not in final_target_df[primary_key_col].values

    # Assert Bob is unchanged
    bob_in_final = final_target_df[final_target_df[primary_key_col] == 'bob']
    assert bob_in_final['DisplayName'].iloc[0] == 'Bob Johnson'
    assert pd.isna(bob_in_final['UpdatedAt'].iloc[0])
    # ``pd.isna`` accepts None, NaN and pd.NA alike.
    assert pd.isna(bob_in_final['UpdatedBy'].iloc[0])
def test_target_with_null_rows(common_schema, initial_data, primary_key_col, ignored_cols):
    """Target nulls replaced by real source values are reported as updates."""
    # NOTE(review): Charlie's BirthYear is None in the target, so this test
    # presumably needs a nullable dtype for BirthYear in common_schema - confirm.
    # Target DataFrame with nulls
    target_data_with_nulls = [
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': None, 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': None, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': None, 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    target_df = pd.DataFrame(target_data_with_nulls).astype(common_schema)

    # Source DataFrame with updates and inserts, some matching nulls
    source_data = [
        # Alice - no change
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Bob - DisplayName updated from None to 'Robert Johnson'
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Robert Johnson', 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Charlie - BirthYear updated from None to 1992, LastLogin updated from None
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': 1992, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': datetime(2023, 1, 10, 12, 0, 0), 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # New user David
        {'UserId': 4, 'UserName': 'david', 'DisplayName': 'David Lee', 'BirthYear': 1988, 'EnrollmentDate': datetime(2022, 7, 1), 'LastLogin': datetime(2023, 1, 25, 8, 0, 0), 'DurationOnline': timedelta(hours=6), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    source_df = pd.DataFrame(source_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 1  # David
    assert len(updated) == 2  # Bob, Charlie
    assert len(deleted) == 0

    # Assert David is inserted
    assert 'david' in final_target_df[primary_key_col].values

    # Assert Bob is updated (``.iloc[0]`` extracts the scalar to compare)
    bob_in_final = final_target_df[final_target_df[primary_key_col] == 'bob']
    assert bob_in_final['DisplayName'].iloc[0] == 'Robert Johnson'
    assert pd.notna(bob_in_final['UpdatedAt'].iloc[0])
    assert bob_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'

    # Assert Charlie is updated
    charlie_in_final = final_target_df[final_target_df[primary_key_col] == 'charlie']
    assert charlie_in_final['BirthYear'].iloc[0] == 1992
    assert pd.notna(charlie_in_final['LastLogin'].iloc[0])
    assert pd.notna(charlie_in_final['UpdatedAt'].iloc[0])
    assert charlie_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
def test_source_with_null_columns(common_schema, initial_data, primary_key_col, ignored_cols):
    """Source columns that are entirely null mark every matching row as updated."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)

    # Same three users as the target, but with 'DisplayName' and 'BirthYear'
    # blanked out in every row.
    nulled_fields = {'DisplayName': None, 'BirthYear': None}
    source_data_null_cols = [{**record, **nulled_fields} for record in initial_data]
    source_df = pd.DataFrame(source_data_null_cols).astype(common_schema)

    result = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    inserted, updated, deleted, final_target_df = result

    assert len(inserted) == 0
    # All three rows should be updated as non-ignored columns changed to null
    assert len(updated) == 3
    assert len(deleted) == 0

    # Assert all rows in final_target_df have nulls in DisplayName and BirthYear
    for column in ('DisplayName', 'BirthYear'):
        assert final_target_df[column].isna().all()

    # Assert UpdatedAt/By are set for all rows
    assert final_target_df['UpdatedAt'].notna().all()
    assert final_target_df['UpdatedBy'].eq('SystemProcess').all()
def test_target_with_null_columns(common_schema, initial_data, primary_key_col, ignored_cols):
    """Target columns that are entirely null are filled from the source as updates."""
    # Target: the baseline users with 'DisplayName' and 'BirthYear' blanked
    # out in every row.
    nulled_fields = {'DisplayName': None, 'BirthYear': None}
    target_data_null_cols = [{**record, **nulled_fields} for record in initial_data]
    target_df = pd.DataFrame(target_data_null_cols).astype(common_schema)

    # Source DataFrame with actual values for 'DisplayName' and 'BirthYear'
    source_df = pd.DataFrame(initial_data).astype(common_schema)

    result = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    inserted, updated, deleted, final_target_df = result

    assert len(inserted) == 0
    # All three rows should be updated as non-ignored columns changed from null to value
    assert len(updated) == 3
    assert len(deleted) == 0

    # Assert all rows in final_target_df have non-nulls in DisplayName and BirthYear
    for column in ('DisplayName', 'BirthYear'):
        assert final_target_df[column].notna().all()

    # Assert UpdatedAt/By are set for all rows
    assert final_target_df['UpdatedAt'].notna().all()
    assert final_target_df['UpdatedBy'].eq('SystemProcess').all()
def test_mixed_data_types_and_nulls(common_schema, primary_key_col, ignored_cols):
    """Inserts, updates and deletes are detected across string, int, datetime and timedelta columns."""
    # NOTE(review): Gamma's BirthYear is None, so this test presumably needs a
    # nullable dtype for BirthYear in common_schema - confirm.
    # Initial data with mixed types and some nulls
    initial_data_mixed = [
        {'UserId': 1, 'UserName': 'alpha', 'DisplayName': 'Alpha User', 'BirthYear': 2000, 'EnrollmentDate': datetime(2021, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=1), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'beta', 'DisplayName': None, 'BirthYear': 1995, 'EnrollmentDate': datetime(2021, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 0, 0), 'DurationOnline': timedelta(minutes=30), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'gamma', 'DisplayName': 'Gamma User', 'BirthYear': None, 'EnrollmentDate': datetime(2021, 3, 1), 'LastLogin': None, 'DurationOnline': timedelta(seconds=120), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    target_df = pd.DataFrame(initial_data_mixed).astype(common_schema)

    # Source data with various changes, including nulls and different data types
    source_data_mixed = [
        # Alpha: Update DisplayName, BirthYear, DurationOnline
        {'UserId': 1, 'UserName': 'alpha', 'DisplayName': 'Alpha New', 'BirthYear': 2001, 'EnrollmentDate': datetime(2021, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=2), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Beta: Update DisplayName from None to a value, LastLogin
        {'UserId': 2, 'UserName': 'beta', 'DisplayName': 'Beta New', 'BirthYear': 1995, 'EnrollmentDate': datetime(2021, 2, 1), 'LastLogin': datetime(2023, 1, 6, 12, 0, 0), 'DurationOnline': timedelta(minutes=30), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Delta: Insert new row with some nulls
        {'UserId': 4, 'UserName': 'delta', 'DisplayName': 'Delta User', 'BirthYear': 1990, 'EnrollmentDate': datetime(2021, 4, 1), 'LastLogin': None, 'DurationOnline': timedelta(hours=3), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Gamma is deleted (not in source)
    ]
    source_df = pd.DataFrame(source_data_mixed).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)

    assert len(inserted) == 1  # Delta
    assert len(updated) == 2  # Alpha, Beta
    assert len(deleted) == 1  # Gamma

    # Assert Delta is inserted (``.iloc[0]`` extracts the scalar to compare)
    delta_in_final = final_target_df[final_target_df[primary_key_col] == 'delta']
    assert delta_in_final['DisplayName'].iloc[0] == 'Delta User'
    assert pd.isna(delta_in_final['LastLogin'].iloc[0])
    assert pd.notna(delta_in_final['InsertedAt'].iloc[0])
    assert delta_in_final['InsertedBy'].iloc[0] == 'SystemProcess'

    # Assert Alpha is updated
    alpha_in_final = final_target_df[final_target_df[primary_key_col] == 'alpha']
    assert alpha_in_final['DisplayName'].iloc[0] == 'Alpha New'
    assert alpha_in_final['BirthYear'].iloc[0] == 2001
    assert alpha_in_final['DurationOnline'].iloc[0] == timedelta(hours=2)
    assert pd.notna(alpha_in_final['UpdatedAt'].iloc[0])
    assert alpha_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'

    # Assert Beta is updated
    beta_in_final = final_target_df[final_target_df[primary_key_col] == 'beta']
    assert beta_in_final['DisplayName'].iloc[0] == 'Beta New'
    assert beta_in_final['LastLogin'].iloc[0] == datetime(2023, 1, 6, 12, 0, 0)
    assert pd.notna(beta_in_final['UpdatedAt'].iloc[0])
    assert beta_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'

    # Assert Gamma is deleted
    assert 'gamma' not in final_target_df[primary_key_col].values
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment