Skip to content

Instantly share code, notes, and snippets.

@fuhoi
Created August 24, 2025 21:38
Show Gist options
  • Save fuhoi/c30b11667d02ab477ed6d2286d3b6459 to your computer and use it in GitHub Desktop.
pd_merge
import hashlib
from datetime import datetime
from datetime import timezone

import pandas as pd
def detect_dataframe_changes(source_df: pd.DataFrame, target_df: pd.DataFrame, primary_key: str, ignored_columns: list):
    """Detect inserted, updated, and deleted rows between two DataFrames.

    Rows are matched on ``primary_key``. A row counts as *updated* when the
    per-row hash of every column that is neither the key nor listed in
    ``ignored_columns`` differs between source and target.

    Args:
        source_df (pd.DataFrame): The incoming (new) data.
        target_df (pd.DataFrame): The existing data, carrying the
            InsertedAt/InsertedBy and UpdatedAt/UpdatedBy audit columns.
        primary_key (str): Name of the primary key column in both frames.
        ignored_columns (list): Columns excluded from change detection
            (typically the audit columns).

    Returns:
        tuple: ``(inserted_rows, updated_rows, deleted_rows, final_target_df)``
            where ``final_target_df`` is the target with deletes removed and
            inserts/updates applied, audit columns stamped. Neither input
            frame is modified.
    """
    # Work on copies: the original implementation added and later dropped a
    # temporary column on the callers' frames in place.
    source = source_df.copy()
    target = target_df.copy()

    # Columns relevant for change comparison.
    comparison_columns = [
        col for col in source.columns
        if col not in ignored_columns and col != primary_key
    ]

    # Vectorised per-row hash of the comparison columns (one C-level pass
    # instead of a Python-level apply+md5 per row). NaN/NaT hash
    # deterministically, so a null compares equal to a null.
    source['__hash__'] = pd.util.hash_pandas_object(source[comparison_columns], index=False)
    target['__hash__'] = pd.util.hash_pandas_object(target[comparison_columns], index=False)

    # Deleted: present in target but not in source.
    deleted_rows = target[~target[primary_key].isin(source[primary_key])].drop(columns=['__hash__'])

    # Inserted: present in source but not in target.
    inserted_rows = source[~source[primary_key].isin(target[primary_key])].drop(columns=['__hash__'])
    # Naive UTC timestamp (matches datetime64[ns] columns); utcnow() is deprecated.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    if not inserted_rows.empty:
        inserted_rows['InsertedAt'] = now
        inserted_rows['InsertedBy'] = 'SystemProcess'
        inserted_rows['UpdatedAt'] = pd.NaT   # never updated yet
        inserted_rows['UpdatedBy'] = None

    # Updated: present in both frames but with differing hashes.
    common = pd.merge(
        source[[primary_key, '__hash__']],
        target[[primary_key, '__hash__']],
        on=primary_key, suffixes=('_source', '_target'),
    )
    changed_keys = common.loc[common['__hash___source'] != common['__hash___target'], primary_key]
    updated_rows = source[source[primary_key].isin(changed_keys)].drop(columns=['__hash__'])
    if not updated_rows.empty:
        updated_rows['UpdatedAt'] = now
        updated_rows['UpdatedBy'] = 'SystemProcess'
        # Carry the original InsertedAt/InsertedBy over from the target row.
        existing = target.loc[
            target[primary_key].isin(updated_rows[primary_key]),
            [primary_key, 'InsertedAt', 'InsertedBy'],
        ]
        updated_rows = pd.merge(
            updated_rows.drop(columns=['InsertedAt', 'InsertedBy'], errors='ignore'),
            existing, on=primary_key, how='left',
        )

    # Rebuild the target: drop deleted and updated keys, then append the
    # inserted and re-stamped updated rows. '__hash__' is stripped everywhere
    # (the original leaked it into unchanged rows of the result).
    keep = target[~target[primary_key].isin(deleted_rows[primary_key])]
    keep = keep[~keep[primary_key].isin(updated_rows[primary_key])].drop(columns=['__hash__'])
    final_target_df = pd.concat([keep, inserted_rows, updated_rows], ignore_index=True)

    return inserted_rows, updated_rows, deleted_rows, final_target_df
# --- Sample Data Generation ---
# Shared column dtypes for both frames.
schema = {
    'UserId': int,
    'UserName': str,
    'DisplayName': str,
    'BirthYear': int,
    'InsertedAt': 'datetime64[ns]',
    'InsertedBy': str,
    'UpdatedAt': 'datetime64[ns]',
    'UpdatedBy': str,
}

# Current state of the data (the "target"), audit columns already populated.
target_rows = [
    {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Bob Johnson', 'BirthYear': 1985, 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': 1992, 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    {'UserId': 4, 'UserName': 'david', 'DisplayName': 'David Lee', 'BirthYear': 1988, 'InsertedAt': datetime(2023, 2, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
]
target_df = pd.DataFrame(target_rows).astype(schema)

# Incoming data (the "source") containing inserts, updates, and deletes.
source_rows = [
    # Existing user, no change
    {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # Existing user, updated DisplayName and BirthYear
    {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Robert Johnson', 'BirthYear': 1986, 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # New user (insert)
    {'UserId': 5, 'UserName': 'eve', 'DisplayName': 'Eve Adams', 'BirthYear': 1995, 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # Existing user, updated DisplayName
    {'UserId': 4, 'UserName': 'david', 'DisplayName': 'Dave Lee', 'BirthYear': 1988, 'InsertedAt': datetime(2023, 2, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    # Another new user (insert)
    {'UserId': 6, 'UserName': 'frank', 'DisplayName': 'Frank White', 'BirthYear': 1991, 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
]
source_df = pd.DataFrame(source_rows).astype(schema)

# --- Configuration ---
primary_key_col = 'UserName'
ignored_cols = ['UserId', 'InsertedAt', 'InsertedBy', 'UpdatedAt', 'UpdatedBy']

# --- Detect Changes ---
inserted, updated, deleted, final_target_df = detect_dataframe_changes(
    source_df, target_df, primary_key_col, ignored_cols
)

# --- Output Results ---
print("--- Change Detection Results ---")
print(f"Inserted Rows Count: {len(inserted)}")
print(f"Updated Rows Count: {len(updated)}")
print(f"Deleted Rows Count: {len(deleted)}")
print("\nInserted Rows:")
print(inserted)
print("\nUpdated Rows:")
print(updated)
print("\nDeleted Rows:")
print(deleted)
print("\nFinal Target DataFrame:")
print(final_target_df)
import pandas as pd
import pytest
from datetime import datetime, timedelta
from pd_merge_1 import detect_dataframe_changes
@pytest.fixture
def common_schema():
    """Column dtypes for the test frames.

    Fix over the original: several tests feed ``None`` into ``BirthYear``,
    and ``astype(int)`` raises on nulls, so the nullable ``'Int64'`` dtype is
    used; likewise ``astype(str)`` stringifies ``None`` to ``'None'``, which
    would break the suite's ``is None`` / ``isna`` assertions, so the
    null-bearing string columns stay ``object``.
    """
    return {
        'UserId': int,
        'UserName': str,
        'DisplayName': object,     # may hold None in the null-handling tests
        'BirthYear': 'Int64',      # nullable integer: None must survive the cast
        'EnrollmentDate': 'datetime64[ns]',
        'LastLogin': 'datetime64[ns]',
        'DurationOnline': 'timedelta64[ns]',
        'InsertedAt': 'datetime64[ns]',
        'InsertedBy': object,      # None for not-yet-inserted rows
        'UpdatedAt': 'datetime64[ns]',
        'UpdatedBy': object,       # None for never-updated rows
    }
@pytest.fixture
def initial_data():
    """Three baseline user rows (Alice, Bob, Charlie) with audit columns populated."""
    return [
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Bob Johnson', 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': 1992, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': datetime(2023, 1, 10, 12, 0, 0), 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
@pytest.fixture
def primary_key_col():
    """Column used to match rows between source and target."""
    return 'UserName'
@pytest.fixture
def ignored_cols():
    """Audit columns excluded from change detection."""
    return ['InsertedAt', 'InsertedBy', 'UpdatedAt', 'UpdatedBy']
def test_no_changes(common_schema, initial_data, primary_key_col, ignored_cols):
    """Identical source and target yield no inserts, updates, or deletes."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    source_df = pd.DataFrame(initial_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(
        source_df, target_df, primary_key_col, ignored_cols
    )

    assert len(inserted) == 0
    assert len(updated) == 0
    assert len(deleted) == 0
    # The final frame round-trips the target (modulo the internal hash column).
    pd.testing.assert_frame_equal(
        final_target_df.drop(columns=['__hash__'], errors='ignore'),
        target_df.drop(columns=['__hash__'], errors='ignore'),
    )
def test_inserts_only(common_schema, initial_data, primary_key_col, ignored_cols):
    """A row present only in the source is detected as an insert and stamped.

    Fix: the original asserted on the bare ``Series.iloc`` indexer object
    instead of ``Series.iloc[0]`` (the value), making the assertions
    meaningless or always-failing.
    """
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    new_row_data = {
        'UserId': 4, 'UserName': 'diana', 'DisplayName': 'Diana Prince', 'BirthYear': 1995,
        'EnrollmentDate': datetime(2022, 4, 1), 'LastLogin': datetime(2023, 1, 15, 9, 0, 0),
        'DurationOnline': timedelta(hours=3), 'InsertedAt': pd.NaT, 'InsertedBy': None,
        'UpdatedAt': pd.NaT, 'UpdatedBy': None
    }
    source_df = pd.concat([target_df, pd.DataFrame([new_row_data]).astype(common_schema)], ignore_index=True)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 1
    assert len(updated) == 0
    assert len(deleted) == 0
    # The new row made it into the final frame.
    assert 'diana' in final_target_df[primary_key_col].values
    # Audit columns are stamped for the inserted row.
    inserted_row = final_target_df[final_target_df[primary_key_col] == 'diana']
    assert pd.notna(inserted_row['InsertedAt'].iloc[0])
    assert inserted_row['InsertedBy'].iloc[0] == 'SystemProcess'
    assert pd.isna(inserted_row['UpdatedAt'].iloc[0])
    assert inserted_row['UpdatedBy'].iloc[0] is None
    # Total row count grew by exactly one.
    assert len(final_target_df) == len(initial_data) + 1
def test_updates_only(common_schema, initial_data, primary_key_col, ignored_cols):
    """Changing non-ignored columns of one row is detected as a single update.

    Fixes: ``initial_data`` is a *list* of dicts, so the modified row must be
    ``initial_data[0].copy()`` (the original copied the list and then indexed
    it with a string key — a TypeError) and the unchanged rows are
    ``initial_data[1]``/``[2]``; assertions use ``.iloc[0]`` instead of the
    bare ``.iloc`` indexer.
    """
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    # Modified copy of Alice's row (two non-ignored columns changed).
    updated_alice_data = initial_data[0].copy()
    updated_alice_data['DisplayName'] = 'Alice Wonderland'
    updated_alice_data['BirthYear'] = 1991
    source_df_data = [
        updated_alice_data,
        initial_data[1],  # Bob, unchanged
        initial_data[2],  # Charlie, unchanged
    ]
    source_df = pd.DataFrame(source_df_data).astype(common_schema)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 0
    assert len(updated) == 1
    assert len(deleted) == 0
    # The updated row carries the new values.
    updated_row_in_final = final_target_df[final_target_df[primary_key_col] == 'alice']
    assert updated_row_in_final['DisplayName'].iloc[0] == 'Alice Wonderland'
    assert updated_row_in_final['BirthYear'].iloc[0] == 1991
    # UpdatedAt/By are stamped...
    assert pd.notna(updated_row_in_final['UpdatedAt'].iloc[0])
    assert updated_row_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    # ...while InsertedAt/By are preserved from the target.
    assert pd.notna(updated_row_in_final['InsertedAt'].iloc[0])
    assert updated_row_in_final['InsertedBy'].iloc[0] == 'Admin'
    # Row count is unchanged.
    assert len(final_target_df) == len(initial_data)
def test_deletes_only(common_schema, initial_data, primary_key_col, ignored_cols):
    """A row missing from the source is detected as a delete.

    Fix: the original passed the whole ``initial_data`` list twice as "rows";
    the kept rows are ``initial_data[0]`` and ``initial_data[1]``.
    """
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    # Source is missing Charlie.
    source_df_data = [
        initial_data[0],  # Alice
        initial_data[1],  # Bob
    ]
    source_df = pd.DataFrame(source_df_data).astype(common_schema)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 0
    assert len(updated) == 0
    assert len(deleted) == 1
    # The deleted row is gone from the final frame.
    assert 'charlie' not in final_target_df[primary_key_col].values
    # Row count is reduced by one.
    assert len(final_target_df) == len(initial_data) - 1
def test_mixed_changes(common_schema, initial_data, primary_key_col, ignored_cols):
    """Inserts, one update, and one delete are all detected in a single pass.

    Fixes: ``initial_data`` is a list, so Alice's modified row comes from
    ``initial_data[0].copy()`` and unchanged Bob from ``initial_data[1]``
    (the original string-indexed / embedded the whole list); assertions use
    ``.iloc[0]`` rather than the bare ``.iloc`` indexer object.
    """
    target_df = pd.DataFrame(initial_data).astype(common_schema)  # Alice, Bob, Charlie
    # Alice: updated
    updated_alice_data = initial_data[0].copy()
    updated_alice_data['DisplayName'] = 'Alice X'
    updated_alice_data['BirthYear'] = 1989
    # Bob: no change; Charlie: deleted (absent from source)
    # Diana: inserted
    new_diana_data = {
        'UserId': 4, 'UserName': 'diana', 'DisplayName': 'Diana Prince', 'BirthYear': 1995,
        'EnrollmentDate': datetime(2022, 4, 1), 'LastLogin': datetime(2023, 1, 15, 9, 0, 0),
        'DurationOnline': timedelta(hours=3), 'InsertedAt': pd.NaT, 'InsertedBy': None,
        'UpdatedAt': pd.NaT, 'UpdatedBy': None
    }
    # Eve: inserted
    new_eve_data = {
        'UserId': 5, 'UserName': 'eve', 'DisplayName': 'Eve Adams', 'BirthYear': 1998,
        'EnrollmentDate': datetime(2022, 5, 1), 'LastLogin': datetime(2023, 1, 20, 14, 0, 0),
        'DurationOnline': timedelta(hours=2), 'InsertedAt': pd.NaT, 'InsertedBy': None,
        'UpdatedAt': pd.NaT, 'UpdatedBy': None
    }
    source_df_data = [
        updated_alice_data,
        initial_data[1],  # Bob (unchanged)
        new_diana_data,
        new_eve_data,
    ]
    source_df = pd.DataFrame(source_df_data).astype(common_schema)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 2  # Diana, Eve
    assert len(updated) == 1   # Alice
    assert len(deleted) == 1   # Charlie
    # Final frame holds Alice (updated), Bob (unchanged), Diana and Eve (inserted).
    expected_user_names = {'alice', 'bob', 'diana', 'eve'}
    assert set(final_target_df[primary_key_col].values) == expected_user_names
    assert len(final_target_df) == len(initial_data) - 1 + 2  # 3 - 1 deleted + 2 inserted = 4
    # Alice's updated values
    alice_in_final = final_target_df[final_target_df[primary_key_col] == 'alice']
    assert alice_in_final['DisplayName'].iloc[0] == 'Alice X'
    assert alice_in_final['BirthYear'].iloc[0] == 1989
    assert pd.notna(alice_in_final['UpdatedAt'].iloc[0])
    assert alice_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    assert pd.notna(alice_in_final['InsertedAt'].iloc[0])
    assert alice_in_final['InsertedBy'].iloc[0] == 'Admin'
    # Diana's inserted values
    diana_in_final = final_target_df[final_target_df[primary_key_col] == 'diana']
    assert pd.notna(diana_in_final['InsertedAt'].iloc[0])
    assert diana_in_final['InsertedBy'].iloc[0] == 'SystemProcess'
    assert pd.isna(diana_in_final['UpdatedAt'].iloc[0])
    assert diana_in_final['UpdatedBy'].iloc[0] is None
    # Eve's inserted values
    eve_in_final = final_target_df[final_target_df[primary_key_col] == 'eve']
    assert pd.notna(eve_in_final['InsertedAt'].iloc[0])
    assert eve_in_final['InsertedBy'].iloc[0] == 'SystemProcess'
    assert pd.isna(eve_in_final['UpdatedAt'].iloc[0])
    assert eve_in_final['UpdatedBy'].iloc[0] is None
    # Bob remains untouched
    bob_in_final = final_target_df[final_target_df[primary_key_col] == 'bob']
    assert bob_in_final['DisplayName'].iloc[0] == 'Bob Johnson'
    assert bob_in_final['BirthYear'].iloc[0] == 1985
    assert pd.isna(bob_in_final['UpdatedAt'].iloc[0])
    assert bob_in_final['UpdatedBy'].iloc[0] is None
    assert pd.notna(bob_in_final['InsertedAt'].iloc[0])
    assert bob_in_final['InsertedBy'].iloc[0] == 'Admin'
def test_empty_source_dataframe(common_schema, initial_data, primary_key_col, ignored_cols):
    """An empty source marks every target row as deleted."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    source_df = pd.DataFrame(columns=list(common_schema)).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(
        source_df, target_df, primary_key_col, ignored_cols
    )

    assert len(inserted) == 0
    assert len(updated) == 0
    # Every target row should be reported as deleted...
    assert len(deleted) == len(initial_data)
    # ...leaving an empty final frame.
    assert final_target_df.empty
def test_empty_target_dataframe(common_schema, initial_data, primary_key_col, ignored_cols):
    """Every source row is inserted when the target starts out empty."""
    target_df = pd.DataFrame(columns=list(common_schema)).astype(common_schema)
    source_df = pd.DataFrame(initial_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(
        source_df, target_df, primary_key_col, ignored_cols
    )

    assert len(inserted) == len(initial_data)  # everything is new
    assert len(updated) == 0
    assert len(deleted) == 0
    # Data columns round-trip unchanged; audit columns are re-stamped below.
    audit_cols = ['InsertedAt', 'InsertedBy', 'UpdatedAt', 'UpdatedBy', '__hash__']
    pd.testing.assert_frame_equal(
        final_target_df.drop(columns=audit_cols, errors='ignore'),
        source_df.drop(columns=audit_cols, errors='ignore'),
    )
    # Every row got InsertedBy stamped and no UpdatedAt/By.
    assert final_target_df['InsertedBy'].eq('SystemProcess').all()
    assert final_target_df['UpdatedAt'].isna().all()
    assert final_target_df['UpdatedBy'].isna().all()
def test_source_with_null_rows(common_schema, initial_data, primary_key_col, ignored_cols):
    """Nulls in the source are treated as real value changes.

    Fix: assertions use ``.iloc[0]`` instead of the bare ``.iloc`` indexer
    object the original asserted on.
    """
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    # Source: Alice updated to a null DisplayName, Bob unchanged,
    # Frank inserted with some nulls, Charlie absent (deleted).
    source_data_with_nulls = [
        # Alice - updated with a null in DisplayName
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': None, 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Bob - no change
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Bob Johnson', 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # New user Frank with some nulls
        {'UserId': 4, 'UserName': 'frank', 'DisplayName': 'Frank', 'BirthYear': None, 'EnrollmentDate': datetime(2022, 6, 1), 'LastLogin': None, 'DurationOnline': timedelta(hours=1), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    source_df = pd.DataFrame(source_data_with_nulls).astype(common_schema)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 1  # Frank
    assert len(updated) == 1  # Alice
    assert len(deleted) == 1  # Charlie
    # Frank is inserted with his nulls intact.
    frank_in_final = final_target_df[final_target_df[primary_key_col] == 'frank']
    assert frank_in_final['DisplayName'].iloc[0] == 'Frank'
    assert pd.isna(frank_in_final['BirthYear'].iloc[0])
    assert pd.isna(frank_in_final['LastLogin'].iloc[0])
    assert pd.notna(frank_in_final['InsertedAt'].iloc[0])
    assert frank_in_final['InsertedBy'].iloc[0] == 'SystemProcess'
    # Alice is updated to a null DisplayName; InsertedAt/By preserved.
    alice_in_final = final_target_df[final_target_df[primary_key_col] == 'alice']
    assert pd.isna(alice_in_final['DisplayName'].iloc[0])
    assert alice_in_final['BirthYear'].iloc[0] == 1990  # BirthYear unchanged
    assert pd.notna(alice_in_final['UpdatedAt'].iloc[0])
    assert alice_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    assert pd.notna(alice_in_final['InsertedAt'].iloc[0])
    assert alice_in_final['InsertedBy'].iloc[0] == 'Admin'
    # Charlie is deleted.
    assert 'charlie' not in final_target_df[primary_key_col].values
    # Bob is unchanged.
    bob_in_final = final_target_df[final_target_df[primary_key_col] == 'bob']
    assert bob_in_final['DisplayName'].iloc[0] == 'Bob Johnson'
    assert pd.isna(bob_in_final['UpdatedAt'].iloc[0])
    assert bob_in_final['UpdatedBy'].iloc[0] is None
def test_target_with_null_rows(common_schema, initial_data, primary_key_col, ignored_cols):
    """Null-to-value transitions in the target are detected as updates.

    Fix: assertions use ``.iloc[0]`` instead of the bare ``.iloc`` indexer
    object the original asserted on.
    """
    # Target contains nulls in Bob's DisplayName and Charlie's BirthYear/LastLogin.
    target_data_with_nulls = [
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': None, 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': None, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': None, 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    target_df = pd.DataFrame(target_data_with_nulls).astype(common_schema)
    # Source fills those nulls with real values and adds David.
    source_data = [
        # Alice - no change
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': 'Alice Smith', 'BirthYear': 1990, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Bob - DisplayName updated from None to 'Robert Johnson'
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': 'Robert Johnson', 'BirthYear': 1985, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Charlie - BirthYear updated from None to 1992, LastLogin updated from None
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': 'Charlie Brown', 'BirthYear': 1992, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': datetime(2023, 1, 10, 12, 0, 0), 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # New user David
        {'UserId': 4, 'UserName': 'david', 'DisplayName': 'David Lee', 'BirthYear': 1988, 'EnrollmentDate': datetime(2022, 7, 1), 'LastLogin': datetime(2023, 1, 25, 8, 0, 0), 'DurationOnline': timedelta(hours=6), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    source_df = pd.DataFrame(source_data).astype(common_schema)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 1  # David
    assert len(updated) == 2  # Bob, Charlie
    assert len(deleted) == 0
    # David is inserted.
    assert 'david' in final_target_df[primary_key_col].values
    # Bob's null DisplayName became a real value and was flagged as an update.
    bob_in_final = final_target_df[final_target_df[primary_key_col] == 'bob']
    assert bob_in_final['DisplayName'].iloc[0] == 'Robert Johnson'
    assert pd.notna(bob_in_final['UpdatedAt'].iloc[0])
    assert bob_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    # Charlie's null BirthYear/LastLogin became real values.
    charlie_in_final = final_target_df[final_target_df[primary_key_col] == 'charlie']
    assert charlie_in_final['BirthYear'].iloc[0] == 1992
    assert pd.notna(charlie_in_final['LastLogin'].iloc[0])
    assert pd.notna(charlie_in_final['UpdatedAt'].iloc[0])
    assert charlie_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
def test_source_with_null_columns(common_schema, initial_data, primary_key_col, ignored_cols):
    """Columns going entirely null in the source still register as updates."""
    target_df = pd.DataFrame(initial_data).astype(common_schema)
    # Source: 'DisplayName' and 'BirthYear' are null for every row.
    source_data_null_cols = [
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': None, 'BirthYear': None, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': None, 'BirthYear': None, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': None, 'BirthYear': None, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': datetime(2023, 1, 10, 12, 0, 0), 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    source_df = pd.DataFrame(source_data_null_cols).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(
        source_df, target_df, primary_key_col, ignored_cols
    )

    assert len(inserted) == 0
    assert len(updated) == 3  # every row changed: non-ignored columns went null
    assert len(deleted) == 0
    # The nulls propagated into the final frame...
    for nulled_col in ('DisplayName', 'BirthYear'):
        assert final_target_df[nulled_col].isna().all()
    # ...and every row was stamped as updated.
    assert final_target_df['UpdatedAt'].notna().all()
    assert final_target_df['UpdatedBy'].eq('SystemProcess').all()
def test_target_with_null_columns(common_schema, initial_data, primary_key_col, ignored_cols):
    """Columns going from all-null to values register as updates on every row."""
    # Target: 'DisplayName' and 'BirthYear' are null for every row.
    target_data_null_cols = [
        {'UserId': 1, 'UserName': 'alice', 'DisplayName': None, 'BirthYear': None, 'EnrollmentDate': datetime(2022, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=5), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'bob', 'DisplayName': None, 'BirthYear': None, 'EnrollmentDate': datetime(2022, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 30, 0), 'DurationOnline': timedelta(hours=10), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'charlie', 'DisplayName': None, 'BirthYear': None, 'EnrollmentDate': datetime(2022, 3, 1), 'LastLogin': datetime(2023, 1, 10, 12, 0, 0), 'DurationOnline': timedelta(hours=7), 'InsertedAt': datetime(2023, 1, 10), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    target_df = pd.DataFrame(target_data_null_cols).astype(common_schema)
    # Source carries the real values for those columns.
    source_df = pd.DataFrame(initial_data).astype(common_schema)

    inserted, updated, deleted, final_target_df = detect_dataframe_changes(
        source_df, target_df, primary_key_col, ignored_cols
    )

    assert len(inserted) == 0
    assert len(updated) == 3  # every row changed: null -> value in non-ignored columns
    assert len(deleted) == 0
    # The real values propagated into the final frame...
    for filled_col in ('DisplayName', 'BirthYear'):
        assert final_target_df[filled_col].notna().all()
    # ...and every row was stamped as updated.
    assert final_target_df['UpdatedAt'].notna().all()
    assert final_target_df['UpdatedBy'].eq('SystemProcess').all()
def test_mixed_data_types_and_nulls(common_schema, primary_key_col, ignored_cols):
    """Mixed dtypes (datetime, timedelta, nullable int) with nulls all flow through.

    Fix: assertions use ``.iloc[0]`` instead of the bare ``.iloc`` indexer
    object the original asserted on.
    """
    # Baseline target with mixed types and some nulls.
    initial_data_mixed = [
        {'UserId': 1, 'UserName': 'alpha', 'DisplayName': 'Alpha User', 'BirthYear': 2000, 'EnrollmentDate': datetime(2021, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=1), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 2, 'UserName': 'beta', 'DisplayName': None, 'BirthYear': 1995, 'EnrollmentDate': datetime(2021, 2, 1), 'LastLogin': datetime(2023, 1, 5, 11, 0, 0), 'DurationOnline': timedelta(minutes=30), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        {'UserId': 3, 'UserName': 'gamma', 'DisplayName': 'Gamma User', 'BirthYear': None, 'EnrollmentDate': datetime(2021, 3, 1), 'LastLogin': None, 'DurationOnline': timedelta(seconds=120), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
    ]
    target_df = pd.DataFrame(initial_data_mixed).astype(common_schema)
    # Source: Alpha and Beta updated, Delta inserted, Gamma deleted.
    source_data_mixed = [
        # Alpha: Update DisplayName, BirthYear, DurationOnline
        {'UserId': 1, 'UserName': 'alpha', 'DisplayName': 'Alpha New', 'BirthYear': 2001, 'EnrollmentDate': datetime(2021, 1, 1), 'LastLogin': datetime(2023, 1, 1, 10, 0, 0), 'DurationOnline': timedelta(hours=2), 'InsertedAt': datetime(2023, 1, 1), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Beta: Update DisplayName from None to a value, LastLogin
        {'UserId': 2, 'UserName': 'beta', 'DisplayName': 'Beta New', 'BirthYear': 1995, 'EnrollmentDate': datetime(2021, 2, 1), 'LastLogin': datetime(2023, 1, 6, 12, 0, 0), 'DurationOnline': timedelta(minutes=30), 'InsertedAt': datetime(2023, 1, 5), 'InsertedBy': 'Admin', 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Delta: Insert new row with some nulls
        {'UserId': 4, 'UserName': 'delta', 'DisplayName': 'Delta User', 'BirthYear': 1990, 'EnrollmentDate': datetime(2021, 4, 1), 'LastLogin': None, 'DurationOnline': timedelta(hours=3), 'InsertedAt': pd.NaT, 'InsertedBy': None, 'UpdatedAt': pd.NaT, 'UpdatedBy': None},
        # Gamma is deleted (not in source)
    ]
    source_df = pd.DataFrame(source_data_mixed).astype(common_schema)
    inserted, updated, deleted, final_target_df = detect_dataframe_changes(source_df, target_df, primary_key_col, ignored_cols)
    assert len(inserted) == 1  # Delta
    assert len(updated) == 2  # Alpha, Beta
    assert len(deleted) == 1  # Gamma
    # Delta is inserted with its null LastLogin intact.
    delta_in_final = final_target_df[final_target_df[primary_key_col] == 'delta']
    assert delta_in_final['DisplayName'].iloc[0] == 'Delta User'
    assert pd.isna(delta_in_final['LastLogin'].iloc[0])
    assert pd.notna(delta_in_final['InsertedAt'].iloc[0])
    assert delta_in_final['InsertedBy'].iloc[0] == 'SystemProcess'
    # Alpha is updated (including the timedelta column).
    alpha_in_final = final_target_df[final_target_df[primary_key_col] == 'alpha']
    assert alpha_in_final['DisplayName'].iloc[0] == 'Alpha New'
    assert alpha_in_final['BirthYear'].iloc[0] == 2001
    assert alpha_in_final['DurationOnline'].iloc[0] == timedelta(hours=2)
    assert pd.notna(alpha_in_final['UpdatedAt'].iloc[0])
    assert alpha_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    # Beta is updated (None -> value and a datetime change).
    beta_in_final = final_target_df[final_target_df[primary_key_col] == 'beta']
    assert beta_in_final['DisplayName'].iloc[0] == 'Beta New'
    assert beta_in_final['LastLogin'].iloc[0] == datetime(2023, 1, 6, 12, 0, 0)
    assert pd.notna(beta_in_final['UpdatedAt'].iloc[0])
    assert beta_in_final['UpdatedBy'].iloc[0] == 'SystemProcess'
    # Gamma is deleted.
    assert 'gamma' not in final_target_df[primary_key_col].values
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment