Last active
December 9, 2025 14:39
-
-
Save mcchran/bd2ba7eea7f62b99b797be1d040b0ced to your computer and use it in GitHub Desktop.
Pandas upsert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pandas as pd | |
| from functools import wraps | |
| from typing import Union, List, Callable, Any | |
def upsert_df(target_df: pd.DataFrame, key_columns: Union[str, List[str]]):
    """
    Decorator that performs an upsert operation on a target DataFrame.

    Each call of the decorated function is expected to return a DataFrame of
    new/updated rows. Rows of ``target_df`` whose key matches one of the
    returned rows are dropped, every returned row is appended, and
    ``target_df`` is then rewritten in place with the merged content.
    Returned rows are not de-duplicated: if the decorated function returns the
    same key twice, both rows end up in the result.

    Args:
        target_df: The DataFrame to upsert into (mutated in place on each call)
        key_columns: Column name(s) to use as the key for matching rows; a
            single name, or a list/tuple of names

    Returns:
        Decorator function that will upsert the result of the decorated
        function. The wrapped function returns the merged DataFrame, indexed
        0..n-1.

    Raises:
        ValueError: When the wrapped function is called and the decorated
            function did not return a DataFrame, or a key column is missing
            from the target or from the returned rows.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> pd.DataFrame:
            # Get the new rows from the decorated function
            new_rows = func(*args, **kwargs)

            # Ensure new_rows is a DataFrame
            if not isinstance(new_rows, pd.DataFrame):
                raise ValueError("Decorated function must return a pandas DataFrame")

            # A bare string is a single key. Anything else is copied into a
            # list because ``set_index`` treats a tuple as ONE column label.
            key_cols = [key_columns] if isinstance(key_columns, str) else list(key_columns)

            # Ensure key_columns exist in both DataFrames
            missing_cols_target = set(key_cols) - set(target_df.columns)
            missing_cols_new = set(key_cols) - set(new_rows.columns)
            if missing_cols_target:
                raise ValueError(f"Key columns {missing_cols_target} not found in target DataFrame")
            if missing_cols_new:
                raise ValueError(f"Key columns {missing_cols_new} not found in new rows DataFrame")

            # Perform the upsert
            if target_df.empty:
                # Nothing to match against: the result is just the new rows.
                # reset_index returns a new frame (the caller's object is not
                # touched) and gives the same 0..n-1 index as the branch below.
                result_df = new_rows.reset_index(drop=True)
            else:
                # Keep only the target rows whose key is NOT among the new rows
                incoming_keys = new_rows.set_index(key_cols).index
                keep = ~target_df.set_index(key_cols).index.isin(incoming_keys)
                result_df = pd.concat(
                    [
                        target_df[keep],  # non-matching rows from target
                        new_rows,         # all new rows (updates + inserts)
                    ],
                    ignore_index=True,
                )

            # Update the target DataFrame in place: empty it, then refill it
            # column by column so the caller's object identity is preserved.
            target_df.drop(target_df.index, inplace=True)
            target_df.reset_index(drop=True, inplace=True)
            for col in result_df.columns:
                # ``.array`` instead of ``.values``: ``.values`` converts
                # tz-aware datetimes to naive UTC datetime64, which would make
                # target_df disagree with the returned frame.
                target_df[col] = result_df[col].array

            return result_df
        return wrapper
    return decorator
# Demonstration script: runs only when the module is executed directly.
if __name__ == "__main__":
    # --- Single key column -------------------------------------------------
    # Target table that the decorated function will upsert into.
    users_df = pd.DataFrame(
        {
            'id': [1, 2, 3],
            'name': ['Alice', 'Bob', 'Charlie'],
            'age': [25, 30, 35],
            'city': ['NY', 'LA', 'Chicago'],
        }
    )

    # The trailing "" produces the blank separator line.
    print("Original DataFrame:", users_df, "", sep="\n")

    def get_user_updates():
        # Rows to merge: id 2 already exists (update), id 4 does not (insert).
        return pd.DataFrame(
            {
                'id': [2, 4],
                'name': ['Bob Updated', 'Diana'],
                'age': [31, 28],
                'city': ['SF', 'Miami'],
            }
        )

    # Explicit form of `@upsert_df(users_df, key_columns='id')`.
    get_user_updates = upsert_df(users_df, key_columns='id')(get_user_updates)

    # Calling the wrapped function performs the upsert; users_df itself is
    # rewritten in place and the merged frame is also returned.
    result = get_user_updates()

    print("After upsert:", users_df, "", "Returned result:", result, sep="\n")

    # --- Composite key (customer_id, product_id) ----------------------------
    orders_df = pd.DataFrame(
        {
            'customer_id': [1, 1, 2],
            'product_id': [101, 102, 101],
            'quantity': [2, 1, 3],
            'price': [10.0, 15.0, 10.0],
        }
    )

    print("\n" + "=" * 50, "Multi-key example:", "Original orders:", orders_df, sep="\n")

    def update_orders():
        # (1, 101) matches an existing order and replaces it; (3, 103) is new.
        return pd.DataFrame(
            {
                'customer_id': [1, 3],
                'product_id': [101, 103],
                'quantity': [5, 2],
                'price': [12.0, 20.0],
            }
        )

    # Explicit form of `@upsert_df(orders_df, key_columns=[...])`.
    update_orders = upsert_df(orders_df, key_columns=['customer_id', 'product_id'])(update_orders)

    result2 = update_orders()

    print("\nAfter upsert:", orders_df, sep="\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment