@dazfuller
Last active October 26, 2024 12:09
Multi-process loading of data into MySQL from Python
import csv
import logging
import multiprocessing as mp
from contextlib import contextmanager
from pathlib import Path
from typing import Dict

import mysql.connector
from mysql.connector import MySQLConnection
from mysql.connector.cursor import MySQLCursor

logging.basicConfig(level=logging.DEBUG)

# Connection settings for the target Azure Database for MySQL instance
config = {
    'host': '<server>.mysql.database.azure.com',
    'username': 'user',
    'password': 'password',
    'database': 'db_name',
    'autocommit': False
}
@contextmanager
def mysql_connect(connection_config: Dict) -> MySQLConnection:
    """Yield a MySQL connection, closing it when the block exits."""
    conn: MySQLConnection = mysql.connector.connect(**connection_config)
    logging.info('Connected to database \'%s\' at \'%s\'', connection_config['database'], connection_config['host'])
    try:
        yield conn
    finally:
        logging.info('Closing connection to server \'%s\'', connection_config['host'])
        conn.close()


@contextmanager
def mysql_cursor(conn: MySQLConnection) -> MySQLCursor:
    """Yield a cursor for the given connection, closing it when the block exits."""
    cur: MySQLCursor = conn.cursor()
    try:
        yield cur
    finally:
        logging.info('Closing cursor')
        cur.close()
insert_sql = """
    INSERT INTO parking_citations (
        TicketNumber
        , IssueDate
        , IssueTime
        , MeterId
        , MarkedTime
        , RPStatePlate
        , PlateExpiryDate
        , VIN
        , Make
        , BodyStyle
        , Color
        , Location
        , Route
        , Agency
        , ViolationCode
        , ViolationDescription
        , FineAmount
        , Latitude
        , Longitude
    ) VALUES (
        %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
    )
"""
def worker(rows):
    """Insert a single batch of rows, using a dedicated connection per batch."""
    with mysql_connect(config) as conn:
        with mysql_cursor(conn) as cursor:
            logging.info('Processing batch')
            cursor.executemany(insert_sql, rows)
            conn.commit()
def get_chunks(batch_size: int, source_file: Path):
    """Read the source CSV and yield lists of rows, batch_size rows at a time."""
    with open(source_file, 'r', newline='') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader, None)  # skip the header row
        batch_data = []
        batch_count = 0
        for row in csv_reader:
            # Convert empty strings to None so they are stored as NULL
            batch_data.append([v if v != '' else None for v in row])
            batch_count += 1
            if batch_count % batch_size == 0:
                yield batch_data
                batch_data = []
        # Yield any remaining rows that did not fill a complete batch
        if batch_data:
            yield batch_data
def main():
    batch_size = 5000
    source_file = Path('data/parking-citations.csv')

    chunk_gen = get_chunks(batch_size, source_file)

    # Leave one core free for the parent process feeding the pool
    pool = mp.Pool(mp.cpu_count() - 1)

    # Iterate the lazy imap results so that any exception raised in a worker
    # process is re-raised here rather than being silently dropped
    for _ in pool.imap(worker, chunk_gen):
        pass

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()
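The INSERT above assumes that a parking_citations table already exists in the target database. As a rough, hypothetical setup sketch, it could be created with something like the following; the column names come from the INSERT statement, but the types and sizes are guesses rather than the author's actual schema, so adjust them to suit the source data.

import mysql.connector

# Connection settings duplicated from the loader script above
config = {
    'host': '<server>.mysql.database.azure.com',
    'username': 'user',
    'password': 'password',
    'database': 'db_name',
}

# Column names match the loader's INSERT statement; the types and sizes are
# assumptions and should be adjusted (e.g. date/time fields arrive as raw strings)
create_sql = """
    CREATE TABLE IF NOT EXISTS parking_citations (
        TicketNumber VARCHAR(32)
        , IssueDate DATETIME NULL
        , IssueTime VARCHAR(8) NULL
        , MeterId VARCHAR(16) NULL
        , MarkedTime VARCHAR(16) NULL
        , RPStatePlate VARCHAR(4) NULL
        , PlateExpiryDate VARCHAR(16) NULL
        , VIN VARCHAR(32) NULL
        , Make VARCHAR(8) NULL
        , BodyStyle VARCHAR(8) NULL
        , Color VARCHAR(8) NULL
        , Location VARCHAR(128) NULL
        , Route VARCHAR(16) NULL
        , Agency VARCHAR(8) NULL
        , ViolationCode VARCHAR(16) NULL
        , ViolationDescription VARCHAR(128) NULL
        , FineAmount DECIMAL(10, 2) NULL
        , Latitude DOUBLE NULL
        , Longitude DOUBLE NULL
    )
"""

conn = mysql.connector.connect(**config)
try:
    cursor = conn.cursor()
    cursor.execute(create_sql)
    conn.commit()
    cursor.close()
finally:
    conn.close()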
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
mysql-connector-python = "*"

[requires]
python_version = "3.6"
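The Pipfile above pins the only runtime dependency, mysql-connector-python, so the environment can be created with pipenv install. Once the loader has run, a quick check along these lines (connection settings duplicated from the script, table name as in the INSERT) can confirm that the rows landed; it is a sketch, not part of the original gist.

import mysql.connector

# Same connection settings as the loader script; fill in real credentials
config = {
    'host': '<server>.mysql.database.azure.com',
    'username': 'user',
    'password': 'password',
    'database': 'db_name',
}

conn = mysql.connector.connect(**config)
try:
    cursor = conn.cursor()
    cursor.execute('SELECT COUNT(*) FROM parking_citations')
    print('Rows loaded:', cursor.fetchone()[0])
    cursor.close()
finally:
    conn.close()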
@dazfuller (Author)

The data loaded is the Los Angeles Parking Citations dataset, available from Kaggle.