Data migration with Python
A sketch of an ETL pipeline: it reads product, customer, order, inventory, and description data from CSV, SQL, Excel, PDF, and plain-text sources with pandas, and loads the processed records into MongoDB via two Airflow DAGs.
# Step 1: Import the required libraries
import pandas as pd
import sqlalchemy
import tabula
import pymongo
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Step 2: Define the paths or URLs of the data sources
csv_path = "data.csv"
sql_path = "sqlite:///data.db"
excel_path = "data.xlsx"
pdf_path = "data.pdf"
text_path = "data.txt"

# Step 3: Define the connection string and the database name for MongoDB
mongo_uri = "mongodb://localhost:27017"
mongo_db = "product_store"
# Step 4: Define the functions for reading and processing the data from each source
def read_csv():
    # Read data from a CSV file
    df = pd.read_csv(csv_path)
    # Perform any data processing operations, such as cleaning, transforming, or aggregating
    # For example, you can rename the columns, drop the duplicates, or calculate the average price
    df = df.rename(
        columns={
            "name": "product_name",
            "category": "product_category",
            "price": "product_price",
        }
    )
    df = df.drop_duplicates()
    df["average_price"] = df.groupby("product_category")["product_price"].transform(
        "mean"
    )
    # Return the processed DataFrame
    return df
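# A hypothetical data.csv layout that read_csv() assumes (illustrative only,
# not part of the original gist): a header row with name, category, and price
# columns, e.g.
#   name,category,price
#   Widget A,Tools,9.99
#   Widget B,Tools,14.50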
def read_sql():
    # Create a database connection object
    engine = sqlalchemy.create_engine(sql_path)
    # Read data from an SQL database
    df = pd.read_sql("SELECT * FROM customers", engine)
    # Perform any data processing operations, such as cleaning, transforming, or aggregating
    # For example, you can split the name column into first name and last name, or count the orders
    # n=1 keeps names with middle parts from producing extra columns
    df[["first_name", "last_name"]] = df["name"].str.split(" ", n=1, expand=True)
    # Use transform so the per-customer count is aligned back to every row
    df["total_orders"] = df.groupby("customer_id")["order_id"].transform("count")
    # Return the processed DataFrame
    return df
def read_excel():
    # Read data from an Excel file
    df = pd.read_excel(excel_path)
    # Perform any data processing operations, such as cleaning, transforming, or aggregating
    # For example, you can convert the date column to datetime format, or filter the orders by status
    df["date"] = pd.to_datetime(df["date"])
    df = df[df["status"] == "completed"]
    # Return the processed DataFrame
    return df
def read_pdf():
    # Read data from a PDF file (tabula-py requires a Java runtime);
    # pages="all" extracts tables from every page, not just the first,
    # and returns a list of DataFrames
    dfs = tabula.read_pdf(pdf_path, pages="all")
    # Perform any data processing operations, such as cleaning, transforming, or aggregating
    # For example, you can concatenate the DataFrames, or calculate the stock value
    df = pd.concat(dfs)
    df["stock_value"] = df["quantity"] * df["unit_price"]
    # Return the processed DataFrame
    return df
def read_text():
    # Read data from a text file
    with open(text_path) as f:
        # Perform any data processing operations, such as cleaning, transforming, or aggregating
        # For example, you can create a list of dictionaries, or extract the keywords
        data = []
        for line in f:
            # Split the line by comma and strip the whitespace
            line = line.strip().split(",")
            # Create a dictionary with the keys and values
            record = {
                "product_id": line[0],
                "product_name": line[1],
                "product_description": line[2],
            }
            # Append the dictionary to the data list
            data.append(record)
    # Convert the data list to a DataFrame
    df = pd.DataFrame(data)
    # Extract the keywords from the product description using a simple rule:
    # take the first three whitespace-separated words. A raw string avoids an
    # invalid escape-sequence warning, and expand=False returns a Series
    df["keywords"] = df["product_description"].str.extract(r"(\w+ \w+ \w+)", expand=False)
    # Return the processed DataFrame
    return df
# Step 5: Define the function for writing the data to MongoDB
def write_mongo(df, collection):
    # Create a MongoDB client object
    client = pymongo.MongoClient(mongo_uri)
    # Get the MongoDB database object
    db = client[mongo_db]
    # Get the MongoDB collection object
    col = db[collection]
    # Convert the DataFrame to a list of dictionaries
    data = df.to_dict("records")
    # Insert the data into the collection
    try:
        col.insert_many(data)
        print(
            f"Successfully inserted {len(data)} records into {collection} collection."
        )
    except Exception as e:
        print(f"Failed to insert records into {collection} collection. Error: {e}")
    # Close the MongoDB client object
    client.close()
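# A minimal standalone smoke test (hypothetical, not part of the Airflow
# workflows below): run one extract-and-load step directly, assuming data.csv
# exists and MongoDB is reachable at mongo_uri. Left commented out so the
# module stays import-safe when parsed by the Airflow scheduler.
# write_mongo(read_csv(), "products")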
# Step 6: Define the functions for orchestrating and automating the data collection and processing tasks
def collect_data():
    # Create a DAG object with the parameters for the data collection workflow
    dag = DAG(
        dag_id="collect_data",
        description="A workflow to collect data from various sources",
        schedule_interval="@daily",
        start_date=days_ago(1),
        end_date=days_ago(0),
        catchup=False,
    )
    # Create a PythonOperator object for each data source
    csv_task = PythonOperator(task_id="csv_task", python_callable=read_csv, dag=dag)
    sql_task = PythonOperator(task_id="sql_task", python_callable=read_sql, dag=dag)
    excel_task = PythonOperator(
        task_id="excel_task", python_callable=read_excel, dag=dag
    )
    pdf_task = PythonOperator(task_id="pdf_task", python_callable=read_pdf, dag=dag)
    text_task = PythonOperator(task_id="text_task", python_callable=read_text, dag=dag)
    # Define the dependencies and order of execution of the tasks
    csv_task >> sql_task >> excel_task >> pdf_task >> text_task
    # Return the DAG object
    return dag
def process_data():
    # Create a DAG object with the parameters for the data processing workflow
    dag = DAG(
        dag_id="process_data",
        description="A workflow to process data and store it in MongoDB",
        schedule_interval="@daily",
        start_date=days_ago(1),
        end_date=days_ago(0),
        catchup=False,
    )
    # Create a PythonOperator object for each data source and collection
    products_task = PythonOperator(
        task_id="products_task",
        python_callable=lambda: write_mongo(read_csv(), "products"),
        dag=dag,
    )
    customers_task = PythonOperator(
        task_id="customers_task",
        python_callable=lambda: write_mongo(read_sql(), "customers"),
        dag=dag,
    )
    orders_task = PythonOperator(
        task_id="orders_task",
        python_callable=lambda: write_mongo(read_excel(), "orders"),
        dag=dag,
    )
    inventory_task = PythonOperator(
        task_id="inventory_task",
        python_callable=lambda: write_mongo(read_pdf(), "inventory"),
        dag=dag,
    )
    keywords_task = PythonOperator(
        task_id="keywords_task",
        python_callable=lambda: write_mongo(read_text(), "keywords"),
        dag=dag,
    )
    # Define the dependencies and order of execution of the tasks
    products_task >> customers_task >> orders_task >> inventory_task >> keywords_task
    # Return the DAG object
    return dag
# Step 7: Instantiate the DAGs at module level so the Airflow scheduler can discover them
collect_data_dag = collect_data()
process_data_dag = process_data()
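To run the workflows, this file would typically be placed in the Airflow DAGs folder (for example ~/airflow/dags/ on a default Airflow 2 installation); after the scheduler parses it, "airflow dags list" should show collect_data and process_data, and both will run on the @daily schedule. This assumes the source files from Step 2 (data.csv, data.db, data.xlsx, data.pdf, data.txt) and the MongoDB instance at localhost:27017 are reachable from the Airflow workers.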