Insert data from a CSV file into a MySQL database using multiple threads
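The script expects a semicolon-separated CSV at ../bases/clientes.csv with a cpf column that becomes the DataFrame index. As a rough sketch, a tiny test file could be generated like this (every column name other than cpf is a made-up placeholder, not something the gist defines):

# Hypothetical helper to create a small test CSV for the script below.
# Only the path, the ';' separator and the 'cpf' column come from the gist;
# the other columns are illustrative assumptions.
import pandas as pd

sample = pd.DataFrame({
    'cpf': ['00000000001', '00000000002', '00000000003'],
    'nome': ['Ana', 'Bruno', 'Carla'],
    'idade': [34, 51, 27],
})
sample.to_csv('../bases/clientes.csv', sep=';', index=False)

The full script follows.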
import sys
import pandas as pd
from sqlalchemy import create_engine, text
from multiprocessing.dummy import Pool as pool  # thread pool, consistent with the threading.Lock below
from threading import Lock as lock
# Get the data from CSV into a DataFrame
df = pd.read_csv('../bases/clientes.csv', sep=';')
df = df.set_index('cpf')

# Create batches by slicing the df into chunks of n rows
def get_batches(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]
# DB connection settings. In prod, get them from $ENV.
db_name = 'hospital'
tb_name = 'clientes'
db_host = '127.0.0.1'
db_user = 'root'
db_pass = 'password'

# Ensure the DB exists
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/mysql')
with engine.begin() as conn:
    conn.execute(text(f'CREATE DATABASE IF NOT EXISTS {db_name}'))

# Use the new DB as the default for the engine
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/{db_name}')
# Feed the batches generator and collect the batches in a list.
# A lock protects the shared progress counter across threads.
batches = [b for b in get_batches(df, 100_000)]
n_batches = len(batches)
counter = 0
counter_lock = lock()

# Write a progress message; callers must hold counter_lock.
def one_more():
    global counter
    counter += 1
    msg = f'\rFinished {counter} of {n_batches} batches ({round(counter / n_batches * 100, 2)}%).'
    sys.stdout.write(msg)
    sys.stdout.flush()
# Insert one batch and report progress.
def insert(df):
    with engine.begin() as conn:
        df.to_sql(tb_name, conn, if_exists='append', chunksize=10_000)
    with counter_lock:
        one_more()

# Define the thread pool and run the inserts.
THREADS = 4
p = pool(THREADS)
p.map(insert, batches)
p.close()
p.join()
print('\nAll Done!')
# A little test: read back a few rows.
with engine.begin() as conn:
    df = pd.read_sql(f'select * from {tb_name} limit 10', conn, index_col='cpf')
print(df)
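The connection settings above are hard-coded for local testing; the in-code comment points out that in production they should come from the environment. A minimal sketch of that, assuming hypothetical variable names DB_HOST, DB_USER, DB_PASS and DB_NAME (none of these names appear in the gist):

# Minimal sketch: build the engine from environment variables instead of
# hard-coded credentials. The variable names below are assumptions.
import os
from sqlalchemy import create_engine

db_host = os.getenv('DB_HOST', '127.0.0.1')
db_user = os.getenv('DB_USER', 'root')
db_pass = os.environ['DB_PASS']  # fail fast if the password is not set
db_name = os.getenv('DB_NAME', 'hospital')

engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/{db_name}')

Everything else in the script stays the same; only the five connection variables at the top would be replaced.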