Skip to content

Instantly share code, notes, and snippets.

@Jspascal
Created February 15, 2024 10:18
Show Gist options
  • Save Jspascal/16664c5ff4789ab3578f5bd1253a026c to your computer and use it in GitHub Desktop.
Data migration with Python
import pandas as pd
import sqlalchemy
import tabula
import pymongo
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
# Step 2: Define the paths or URLs of the data sources
csv_path = "data.csv"  # product records; loaded into the "products" collection
sql_path = "sqlite:///data.db"  # SQLAlchemy URL for the customers database
excel_path = "data.xlsx"  # order data; filtered to completed orders downstream
pdf_path = "data.pdf"  # inventory tables, extracted with tabula
text_path = "data.txt"  # comma-separated product descriptions
# Step 3: Define the connection string and the database name for MongoDB
mongo_uri = "mongodb://localhost:27017"  # assumes a local MongoDB instance — confirm for deployment
mongo_db = "product_store"  # target database for all collections
# Step 4: Define the functions for reading and processing the data from each source
def read_csv(path=None):
    """Read product data from a CSV file and return a cleaned DataFrame.

    Args:
        path: Optional CSV file path. Defaults to the module-level
            ``csv_path`` so existing callers are unaffected.

    Returns:
        DataFrame with columns renamed to the ``product_*`` convention,
        exact-duplicate rows dropped, and an ``average_price`` column
        holding the mean price of each product's category.
    """
    df = pd.read_csv(path if path is not None else csv_path)
    # Normalize column names to the product_* convention used downstream.
    df = df.rename(
        columns={
            "name": "product_name",
            "category": "product_category",
            "price": "product_price",
        }
    )
    df = df.drop_duplicates()
    # transform("mean") broadcasts each category's mean back onto every row.
    df["average_price"] = df.groupby("product_category")["product_price"].transform(
        "mean"
    )
    return df
def read_sql(con=None):
    """Read customer data from the SQL database and return a processed frame.

    Args:
        con: Optional SQLAlchemy engine or DBAPI connection. Defaults to an
            engine built from the module-level ``sql_path``.

    Returns:
        DataFrame with ``first_name``/``last_name`` split out of ``name``
        and a ``total_orders`` count aligned to every row of its customer.
    """
    if con is None:
        con = sqlalchemy.create_engine(sql_path)
    df = pd.read_sql("SELECT * FROM customers", con)
    # n=1 splits only on the first space, so multi-word surnames don't
    # produce a third column and break the two-column assignment.
    df[["first_name", "last_name"]] = df["name"].str.split(" ", n=1, expand=True)
    # BUG FIX: the original used .count(), which returns a Series indexed by
    # customer_id; assigning it against the row-numbered frame misaligns and
    # fills NaN. transform("count") broadcasts the count back to each row.
    df["total_orders"] = df.groupby("customer_id")["order_id"].transform("count")
    return df
def read_excel(path=None):
    """Read order data from an Excel workbook and keep completed orders.

    Args:
        path: Optional workbook path. Defaults to the module-level
            ``excel_path`` so existing callers are unaffected.

    Returns:
        DataFrame restricted to rows whose ``status`` equals "completed",
        with the ``date`` column parsed to datetime.
    """
    df = pd.read_excel(path if path is not None else excel_path)
    # Parse dates up front so downstream consumers get real timestamps.
    df["date"] = pd.to_datetime(df["date"])
    df = df[df["status"] == "completed"]
    return df
def read_pdf(path=None):
    """Read inventory tables from a PDF and compute per-row stock value.

    Args:
        path: Optional PDF path. Defaults to the module-level ``pdf_path``.

    Returns:
        Single DataFrame concatenating every table found, with a
        ``stock_value`` column (quantity * unit_price).
    """
    # BUG FIX: tabula.read_pdf defaults to reading only the first page;
    # pages="all" extracts tables from the whole document.
    dfs = tabula.read_pdf(path if path is not None else pdf_path, pages="all")
    # ignore_index avoids duplicated index labels across per-page tables.
    df = pd.concat(dfs, ignore_index=True)
    df["stock_value"] = df["quantity"] * df["unit_price"]
    return df
def read_text(path=None):
    """Read comma-separated product lines from a text file.

    Each line is expected as ``product_id,product_name,product_description``.
    Blank or malformed lines (fewer than three fields) are skipped instead
    of raising IndexError.

    Args:
        path: Optional text file path. Defaults to the module-level
            ``text_path`` so existing callers are unaffected.

    Returns:
        DataFrame with the three parsed columns plus ``keywords``, the first
        three whitespace-separated words found in the description.
    """
    data = []
    with open(path if path is not None else text_path, encoding="utf-8") as f:
        for line in f:
            fields = line.strip().split(",")
            # Guard: skip blank/short lines rather than crash on line[2].
            if len(fields) < 3:
                continue
            data.append(
                {
                    "product_id": fields[0],
                    "product_name": fields[1],
                    "product_description": fields[2],
                }
            )
    # Passing columns explicitly keeps the frame well-formed (and the
    # .str.extract below working) even when the file yields no rows.
    df = pd.DataFrame(
        data, columns=["product_id", "product_name", "product_description"]
    )
    # Raw string: "\w" in a plain literal is an invalid escape (SyntaxWarning
    # on Python 3.12+). Extract the first three words as crude keywords.
    df["keywords"] = df["product_description"].str.extract(r"(\w+ \w+ \w+)")
    return df
# Step 5: Define the function for writing the data to MongoDB
def write_mongo(df, collection):
    """Insert the rows of a DataFrame into a MongoDB collection.

    Args:
        df: DataFrame whose rows become one document each.
        collection: Name of the target collection in the ``mongo_db``
            database.

    Insert failures are reported to stdout rather than raised, matching the
    original best-effort behavior; the client is always closed.
    """
    records = df.to_dict("records")
    # Guard: insert_many([]) raises InvalidOperation, so skip empty frames.
    if not records:
        print(f"No records to insert into {collection} collection.")
        return
    client = pymongo.MongoClient(mongo_uri)
    try:
        col = client[mongo_db][collection]
        try:
            col.insert_many(records)
            print(
                f"Successfully inserted {len(records)} records into {collection} collection."
            )
        except Exception as e:
            print(f"Failed to insert records into {collection} collection. Error: {e}")
    finally:
        # Close the connection even if something above raises unexpectedly.
        client.close()
# Step 6: Define the functions for orchestrating and automating the data collection and processing tasks
def collect_data():
    """Build and return the daily DAG that pulls data from every source."""
    dag = DAG(
        dag_id="collect_data",
        description="A workflow to collect data from various sources",
        schedule_interval="@daily",
        start_date=airflow.utils.dates.days_ago(1),
        end_date=airflow.utils.dates.days_ago(0),
        catchup=False,
    )
    # One extraction task per source, chained in this fixed order:
    # csv -> sql -> excel -> pdf -> text.
    sources = [
        ("csv_task", read_csv),
        ("sql_task", read_sql),
        ("excel_task", read_excel),
        ("pdf_task", read_pdf),
        ("text_task", read_text),
    ]
    previous = None
    for task_id, reader in sources:
        task = PythonOperator(task_id=task_id, python_callable=reader, dag=dag)
        if previous is not None:
            previous >> task
        previous = task
    return dag
def process_data():
    """Build and return the daily DAG that loads each source into MongoDB."""
    dag = DAG(
        dag_id="process_data",
        description="A workflow to process data and store it in MongoDB",
        schedule_interval="@daily",
        start_date=airflow.utils.dates.days_ago(1),
        end_date=airflow.utils.dates.days_ago(0),
        catchup=False,
    )
    # (task_id, reader, target collection) for each pipeline stage, in the
    # fixed execution order products -> customers -> orders -> inventory
    # -> keywords.
    stages = [
        ("products_task", read_csv, "products"),
        ("customers_task", read_sql, "customers"),
        ("orders_task", read_excel, "orders"),
        ("inventory_task", read_pdf, "inventory"),
        ("keywords_task", read_text, "keywords"),
    ]
    previous = None
    for task_id, reader, coll in stages:
        # Bind reader/coll as lambda defaults so each callable captures its
        # own stage (late-binding closures would all see the last one).
        task = PythonOperator(
            task_id=task_id,
            python_callable=lambda r=reader, c=coll: write_mongo(r(), c),
            dag=dag,
        )
        if previous is not None:
            previous >> task
        previous = task
    return dag
# Step 7: Instantiate the DAGs at module level. BUG FIX: the original
# discarded the return values — Airflow's scheduler discovers DAGs by
# scanning a module's globals, so the DAG objects must be bound to
# module-level names to be registered.
collect_dag = collect_data()
process_dag = process_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment