Last active
October 26, 2024 12:09
-
-
Save dazfuller/f58a17401e5bab70e767dbfc083e7c23 to your computer and use it in GitHub Desktop.
Multi-process loading of data into MySQL from Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import logging
import multiprocessing as mp
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Iterator

import mysql.connector
from mysql.connector import MySQLConnection
from mysql.connector.cursor import MySQLCursor
# Emit every log record from DEBUG upwards through the root logger.
logging.basicConfig(level=logging.DEBUG)

# Keyword arguments handed to mysql.connector.connect() by mysql_connect().
# The values below are placeholders: replace them before running, and keep
# real credentials out of source control.
config = {
    'host': '<server>.mysql.database.azure.com',
    'username': 'user',
    'password': 'password',
    'database': 'db_name',
    # Transactions are committed explicitly (worker() calls conn.commit()).
    'autocommit': False,
}
@contextmanager
def mysql_connect(connection_config: Dict) -> Iterator[MySQLConnection]:
    """Open a MySQL connection and close it when the ``with`` block exits.

    Args:
        connection_config: Keyword arguments passed unchanged to
            ``mysql.connector.connect``. The ``'database'`` and ``'host'``
            entries are also used in the log messages, so both must be
            present.

    Yields:
        The open connection.

    Raises:
        KeyError: If ``'database'`` or ``'host'`` is missing from
            ``connection_config``.
    """
    # Read the values used for logging before connecting, so a missing key
    # cannot leave an open connection behind.
    database = connection_config['database']
    host = connection_config['host']

    conn: MySQLConnection = mysql.connector.connect(**connection_config)
    logging.info('Connected to database \'%s\' at \'%s\'', database, host)
    try:
        yield conn
    finally:
        # Runs even when the body of the ``with`` block raises, so a failed
        # batch does not leak its connection.
        logging.info('Closing connection to server \'%s\'', host)
        conn.close()
@contextmanager
def mysql_cursor(conn: MySQLConnection) -> Iterator[MySQLCursor]:
    """Create a cursor on ``conn`` and close it when the ``with`` block exits.

    Args:
        conn: Connection on which ``cursor()`` is called.

    Yields:
        The cursor returned by ``conn.cursor()``.
    """
    cur: MySQLCursor = conn.cursor()
    try:
        yield cur
    finally:
        # Close the cursor even when the body of the ``with`` block raises.
        logging.info('Closing cursor')
        cur.close()
# Parameterised INSERT for one parking citation: 19 columns, 19 ``%s``
# placeholders, in the same order. Used with cursor.executemany() in worker().
insert_sql = """
    INSERT INTO parking_citations (
          TicketNumber
        , IssueDate
        , IssueTime
        , MeterId
        , MarkedTime
        , RPStatePlate
        , PlateExpiryDate
        , VIN
        , Make
        , BodyStyle
        , Color
        , Location
        , Route
        , Agency
        , ViolationCode
        , ViolationDescription
        , FineAmount
        , Latitude
        , Longitude
    ) VALUES (
          %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
        , %s
    )
"""
def worker(rows):
    """Insert one batch of rows into ``parking_citations`` and commit it.

    Each call opens its own connection from the module-level ``config``,
    runs ``insert_sql`` once per row via ``executemany`` and commits, so a
    batch is a single transaction.

    Args:
        rows: Sequence of rows; each row supplies the values for the
            placeholders in ``insert_sql``, in order.
    """
    with mysql_connect(config) as conn, mysql_cursor(conn) as cursor:
        logging.info('Processing batch')
        cursor.executemany(insert_sql, rows)
        # autocommit is disabled in ``config``; nothing is stored until this.
        conn.commit()
def get_chunks(batch_size: int, source_file: Path) -> Iterator[list]:
    """Yield the data rows of a CSV file in batches.

    The first line of the file is treated as a header and skipped. Within
    each row, empty fields are replaced by ``None``.

    Args:
        batch_size: Maximum number of rows per batch; must be at least 1.
        source_file: Path of the comma-delimited file to read.

    Yields:
        Lists of rows (each row a list of ``str`` or ``None``). Every batch
        holds ``batch_size`` rows except possibly the last one; no empty
        batch is ever yielded.

    Raises:
        ValueError: If ``batch_size`` is less than 1 (raised when iteration
            starts, because this is a generator).
    """
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')

    # newline='' leaves line-ending handling to the csv module, which is
    # what it needs to parse newlines embedded in quoted fields correctly.
    with open(source_file, 'r', newline='') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader, None)  # skip the header row, if there is one

        batch_data = []
        for row in csv_reader:
            # Compare by value: identity checks against a str literal only
            # work by accident of interpreter string interning.
            batch_data.append([v if v != '' else None for v in row])
            if len(batch_data) == batch_size:
                yield batch_data
                batch_data = []

        # Flush the final, partially filled batch.
        if batch_data:
            yield batch_data
def main():
    """Load ``data/parking-citations.csv`` into MySQL with a process pool.

    The file is read in batches of 5000 rows by ``get_chunks`` and each
    batch is handed to ``worker`` in a pool process. A batch that fails is
    logged with its traceback and the remaining batches are still
    attempted; a summary is logged at the end if anything failed.
    """
    batch_size = 5000
    source_file = Path('data/parking-citations.csv')

    chunk_gen = get_chunks(batch_size, source_file)

    # Leave one core free for the parent, but never request zero processes:
    # mp.Pool raises ValueError for a process count below 1.
    process_count = max(1, mp.cpu_count() - 1)

    failed_batches = 0
    with mp.Pool(process_count) as pool:
        results = pool.imap(worker, chunk_gen)
        # Consume every result: an exception raised in a worker is only
        # re-raised in the parent when its result is fetched, so an
        # unconsumed iterator would hide failed batches.
        while True:
            try:
                next(results)
            except StopIteration:
                break
            except Exception:
                failed_batches += 1
                logging.exception('Batch failed to load')
        pool.close()
        pool.join()

    if failed_batches:
        logging.error('%d batch(es) failed to load', failed_batches)
# Run the load only when the file is executed as a script. The guard also
# keeps pool processes that re-import this module from starting a new load.
if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
mysql-connector-python = "*"

[requires]
python_version = "3.6"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The data loaded is the Los Angeles Parking Citations dataset, available from Kaggle.