Skip to content

Instantly share code, notes, and snippets.

@craigderington
Created January 30, 2019 20:45
Show Gist options
  • Save craigderington/21fe43b72b91687fac7177f8635be934 to your computer and use it in GitHub Desktop.
Save craigderington/21fe43b72b91687fac7177f8635be934 to your computer and use it in GitHub Desktop.
GeoNames ZipCode data Import to MySQL
#!.env/bin/python
# -*- code: utf-8 -*-
import sys
import csv
import config
import MySQLdb
import logging
log_format = '%(asctime)s %(levelname)s:%(message)s'
logging.basicConfig(
format=log_format,
filename=__name__ + '.zipcodes',
level=logging.INFO
)
class DatabaseConnection(object):
"""
Connection class for MySQL
:return db conn
"""
config = (config.db_host, config.db_user, config.db_pass, config.db)
def __init__(self):
self.conn = MySQLdb.connect(*self.config)
self.cur = self.conn.cursor()
super(DatabaseConnection, self).__init__()
def __enter__(self):
return DatabaseConnection()
def query(self, sql, params):
self.cur.execute(sql, params)
self.conn.commit()
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
self.conn.close()
def main():
"""
Read data file and add zip codes to MySQL database
:return: none
"""
counter = 0
sql = '''insert into zipcodes(country_code, postal_code, city_name, state, state_abbr, county, county_code, \
community, community_code, latitude, longitude, accuracy) \
values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);'''
try:
with open('data/zip_codes/US.txt', 'r') as f1:
reader = csv.reader(f1, delimiter='\t')
next(reader)
for row in reader:
# set the param values to their table columns
params = (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7],
row[8], row[9], row[10], row[11])
try:
with DatabaseConnection() as db:
db.query(sql, params)
counter += 1
logging.info('Zip code: {} record added successfully'.format(str(row[1])))
print('Zip code: {} record added successfully'.format(str(row[1])))
except MySQLdb.Error as db_err:
logging.critical('Database error: {}'.format(str(db_err)))
print('Database error: {}'.format(str(db_err)))
except IOError as err:
print('Can not read the source data file: {}'.format(str(err)))
sys.exit(1)
return counter
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment