Skip to content

Instantly share code, notes, and snippets.

@adgedenkers
Last active September 30, 2025 13:47
Show Gist options
  • Save adgedenkers/a857a29cab51cd69913e6ff3e015698b to your computer and use it in GitHub Desktop.
Save adgedenkers/a857a29cab51cd69913e6ff3e015698b to your computer and use it in GitHub Desktop.
Data File Importer That Checks Field Names
import pandas as pd
import pyodbc
class DataImporter:
"""
This Python class takes in a pandas DataFrame from a CSV file and a SQL Server connection string, as
well as a name for the SQL table where the data will be imported. The class compares the columns of
the DataFrame to those in the SQL table and ensures that at least 50% of the fields match. If there
are missing fields, the class adds them to the SQL table using the same naming convention as existing
fields. All new fields are created as varchar fields by default. Once the SQL table has all the fields
from the DataFrame, the class inserts the data into the [stub]_import table. This class can handle
multiple data imports per day.
"""
def __init__(self, data_file_path, sql_conn_str, sql_table_name):
# Read data from CSV file into Pandas DataFrame
self.df = pd.read_csv(data_file_path)
# Connect to SQL Server database using provided connection string
self.sql_conn = pyodbc.connect(sql_conn_str)
# Set the name of the SQL table where the data will be imported
self.sql_table_name = sql_table_name
def import_data(self):
# Get existing columns from SQL table
cursor = self.sql_conn.cursor()
cursor.execute(f"SELECT TOP 0 * FROM {self.sql_table_name}")
# Get column names from SQL table and store them in a list
sql_columns = [column[0] for column in cursor.description]
# Compare data import columns with SQL table columns
common_columns = set(self.df.columns) & set(sql_columns)
# Raise an error if less than 50% of the fields match between the data import and SQL table
if len(common_columns) < len(self.df.columns) / 2:
raise ValueError("At least 50% of fields must match between data import and SQL table.")
# Add missing columns to SQL table
missing_columns = set(self.df.columns) - set(sql_columns)
for col in missing_columns:
# Generate a new column name for the missing column using the same naming convention as existing columns
sql_col_name = self.generate_column_name(col, sql_columns)
# Add the missing column to the SQL table with the new column name and a default data type of VARCHAR(255)
cursor.execute(f"ALTER TABLE {self.sql_table_name} ADD {sql_col_name} VARCHAR(255)")
# Add the new column name to the list of SQL table columns
sql_columns.append(sql_col_name)
# Insert data into SQL table
# Create a SQL query that inserts data into the SQL table using the column names from the SQL table and the data from the data import DataFrame
query = f"INSERT INTO {self.sql_table_name} ({','.join(sql_columns)}) VALUES ({','.join(['?']*len(sql_columns))})"
# Convert the data import DataFrame to a list of tuples to be used as input to the SQL query
data = [tuple(row) for row in self.df.to_records(index=False)]
# Execute the SQL query to insert the data into the SQL table
cursor.executemany(query, data)
# Commit the changes to the SQL Server database
self.sql_conn.commit()
def generate_column_name(self, col_name, existing_cols):
# Get the first part of the SQL table name, which is the 'stub'
stub = self.sql_table_name.split("_")[0]
# Split the 'stub' into separate words, sort them alphabetically, and join them with underscores
stub_parts = stub.split("_")
stub_parts.sort()
# Generate a new column name using the sorted 'stub', the name of the missing column, and replacing spaces with underscores
return f"{stub}_{col_name.replace(' ', '_')}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment