Created
January 14, 2019 07:55
-
-
Save zhiweio/f4ca849feab00fd57873a5b8577626e7 to your computer and use it in GitHub Desktop.
Copy Excel data to PostgreSQL (a simple implementation).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
from xlrd import xldate_as_datetime | |
import psycopg2 | |
import xlrd | |
import sys | |
# Template connection settings for a local PostgreSQL server. A dict of this
# shape is what PgHelper expects as ``db_config``; its keys are forwarded
# unchanged to ``psycopg2.connect(**db_config)``.
DB_CONFIG_TEMPLATE = dict(
    host='localhost',
    port=5432,
    database='test',
    user='wangzhiwei',
    password='',
)
class PgHelper(object):
    """Small helper around a psycopg2 connection.

    @param: db_config [dict]: keyword arguments forwarded unchanged to
        ``psycopg2.connect`` (e.g. host, port, database, user, password)
    """

    def __init__(self, db_config):
        super(PgHelper, self).__init__()
        self.db_config = db_config
        self.conn = None

    def connect(self, db_schema=None):
        """Connect to the PostgreSQL server and return the connection.

        Prints the server version. When ``db_schema`` is given, the session
        ``search_path`` is set to ``<db_schema>, public`` and committed.

        @param: db_schema: optional schema name to put first on the search path
        On any psycopg2 error the message is printed to stderr, the
        connection (if opened) is closed, and the process exits with status 1.
        """
        # Local import: psycopg2 is already a dependency of this file, and this
        # keeps the module-level import block unchanged.
        from psycopg2 import sql

        try:
            print('Connecting to the PostgreSQL database...')
            self.conn = psycopg2.connect(**self.db_config)
            with self.conn.cursor() as cur:
                # display the PostgreSQL database server version
                cur.execute('SELECT version()')
                db_version = cur.fetchone()
                print(f"PostgreSQL database version:\n\t{db_version}\n")
                if db_schema:
                    # Quote the schema as an SQL identifier; Python repr()
                    # is not a valid SQL quoting mechanism.
                    cur.execute(
                        sql.SQL("SET search_path TO {}, public").format(
                            sql.Identifier(db_schema)))
            # A SET issued inside a transaction is undone if that transaction
            # is rolled back, so commit to make it stick for the session.
            self.conn.commit()
            return self.conn
        except psycopg2.Error as error:
            print(error, file=sys.stderr)
            if self.conn is not None:
                self.conn.close()
                self.conn = None
            sys.exit(1)

    def table_structure(self, conn, table_name, table_schema=None):
        """Return the column names and data types of ``table_name``.

        example:
            {'column_a': 'integer',
             'column_b': 'boolean',
             ...,
            }

        @param: conn: open psycopg2 connection (used as a transaction context)
        @param: table_name: exact, case-sensitive table name as stored in
            ``information_schema.columns``
        @param: table_schema: optional schema name to restrict the lookup to;
            without it, columns of every same-named table are merged
        @return: dict of column_name -> data_type (in column order), or None
            when no columns were found
        """
        query = ("SELECT column_name, data_type "
                 "FROM information_schema.columns "
                 "WHERE table_name = %s")
        params = [table_name]
        if table_schema is not None:
            query += " AND table_schema = %s"
            params.append(table_schema)
        query += " ORDER BY ordinal_position"
        with conn:
            with conn.cursor() as cur:
                # Values are bound as query parameters so psycopg2 quotes them.
                cur.execute(query, params)
                structure = cur.fetchall()
        return dict(structure) if structure else None
class DataTransformation(object):
    """Convert raw Excel cell values to Python values matching PostgreSQL types.

    @param: raw_data [iterable of sequences]: data rows (header row excluded),
        e.g. lists as returned by xlrd ``Sheet.row_values``
    @param: datemode [int]: workbook date system forwarded to
        ``xldate_as_datetime`` (0: 1900-based, 1: 1904-based); pass the
        workbook's ``datemode`` when it is known
    """

    # PostgreSQL data_type names (as reported by information_schema.columns)
    # grouped by the Python converter applied to them.
    _INT_TYPES = ('integer', 'bigint', 'smallint')
    _FLOAT_TYPES = ('double precision', 'real', 'numeric')

    def __init__(self, raw_data, datemode=0):
        super(DataTransformation, self).__init__()
        self.raw_data = raw_data
        self.datemode = datemode

    def process(self, headers, t_struct):
        """Return a new list of rows with each column converted by its type.

        @param: headers [list]: column names, in the same order as row values
        @param: t_struct [dict]: column name -> PostgreSQL data_type
        @return: list of lists (the input rows are not modified)

        Empty cells ('') in numeric, boolean, date, time and timestamp
        columns become None (stored as NULL). Character/text columns and
        unrecognised types are left untouched.
        Raises KeyError if a header is missing from ``t_struct``.
        """
        # Copy every row: avoids mutating the caller's data and also accepts
        # immutable (tuple) rows.
        records = [list(row) for row in self.raw_data]
        for i, column in enumerate(headers):
            data_type = t_struct[column]
            if data_type in self._INT_TYPES:
                self._as_numeric(records, i, int)
            elif data_type in self._FLOAT_TYPES:
                self._as_numeric(records, i, float)
            elif data_type == 'boolean':
                self._as_numeric(records, i, bool)
            elif data_type.startswith('character') or data_type == 'text':
                pass
            elif data_type == 'date':
                self._as_date(records, i, self.datemode)
            # 'timestamp...' must be tested before the broader 'time' prefix.
            elif data_type.startswith('timestamp'):
                self._as_timestamp(records, i, self.datemode)
            elif data_type.startswith('time'):
                self._as_time(records, i, self.datemode)
        return records

    @staticmethod
    def _as_numeric(records, index, _type):
        """Convert column ``index`` of every row with ``_type``, in place.

        Empty cells ('') become None instead of raising ValueError.
        Raises TypeError if ``_type`` is not int, float or bool.
        """
        if _type not in (int, float, bool):
            raise TypeError(
                '_type must be Python built-in numeric type: int, float, bool')
        for row in records:
            value = row[index]
            row[index] = None if value == '' else _type(value)

    @staticmethod
    def _convert_xldate(records, index, render, datemode, kind):
        """Replace Excel serial dates in column ``index`` with ``render(dt)``.

        Empty cells ('') become None; any non-float value raises ValueError
        (``kind`` names the column type in the message).
        """
        for row in records:
            value = row[index]
            if value == '':
                row[index] = None
            elif isinstance(value, float):
                row[index] = render(xldate_as_datetime(value, datemode))
            else:
                raise ValueError(
                    f'{kind} value: {value} - must be date format in Excel')

    @staticmethod
    def _as_date(records, index, datemode=0):
        """Format Excel serial dates in column ``index`` as 'YYYY-MM-DD'."""
        DataTransformation._convert_xldate(
            records, index, lambda dt: dt.strftime('%Y-%m-%d'),
            datemode, 'date')

    @staticmethod
    def _as_time(records, index, datemode=0):
        """Format Excel serial dates in column ``index`` as 'HH:MM:SS'."""
        DataTransformation._convert_xldate(
            records, index, lambda dt: dt.strftime('%H:%M:%S'),
            datemode, 'time')

    @staticmethod
    def _as_timestamp(records, index, datemode=0):
        """Format Excel serial dates in column ``index`` as ISO-8601 strings."""
        DataTransformation._convert_xldate(
            records, index, lambda dt: dt.isoformat(),
            datemode, 'timestamp')
class Excel2Pg(object):
    """Load a worksheet of an Excel workbook into a PostgreSQL table.

    @param: table_name: target table (exact, case-sensitive name)
    @param: xls_file: path of the Excel workbook
    """

    def __init__(self, table_name, xls_file):
        super(Excel2Pg, self).__init__()
        self.table_name = table_name
        self.xls_file = xls_file
        self.headers = None
        # Date system of the last workbook read (0: 1900-based, 1: 1904-based).
        self.datemode = 0

    def read_xlsx(self, xls_file=None, index=0):
        """Read one worksheet and return its data rows as a generator of lists.

        The first row is treated as the header and stored in ``self.headers``;
        the workbook's date system is stored in ``self.datemode``.

        @param: xls_file: name of excel file (defaults to ``self.xls_file``)
        @param: index: index of Excel worksheets
        Raises ValueError if the worksheet has no rows.
        """
        xls_file = xls_file or self.xls_file
        book = xlrd.open_workbook(xls_file)
        print("The number of worksheets is {0}".format(book.nsheets))
        print("Worksheet name(s): {0}".format(book.sheet_names()))
        sheet = book.sheet_by_index(index)
        print("{0} rows: {1} columns: {2}\n".format(
            sheet.name, sheet.nrows, sheet.ncols))
        if sheet.nrows == 0:
            raise ValueError(f"worksheet {sheet.name!r} is empty")
        self.headers = sheet.row_values(0)
        self.datemode = book.datemode
        return (sheet.row_values(rx) for rx in range(1, sheet.nrows))

    def process_data(self, raw, t_struct):
        """Convert raw Excel rows to values matching the table's column types.

        @param: raw [iterable]: raw excel data rows (as returned by read_xlsx)
        @param: t_struct [dict]: table structure, name -> data_type of columns
        @return: list of converted rows
        Raises ValueError if headers are unknown (read_xlsx not called) or
        the table lacks any of the Excel columns.
        """
        if self.headers is None:
            raise ValueError("headers are unknown; call read_xlsx() first")
        known = t_struct if isinstance(t_struct, dict) else {}
        missing = [h for h in self.headers if h not in known]
        if not isinstance(t_struct, dict) or missing:
            raise ValueError(
                "table columns must contain all fields of excel file "
                f"(missing: {missing})")
        pre = DataTransformation(raw)
        return pre.process(self.headers, t_struct)

    def copy2pg(self, conn, unique_keys, records):
        """Insert ``records`` into the table, upserting on ``unique_keys``.

        @param: conn: open psycopg2 connection; all rows are written inside
            one ``with conn`` transaction (committed on success, rolled back
            on error)
        @param: unique_keys [sequence of str]: ON CONFLICT target columns;
            when empty, rows are plainly inserted
        @param: records [iterable of sequences]: row values in
            ``self.headers`` order
        """
        # Local import: psycopg2 is already a dependency of this file.
        from psycopg2 import sql

        # Build the statement once. Identifiers are quoted by psycopg2.sql and
        # values are bound as parameters, so quotes, None and special
        # characters in cells are transmitted correctly.
        columns = sql.SQL(', ').join(map(sql.Identifier, self.headers))
        placeholders = sql.SQL(', ').join(sql.Placeholder() * len(self.headers))
        query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            sql.Identifier(self.table_name), columns, placeholders)
        if unique_keys:
            conflict = sql.SQL(', ').join(map(sql.Identifier, unique_keys))
            # EXCLUDED holds the row proposed for insertion.
            update = sql.SQL(', ').join(
                sql.SQL("{0} = EXCLUDED.{0}").format(sql.Identifier(h))
                for h in self.headers)
            query = sql.SQL("{} ON CONFLICT ({}) DO UPDATE SET {}").format(
                query, conflict, update)
        with conn:
            with conn.cursor() as cur:
                for rec in records:
                    cur.execute(query, list(rec))
        print("successful copy to postgres.")
if __name__ == '__main__':
    # db connection configures: start from the module-level template so the
    # default settings are defined in exactly one place.
    db = dict(DB_CONFIG_TEMPLATE)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
tbool | tvchar | tint | tdate | ttime | tts
---|---|---|---|---|---
TRUE | test | 5000 | 1999-01-08 | 04:05:06 | 1999-1-8 04:05
TRUE | test | 200 | 1999-01-09 | 04:05:06 | 1999-1-9 04:05
FALSE | hhh | 3000 | 1999-01-10 | 05:05:06 | 1999-1-10 04:05
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
from excel2pg import PgHelper, Excel2Pg | |
# db connection configures | |
# Connection settings for the local test database; handed to PgHelper,
# which forwards them to psycopg2.connect.
db = dict(
    host='localhost',
    port=5432,
    database='test',
    user='wangzhiwei',
    password='',
)
if __name__ == '__main__':
    db_schema = 'szch_v01'
    table_name = 'test_data'
    xls_file = 'test.xlsx'

    pg = PgHelper(db)
    # Pass the schema so the unqualified table name used by the INSERTs
    # resolves through search_path to db_schema.
    conn = pg.connect(db_schema)
    try:
        t_struct = pg.table_structure(conn, table_name)
        e2pg = Excel2Pg(table_name, xls_file)
        raw_data = e2pg.read_xlsx()
        records = e2pg.process_data(raw_data, t_struct)
        # ON CONFLICT (tint, tts) requires a matching unique index or
        # constraint on the target table.
        e2pg.copy2pg(conn, ('tint', 'tts'), records)
    finally:
        # close connection even if reading or loading fails
        conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment