Skip to content

Instantly share code, notes, and snippets.

@wesleyit
Created March 2, 2019 22:43
Show Gist options
  • Save wesleyit/8b2fc032c038949d17b04538a4d4c7c6 to your computer and use it in GitHub Desktop.
Save wesleyit/8b2fc032c038949d17b04538a4d4c7c6 to your computer and use it in GitHub Desktop.
Insert data from a CSV file into a MySQL database using multiple threads
import sys
import pandas as pd
import numpy as np
from pandas.io import sql
from sqlalchemy import create_engine
from multiprocessing import Pool as pool
from threading import Lock as lock
# Load the source CSV (semicolon-separated) and key the frame by 'cpf'.
df = pd.read_csv('../bases/clientes.csv', sep=';').set_index('cpf')
# Create batches slicing the df in n parts
def get_batches(iterable, n=1):
    """Yield successive slices of *iterable*, each at most *n* items long.

    Works on any sliceable sequence with a length (lists, strings,
    DataFrames) — slicing clamps at the end, so the final batch may be
    shorter than *n*.
    """
    size = len(iterable)
    for start in range(0, size, n):
        yield iterable[start:start + n]
# DB Connection. In prod, get them from $ENV.
# NOTE(review): credentials are hard-coded here; load them from environment
# variables (e.g. os.environ) before running anywhere but a local sandbox.
db_name = 'hospital'  # database (schema) created/used below
tb_name = 'clientes'  # destination table for the CSV rows
db_host = '127.0.0.1'
db_user = 'root'
db_pass = 'password'
# Ensure the target database exists. Connect to the built-in 'mysql' schema
# first, since the target database may not exist yet.
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/mysql')
with engine.begin() as conn:
    # db_name is a local constant (not user input), so f-string SQL is
    # acceptable here; the unused result binding from the original is dropped.
    conn.execute(f'CREATE DATABASE IF NOT EXISTS {db_name}')
# Rebuild the engine with the new database as the default schema.
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/{db_name}')
# Materialize all batches up front so the total is known for progress
# reporting. list(...) replaces the redundant [b for b in ...] copy-loop.
batches = list(get_batches(df, 100_000))
n_batches = len(batches)
# Shared progress counter; counter_lock serializes updates across workers.
counter = 0
counter_lock = lock()
# Progress reporting; caller is expected to hold counter_lock.
def one_more():
    """Bump the global batch counter and rewrite the progress line."""
    global counter
    counter += 1
    pct = round(counter / n_batches * 100, 2)
    msg = f'\rFinished {counter} of {n_batches} batches ({pct}%).'
    sys.stdout.write(msg)
    sys.stdout.flush()
# Worker: write one batch, then report progress.
def insert(df):
    """Append one DataFrame batch to the target table and bump the counter."""
    with engine.begin() as conn:
        df.to_sql(tb_name, conn, if_exists='append', chunksize=10_000)
    # Serialize counter updates so the progress line stays consistent.
    with counter_lock:
        one_more()
# Run the inserts concurrently.
# NOTE(review): the file imports multiprocessing.Pool, which forks worker
# *processes* — the threading.Lock and the global counter are then copied
# per process, so progress reporting (and sharing the engine) silently
# breaks. multiprocessing.dummy exposes the same Pool API backed by
# threads, which matches the gist's stated "multithreads" intent.
from multiprocessing.dummy import Pool as ThreadPool

THREADS = 4
# Context manager guarantees the pool is closed and joined, even on error
# (the original leaked the pool: no close()/join()).
with ThreadPool(THREADS) as p:
    p.map(insert, batches)
print('\nAll Done!')
# Sanity check: read back a handful of rows to confirm the load worked.
with engine.begin() as connection:
    df = pd.read_sql(f'select * from {tb_name} limit 10', connection, index_col='cpf')
df
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment