Skip to content

Instantly share code, notes, and snippets.

@nevadajames
Last active May 24, 2019 12:54
Show Gist options
  • Save nevadajames/60f0720302085752ffd637926d314ebd to your computer and use it in GitHub Desktop.
Save nevadajames/60f0720302085752ffd637926d314ebd to your computer and use it in GitHub Desktop.
Connect to a PostgreSQL database, run basic queries, and export the results to CSV
import csv
class CsvMapper():
    """Export database query results to a CSV file.

    ``data`` is a mapping with a ``'headers'`` entry (the column names) and a
    ``'records'`` entry (an iterable of rows); both are read in ``__init__``.
    """

    def __init__(self, data):
        self.data = data
        self.headers = data['headers']
        self.records = data['records']

    @staticmethod
    def _with_csv_suffix(filename):
        """Return *filename* unchanged if it ends in ``.csv``, else append it."""
        if filename.endswith('.csv'):
            return filename
        return f"{filename}.csv"

    def write_csv(self, destination):
        """Write the header row followed by every record to *destination*.

        The file is created or overwritten. Errors from ``open`` (for
        example a missing directory) propagate to the caller.
        """
        # newline='' leaves line-ending handling to the csv writer.
        with open(destination, 'wt', newline='') as file:
            writer = csv.writer(file, delimiter=',')
            writer.writerow(self.headers)
            writer.writerows(self.records)

    def prompt_filename(self):
        """Ask the user where to save the CSV and write it there.

        A ``.csv`` extension is appended when the answer does not already
        end with one.
        """
        filename = input("Where would you like to save the file?\n").strip()
        destination = self._with_csv_suffix(filename)
        # Python f-strings interpolate with {name}; a leading '#' would be
        # emitted literally and end up in the file name.
        print(f"Writing results to {destination}")
        self.write_csv(destination)
class Configuration():
    """Database connection settings used by the client."""

    # Placeholder values; replace 'your_database_name' with a real database.
    settings = dict(
        name='your_database_name',
        user='postgres',
        host='localhost',
        port='5432',
    )
#!/usr/local/bin/python3
"""Run queries on PostgreSQL database and export as CSV"""
from colorama import init
from termcolor import cprint
from pyfiglet import figlet_format
from db_settings import Configuration
from postgres_connection import PostgresConnection
from csv_mapper import CsvMapper
def db_request(connection):
    """Run the interactive query prompt against *connection*.

    Repeatedly asks whether to make a request. On ``y``/``Y`` it reads a
    query, runs it through ``connection.select_data``, prints the result
    with ``connection.print_table`` and optionally exports it to CSV. On any
    other answer it asks for confirmation and, if confirmed, closes the
    connection and exits.

    Raises:
        SystemExit: once the user confirms they want to quit.
    """
    affirmative = ('y', 'Y')

    # A loop rather than self-recursion: a long session must not grow the
    # call stack with every prompt.
    while True:
        response = input("Would you like to make a request? Y/y. any other key to exit: ")
        if response not in affirmative:
            response = input("Are you sure?: ")
            if response in affirmative:
                print("Closing connection to database. Goodbye")
                connection.close_connection()
                raise SystemExit
            continue

        query = input('Make a query: ')
        try:
            records = connection.select_data(query)
            connection.print_table(records)
            csv_request = input('Would you like to export this request?: ')
            if csv_request in affirmative:
                # write_csv() needs a destination; prompt_filename() asks
                # the user for one and then writes the file.
                CsvMapper(records).prompt_filename()
        except Exception as ex:
            # Top-level boundary of the prompt: report the failure, roll the
            # transaction back and keep the session alive.
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            message = template.format(type(ex).__name__, ex.args)
            print(message)
            connection.abort_query()
def main():
    """Show the banner, connect to the database and start the query prompt.

    Returns early, without starting the prompt, when the connection could
    not be established.
    """
    # colorama is imported for this call; it enables ANSI colour handling
    # on consoles that need it (notably Windows) before cprint is used.
    init()
    cprint(figlet_format("Nevada's PostgreSQL Client", font='standard', width=200), 'cyan', attrs=['bold'])
    pg_connection = PostgresConnection(Configuration.settings)
    pg_connection.start()
    if pg_connection.conn is None:
        # start() has already printed the connection error; there is
        # nothing to query, so do not enter the prompt.
        return
    db_request(pg_connection)
# Start the interactive client only when run as a script, not on import.
if __name__ == "__main__":
    main()
""" Logic for connecting to Database and executing queries"""
import psycopg2
from tabulate import tabulate
class PostgresConnection():
    """PostgreSQL client built on a psycopg2 connection."""

    def __init__(self, database_settings):
        """Build the connection string from *database_settings*.

        The mapping must provide ``'name'``, ``'user'``, ``'host'`` and
        ``'port'``. No connection is opened until ``start`` is called, so
        ``conn`` is ``None`` until then.
        """
        self.name = database_settings['name']
        self.user = database_settings['user']
        self.host = database_settings['host']
        self.port = database_settings['port']
        self.settings = (f"dbname={self.name} user={self.user} host={self.host} port={self.port}")
        self.conn = None

    def start(self):
        """Open the connection and print the server version.

        A ``psycopg2.OperationalError`` is reported on stdout instead of
        being raised; ``conn`` stays ``None`` if connecting failed.
        """
        try:
            self.conn = psycopg2.connect(self.settings)
            self.postgres_version()
        except psycopg2.OperationalError as e:
            print(f"I am unable to connect to the database:{e}")

    def inspect(self):
        """Display database connection settings"""
        print(self.conn.get_dsn_parameters(), "\n")

    def postgres_version(self):
        """Display PostgreSQL version"""
        # The context manager closes the cursor once the row is printed.
        with self.conn.cursor() as cursor:
            cursor.execute('SELECT VERSION()')
            print(cursor.fetchone(), "\n")

    def select_data(self, query):
        """Run *query* and return all of its rows.

        Returns the same ``{'records': ..., 'headers': ...}`` dict as
        ``execute_query``, which also echoes the query to stdout.
        """
        return self.execute_query("all", str(query))

    @staticmethod
    def print_table(results):
        """Display the ``'records'`` of *results* under its ``'headers'``."""
        records = results['records']
        headers = results['headers']
        print(tabulate(records, headers=headers, tablefmt='psql'))

    def close_connection(self):
        """Close the connection; does nothing if none was opened."""
        if self.conn is not None:
            self.conn.close()

    def abort_query(self):
        """Roll back the current transaction; does nothing without a connection."""
        if self.conn is not None:
            self.conn.rollback()

    def execute_query(self, option, query):
        """Execute *query* and fetch either all rows or only the first one.

        Args:
            option: ``'all'`` to fetch every row, ``'one'`` for the first.
            query: SQL text passed to the cursor as-is.

        Returns:
            A dict with ``'records'`` (a list of rows for ``'all'``; a single
            row or ``None`` for ``'one'``) and ``'headers'`` (column names).
            A statement that produces no result set yields empty headers
            and no records. Nothing is committed here.

        Raises:
            KeyError: if *option* is neither ``'all'`` nor ``'one'``.
        """
        if option not in ('all', 'one'):
            raise KeyError(option)

        print(query)
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            if cursor.description is None:
                # No result set (e.g. INSERT/UPDATE): there is nothing to
                # fetch and no column names to report.
                return {'records': [] if option == 'all' else None, 'headers': []}
            fields_names = [column[0] for column in cursor.description]
            # Fetch only what was asked for. Calling fetchall() first would
            # drain the cursor and leave fetchone() with nothing to return.
            if option == 'all':
                result = cursor.fetchall()
            else:
                result = cursor.fetchone()
        return {'records': result, 'headers': fields_names}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment