Last active
          September 30, 2025 13:47 
        
      - 
      
 - 
        
Save adgedenkers/a857a29cab51cd69913e6ff3e015698b to your computer and use it in GitHub Desktop.  
    Data File Importer That Checks Field Names
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | import pandas as pd | |
| import pyodbc | |
class DataImporter:
    """Load a CSV file and insert its rows into a SQL Server table.

    The DataFrame's columns are compared with the columns of
    ``sql_table_name``. At least 50% of the DataFrame's columns must already
    exist in the table, otherwise the import is refused. Every DataFrame
    column that is not present in the table is mapped to a generated name
    (``<stub>_<column>``, see :meth:`generate_column_name`); if the table does
    not have that column yet it is added as ``VARCHAR(255)``. The rows are
    then inserted into ``sql_table_name``.
    """

    def __init__(self, data_file_path, sql_conn_str, sql_table_name):
        """Read the CSV file and open the database connection.

        Args:
            data_file_path: Path of the CSV file, passed to ``pd.read_csv``.
            sql_conn_str: Connection string passed to ``pyodbc.connect``.
            sql_table_name: Name of the table the rows are inserted into.
                It is interpolated into SQL text as-is, so it must come from
                a trusted source.
        """
        self.df = pd.read_csv(data_file_path)
        self.sql_conn = pyodbc.connect(sql_conn_str)
        self.sql_table_name = sql_table_name

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a bracket-delimited SQL Server identifier."""
        return "[" + str(name).replace("]", "]]") + "]"

    def import_data(self) -> None:
        """Add missing columns to the table and insert the DataFrame rows.

        Raises:
            ValueError: If fewer than 50% of the DataFrame's columns exist in
                the SQL table, or if two DataFrame columns resolve to the
                same SQL column.

        Any exception rolls back the open transaction before propagating;
        the cursor is always closed.
        """
        cursor = self.sql_conn.cursor()
        try:
            # TOP 0 returns no rows but still populates cursor.description.
            cursor.execute(f"SELECT TOP 0 * FROM {self.sql_table_name}")
            sql_columns = [column[0] for column in cursor.description]
            known_columns = set(sql_columns)

            df_columns = list(self.df.columns)
            common_columns = known_columns.intersection(df_columns)
            if len(common_columns) < len(df_columns) / 2:
                raise ValueError("At least 50% of fields must match between data import and SQL table.")

            # One target SQL column per DataFrame column, in DataFrame order,
            # so the INSERT column list lines up with each row tuple.
            target_columns = []
            for col in df_columns:
                if col in known_columns:
                    sql_col_name = col
                else:
                    sql_col_name = self.generate_column_name(col, sql_columns)
                    # Only ALTER when the generated name is really new; a
                    # table that already follows the naming convention
                    # already has the column and just needs the mapping.
                    if sql_col_name not in known_columns:
                        cursor.execute(
                            f"ALTER TABLE {self.sql_table_name} "
                            f"ADD {self._quote_identifier(sql_col_name)} VARCHAR(255)"
                        )
                        sql_columns.append(sql_col_name)
                        known_columns.add(sql_col_name)
                if sql_col_name in target_columns:
                    raise ValueError(
                        f"Data import columns resolve to the same SQL column: {sql_col_name!r}"
                    )
                target_columns.append(sql_col_name)

            # executemany rejects an empty parameter list, so skip the insert
            # when there is nothing to write.
            if target_columns and len(self.df.index) > 0:
                column_list = ",".join(self._quote_identifier(c) for c in target_columns)
                placeholders = ",".join(["?"] * len(target_columns))
                query = f"INSERT INTO {self.sql_table_name} ({column_list}) VALUES ({placeholders})"

                # Cast to object so NumPy scalars become plain Python values
                # the driver can bind, and turn missing values into None
                # (SQL NULL).
                frame = self.df.astype(object)
                frame = frame.where(frame.notna(), None)
                data = list(frame.itertuples(index=False, name=None))
                cursor.executemany(query, data)

            self.sql_conn.commit()
        except Exception:
            self.sql_conn.rollback()
            raise
        finally:
            cursor.close()

    def generate_column_name(self, col_name, existing_cols) -> str:
        """Return the SQL column name to use for a DataFrame column.

        The name is ``<stub>_<col_name>`` where ``stub`` is the part of
        ``sql_table_name`` before the first underscore and spaces in
        ``col_name`` are replaced by underscores.

        Args:
            col_name: Column name from the data import.
            existing_cols: Columns currently in the SQL table. Accepted for
                interface compatibility; it does not affect the result.
        """
        stub = self.sql_table_name.split("_")[0]
        return f"{stub}_{col_name.replace(' ', '_')}"
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment