Created
January 11, 2019 16:47
-
-
Save millerjs/7636b1549fe3fe1df392c081f0bb69eb to your computer and use it in GitHub Desktop.
CSV Postgres importer script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from os.path import basename, splitext | |
from sqlalchemy import create_engine | |
import argparse | |
import getpass | |
import os | |
import pandas as pd | |
import re | |
def read_dataframe(path):
    """Load a tabular file into a DataFrame with snake_case column names.

    The reader is picked from the file extension (compared case-insensitively):
    ``.json`` -> ``pd.read_json``, ``.csv`` -> ``pd.read_csv``, and anything
    starting with ``.xl`` -> ``pd.read_excel``.

    Args:
        path: Path of the file to read.

    Returns:
        The parsed DataFrame with every column label passed through
        ``to_snake_case``.

    Raises:
        RuntimeError: If the extension matches none of the supported formats.
    """
    print(f"Parsing CSV '{path}'\n")
    extension = os.path.splitext(path)[1].lower()
    if extension == '.json':
        df = pd.read_json(path)
    elif extension == '.csv':
        df = pd.read_csv(path)
    elif extension.startswith('.xl'):
        df = pd.read_excel(path)
    else:
        raise RuntimeError(f"Unknown file extension, '{extension}'")
    # Column labels may not be strings (e.g. integer labels), while
    # to_snake_case works with regular expressions, so coerce each label first.
    df.columns = [to_snake_case(str(c)) for c in df.columns]
    return df
def drop_table(engine, table):
    """Drop ``table`` from the database if it exists.

    Args:
        engine: SQLAlchemy engine connected to the target database.
        table: Name of the table to drop.
    """
    # Imported locally because the module-level import only brings in
    # create_engine.
    from sqlalchemy import text

    print(f"Dropping table '{table}'")
    # Identifiers cannot be sent as bound parameters, so let the dialect quote
    # the name when needed (reserved words such as "user", unusual characters).
    quoted = engine.dialect.identifier_preparer.quote(table)
    # engine.begin() opens a transaction that commits on success; the older
    # Engine.execute() shortcut no longer exists in SQLAlchemy 2.0.
    with engine.begin() as connection:
        connection.execute(text(f'drop table if exists {quoted}'))
def csv_to_sql(engine, path, table, no_drop=False):
    """Import the file at ``path`` into the database table ``table``.

    Args:
        engine: SQLAlchemy engine used for both the drop and the insert.
        path: Path of the input file, read via ``read_dataframe``.
        table: Name of the destination table.
        no_drop: When true, skip dropping the destination table beforehand.
    """
    should_drop = not no_drop
    if should_drop:
        drop_table(engine, table)

    frame = read_dataframe(path)
    print(f"Loading CSV '{path}' into table '{table}'\n")
    # NOTE(review): to_sql runs with its defaults here; per the pandas docs
    # that presumably means failing when the table already exists and writing
    # the index as a column -- confirm this is what --no-drop should do.
    frame.to_sql(name=table, con=engine)
def to_snake_case(name):
    """Convert ``name`` into a lowercase snake_case identifier.

    Word boundaries in CamelCase input become underscores, every character
    that is not an ASCII letter or a digit becomes an underscore, runs of
    underscores collapse to one, and a single trailing underscore is removed.
    A leading underscore is kept.

    Args:
        name: The label to convert. Non-string values are converted with
            ``str`` first.

    Returns:
        The converted, lowercased string.
    """
    text = str(name)
    # Separate a capitalised word from whatever precedes it:
    # "HTTPResponse" -> "HTTP_Response".
    text = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', text)
    # Separate a lowercase letter or digit from a following capital:
    # "userID" -> "user_ID".
    text = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', text)
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    text = re.sub(r'[^a-zA-Z\d]', '_', text)
    text = re.sub(r'_+', '_', text)
    text = re.sub(r'_$', '', text)
    return text.lower()
def parse_table_arg(arg):
    """Split a command-line argument into a table name and a file path.

    An argument of the form ``name:path`` is split at its first colon.
    Without a colon the whole argument is the path and the table name is the
    file's base name with its extension removed.

    Args:
        arg: The raw positional argument.

    Returns:
        A ``(table, path)`` tuple where ``table`` has been passed through
        ``to_snake_case``.
    """
    prefix, separator, remainder = arg.partition(':')
    if separator:
        name, location = prefix, remainder
    else:
        location = arg
        stem, _extension = os.path.splitext(os.path.basename(arg))
        name = stem
    return to_snake_case(name), location
def run(args):
    """Import every requested file, then open a ``psql`` session.

    Args:
        args: Parsed command-line namespace. Reads ``user``, ``host``,
            ``port``, ``database``, ``no_drop`` and the positional ``args``
            list of ``[table:]path`` entries.
    """
    # Imported locally so this function does not depend on a module-level
    # import being added.
    import subprocess

    engine = create_engine(f'postgresql://{args.user}@{args.host}:{args.port}/{args.database}')
    table_args = [parse_table_arg(arg) for arg in args.args]
    for table, path in table_args:
        csv_to_sql(engine, path, table, args.no_drop)

    # Pass an argument list instead of a shell string so values containing
    # spaces or shell metacharacters reach psql unchanged. psql's exit status
    # is not checked, as before.
    subprocess.run(
        [
            'psql',
            '--host', str(args.host),
            '--port', str(args.port),
            '--username', str(args.user),
            '--dbname', str(args.database),
        ],
        check=False,
    )
def arg_parser():
    """Build the command-line parser for the importer.

    Returns:
        An ``argparse.ArgumentParser`` accepting zero or more positional
        file arguments plus the drop, connection and debug options.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        'args',
        nargs='*',
        help="Paths to files. optionally prepend with 'tablename:', e.g. 'users:Users.csv'",
    )
    cli.add_argument('-n', '--no-drop', action='store_true')

    # Options that take a plain string value and differ only in their flags
    # and default.
    # NOTE(review): nothing visible in this file reads the --table value --
    # confirm whether it is still needed.
    string_options = (
        ('-t', '--table', 'csv'),
        ('-U', '--user', getpass.getuser()),
        ('-H', '--host', 'localhost'),
        ('-D', '--database', 'cql'),
        ('-P', '--port', '5432'),
    )
    for short_flag, long_flag, fallback in string_options:
        cli.add_argument(short_flag, long_flag, default=fallback)

    cli.add_argument('--debug', action='store_true', help='show stacktrace on error')
    return cli
if __name__ == '__main__':
    # Script entry point: parse the command line, run the import, and report
    # failures as a one-line message unless --debug asks for the traceback.
    args = arg_parser().parse_args()
    try:
        run(args)
    except Exception as exception:
        if args.debug:
            raise
        print(f"\nError: {exception}")
        # Exit with a non-zero status so shells and calling scripts can tell
        # that the import failed.
        raise SystemExit(1) from None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment