Skip to content

Instantly share code, notes, and snippets.

@zhiweio
Created January 14, 2019 07:55
Show Gist options
  • Save zhiweio/f4ca849feab00fd57873a5b8577626e7 to your computer and use it in GitHub Desktop.
Save zhiweio/f4ca849feab00fd57873a5b8577626e7 to your computer and use it in GitHub Desktop.
copy excel data to postgres, simple implementation
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from xlrd import xldate_as_datetime
import psycopg2
import xlrd
import sys
# Sample connection settings, in the shape PgHelper takes as ``db_config``.
# The values point at a local test database; replace them before real use.
DB_CONFIG_TEMPLATE = dict(
    host='localhost',
    port=5432,
    database='test',
    user='wangzhiwei',
    password='',
)
class PgHelper(object):
    """Small helper around a psycopg2 connection.

    @param: db_config [dict]: keyword arguments forwarded unchanged to
        ``psycopg2.connect`` (host, port, database, user, password, ...)
    """

    def __init__(self, db_config):
        super(PgHelper, self).__init__()
        self.db_config = db_config
        # Set by connect(); None until a connection has been opened.
        self.conn = None

    def connect(self, db_schema=None):
        """Connect to the PostgreSQL database server.

        Prints the server version and, when ``db_schema`` is given, puts
        that schema in front of ``public`` on the session search path.

        @params: db_schema: db schema name
        @return: the open connection (also stored on ``self.conn``)

        Any error raised while connecting is printed and the process is
        terminated with exit status 1.
        """
        params = self.db_config
        try:
            # connect to the PostgreSQL server
            print('Connecting to the PostgreSQL database...')
            self.conn = psycopg2.connect(**params)
            # The ``with`` block closes the cursor even if a statement fails.
            with self.conn.cursor() as cur:
                # display the PostgreSQL database server version
                cur.execute('SELECT version()')
                db_version = cur.fetchone()
                print(f"PostgreSQL database version:\n\t{db_version}\n")
                if db_schema:
                    # Bind the schema name as a parameter so the driver
                    # quotes it, instead of relying on Python's repr().
                    cur.execute("set search_path to %s,public;",
                                (db_schema,))
            return self.conn
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
            sys.exit(1)

    def table_structure(self, conn, table_name):
        """Get the column layout of a table.

        @param: conn: open database connection
        @param: table_name: name looked up in ``information_schema.columns``
        @return: dict mapping column name to data type, or None when no
            column is found for ``table_name``

        example:
            {'column_a': 'integer',
             'column_b': 'boolean',
             ...,
            }
        """
        sqlcmd = ("SELECT column_name, data_type "
                  "FROM information_schema.columns "
                  "WHERE table_name = %s;")
        with conn:
            with conn.cursor() as cur:
                # (table_name,) passed as tuple and bound by the driver;
                # the value is never spliced into the SQL text.
                cur.execute(sqlcmd, (table_name,))
                structure = cur.fetchall()
        if structure:
            return dict(structure)
        return None
class DataTransformation(object):
    """Coerce raw spreadsheet rows to the data types of the target columns.

    @param: raw_data: iterable of rows, each row a mutable list of cells
    """

    def __init__(self, raw_data):
        super(DataTransformation, self).__init__()
        self.raw_data = raw_data

    def process(self, headers, t_struct):
        """Convert every column in place according to its table data type.

        @param: headers: column names, in the same order as the row cells
        @param: t_struct [dict]: column name -> data type name
        @return: list of the (mutated) rows

        Columns whose type matches none of the branches are left untouched.
        """
        rows = list(self.raw_data)
        for col, name in enumerate(headers):
            if t_struct[name] in ('integer', 'bigint', 'smallint'):
                self._as_numeric(rows, col, int)
            elif t_struct[name] in ('double precision', 'real', 'numeric'):
                self._as_numeric(rows, col, float)
            elif t_struct[name] == 'boolean':
                self._as_numeric(rows, col, bool)
            elif t_struct[name].startswith('character') \
                    or t_struct[name] == 'text':
                # textual columns are stored as read
                continue
            elif t_struct[name] == 'date':
                self._as_date(rows, col)
            # 'timestamp' has to be tested before the shorter 'time' prefix
            elif t_struct[name].startswith('timestamp'):
                self._as_timestamp(rows, col)
            elif t_struct[name].startswith('time'):
                self._as_time(rows, col)
        return rows

    @staticmethod
    def _as_numeric(records, index, _type):
        """Replace cell ``index`` of every record with ``_type(cell)``."""
        allowed = (int, float, bool)
        if _type in allowed:
            for row in records:
                row[index] = _type(row[index])
            return
        raise TypeError(
            '_type must be Python built-in numeric type: int, float, bool')

    # date string format
    # TODO:
    #   optimize use RE

    @staticmethod
    def _as_date(records, index):
        """Turn float cells of column ``index`` into 'YYYY-MM-DD' strings.

        Empty-string cells are skipped; any other non-float cell raises
        ValueError.
        """
        for row in records:
            cell = row[index]
            if cell == '':
                continue
            if not isinstance(cell, float):
                raise ValueError(f'date value: {cell} - must be date format in Excel')
            row[index] = xldate_as_datetime(cell, 0).strftime('%Y-%m-%d')

    @staticmethod
    def _as_time(records, index):
        """Turn float cells of column ``index`` into 'HH:MM:SS' strings.

        Empty-string cells are skipped; any other non-float cell raises
        ValueError.
        """
        for row in records:
            cell = row[index]
            if cell == '':
                continue
            if not isinstance(cell, float):
                raise ValueError(f'date value: {cell} - must be date format in Excel')
            row[index] = xldate_as_datetime(cell, 0).strftime('%H:%M:%S')

    @staticmethod
    def _as_timestamp(records, index):
        """Turn float cells of column ``index`` into ISO-format strings.

        Empty-string cells are skipped; any other non-float cell raises
        ValueError.
        """
        for row in records:
            cell = row[index]
            if cell == '':
                continue
            if not isinstance(cell, float):
                raise ValueError(f'date value: {cell} - must be date format in Excel')
            row[index] = xldate_as_datetime(cell, 0).isoformat()
class Excel2Pg(object):
    """Load the rows of an Excel worksheet into a PostgreSQL table.

    @param: table_name: target table
    @param: xls_file: default workbook path used by read_xlsx
    """

    def __init__(self, table_name, xls_file):
        super(Excel2Pg, self).__init__()
        self.table_name = table_name
        self.xls_file = xls_file
        # Column names from the first worksheet row; set by read_xlsx().
        self.headers = None

    def read_xlsx(self, xls_file=None, index=0):
        """Read one worksheet and return a generator over its data rows.

        The first row is consumed as the header row and stored on
        ``self.headers``; the generator yields each remaining row as a
        list of cell values.

        @param: xls_file: name of excel file (defaults to ``self.xls_file``)
        @param: index: index of Excel worksheets
        @return: generator of row value lists, header row excluded

        An empty worksheet makes ``next()`` raise StopIteration here.
        """
        xls_file = xls_file or self.xls_file
        book = xlrd.open_workbook(xls_file)
        print(f"The number of worksheets is {book.nsheets}")
        print(f"Worksheet name(s): {book.sheet_names()}")
        sheet = book.sheet_by_index(index)
        print(f"{sheet.name} rows: {sheet.nrows} columns: {sheet.ncols}\n")
        rows = (sheet.row_values(rx) for rx in range(sheet.nrows))
        self.headers = next(rows)
        return rows

    def process_data(self, raw, t_struct):
        """Process excel data, formatting each field to its column type.

        @param: raw [generator]: raw excel data
        @param: t_struct [dict]: table structure, name and data_type of columns
        @return: list of converted records
        @raise: ValueError when ``t_struct`` is not a dict or lacks one of
            the header names
        """
        if not isinstance(t_struct, dict) or \
                not set(self.headers).issubset(t_struct):
            raise ValueError(
                "table columns must contains all fields of excel file")
        return DataTransformation(raw).process(self.headers, t_struct)

    def copy2pg(self, conn, unique_keys, records):
        """Insert ``records`` into the table, upserting on ``unique_keys``.

        @param: conn: open database connection
        @param: unique_keys: column names for the ON CONFLICT target; when
            empty a plain INSERT is issued
        @param: records: iterable of rows ordered like ``self.headers``

        Cell values are bound as query parameters so the driver handles
        quoting, NULLs and embedded quotes. Table and column names cannot
        be bound and are still spliced into the statement text, so they
        must come from a trusted source.
        """
        # The statement is identical for every record: build it once.
        columns = ', '.join(self.headers)
        placeholders = ', '.join(['%s'] * len(self.headers))
        sqlcmd = (f"INSERT INTO {self.table_name} ({columns}) "
                  f"VALUES ({placeholders})")
        if unique_keys:
            # EXCLUDED refers to the row that was proposed for insertion.
            update = ', '.join(
                f"{col} = EXCLUDED.{col}" for col in self.headers)
            sqlcmd += (f" ON CONFLICT ({', '.join(unique_keys)}) "
                       f"DO UPDATE "
                       f"SET {update}")
        with conn:
            with conn.cursor() as cur:
                for rec in records:
                    cur.execute(sqlcmd, tuple(rec))
        print("successful copy to posgres.")
if __name__ == '__main__':
    # Connection settings, defined only when the module is run as a script.
    db = dict(
        host='localhost',
        port=5432,
        database='test',
        user='wangzhiwei',
        password='',
    )
tbool tvchar tint tdate ttime tts
TRUE test 5000 1999-01-08 04:05:06 1999-1-8 04:05
TRUE test 200 1999-01-09 04:05:06 1999-1-9 04:05
FALSE hhh 3000 1999-01-10 05:05:06 1999-1-10 04:05
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from excel2pg import PgHelper, Excel2Pg
# db connection configures
# Connection settings handed to PgHelper.
db = {
    'host': 'localhost',
    'port': 5432,
    'database': 'test',
    'user': 'wangzhiwei',
    'password': ''
}

if __name__ == '__main__':
    # NOTE(review): db_schema is defined but never handed to pg.connect(),
    # so the session keeps its default search path -- confirm the intent.
    db_schema = 'szch_v01'
    table_name = 'test_data'
    xls_file = 'test.xlsx'

    pg = PgHelper(db)
    conn = pg.connect()
    try:
        t_struct = pg.table_structure(conn, table_name)
        e2pg = Excel2Pg(table_name, xls_file)
        raw_data = e2pg.read_xlsx()
        records = e2pg.process_data(raw_data, t_struct)
        # Upsert: rows that collide on (tint, tts) are updated in place.
        e2pg.copy2pg(conn, ('tint', 'tts'), records)
    finally:
        # Close the connection even when one of the steps above raises.
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment