Last active
May 24, 2019 12:54
-
-
Save nevadajames/60f0720302085752ffd637926d314ebd to your computer and use it in GitHub Desktop.
Connect to a PostgreSQL database, run basic queries, and export the results as CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
class CsvMapper():
    """Export database query results to a CSV file.

    ``data`` is a mapping with two keys: ``'headers'`` (an iterable of column
    names) and ``'records'`` (an iterable of row sequences).
    """

    def __init__(self, data):
        self.data = data
        self.headers = data['headers']
        self.records = data['records']

    def write_csv(self, destination):
        """Write the header row followed by every record to ``destination``.

        The file is created (or truncated) and written as comma-delimited
        UTF-8 text.
        """
        # newline='' lets the csv module control line endings itself.
        with open(destination, 'wt', newline='', encoding='utf-8') as file:
            writer = csv.writer(file, delimiter=',')
            writer.writerow(self.headers)
            writer.writerows(self.records)

    @staticmethod
    def _with_csv_extension(filename):
        """Return ``filename`` unchanged if it ends in ``.csv``, else append it."""
        if filename.lower().endswith('.csv'):
            return filename
        return f"{filename}.csv"

    def prompt_filename(self):
        """Ask the user for a destination path and write the CSV there.

        A ``.csv`` extension is appended when the entered name lacks one.
        """
        filename = input("Where would you like to save the file?\n").strip()
        destination = self._with_csv_extension(filename)
        print(f"Writing results to {destination}")
        self.write_csv(destination)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Configuration():
    """Connection settings for the PostgreSQL database used by the client."""

    # All values are strings; replace the placeholder database name before use.
    settings = dict(
        name='your_database_name',
        user='postgres',
        host='localhost',
        port='5432',
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
"""Run queries on PostgreSQL database and export as CSV""" | |
from colorama import init | |
from termcolor import cprint | |
from pyfiglet import figlet_format | |
from db_settings import Configuration | |
from postgres_connection import PostgresConnection | |
from csv_mapper import CsvMapper | |
def db_request(connection):
    """Run the interactive query prompt until the user confirms they want to quit.

    Each round asks whether to make a request, reads a query, runs it through
    ``connection.select_data``, prints the result table and optionally exports
    it to CSV. Any exception raised while running or exporting a query is
    reported and the transaction is rolled back via ``connection.abort_query``
    before prompting again.

    On confirmed exit the connection is closed and ``SystemExit`` is raised.
    """
    # A loop instead of self-recursion so a long session cannot exhaust the
    # recursion limit.
    while True:
        response = input("Would you like to make a request? Y/y. any other key to exit: ")
        if response not in ('y', 'Y'):
            response = input("Are you sure?: ")
            if response in ('y', 'Y'):
                print("Closing connection to database. Goodbye")
                connection.close_connection()
                # Equivalent to exit(), without relying on the site module helper.
                raise SystemExit
            continue

        query = input('Make a query: ')
        try:
            records = connection.select_data(query)
            connection.print_table(records)
            csv_request = input('Would you like to export this request?: ')
            if csv_request in ('y', 'Y'):
                # write_csv() needs a destination; prompt_filename() asks for
                # one and then writes the file.
                CsvMapper(records).prompt_filename()
        except Exception as ex:
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            message = template.format(type(ex).__name__, ex.args)
            print(message)
            connection.abort_query()
def main():
    """Print the banner, connect to the configured database and start the prompt."""
    # Enable ANSI colour handling (needed on Windows consoles) before cprint.
    init()
    cprint(figlet_format("Nevada's PostgreSQL Client", font='standard', width=200), 'cyan', attrs=['bold'])
    pg_connection = PostgresConnection(Configuration.settings)
    pg_connection.start()
    if pg_connection.conn is None:
        # start() has already reported the connection failure; there is
        # nothing to query against.
        return
    db_request(pg_connection)


# Only run the client when executed as a script, not when imported.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Logic for connecting to Database and executing queries""" | |
import psycopg2 | |
from tabulate import tabulate | |
class PostgresConnection():
    """PostgreSQL client built on a single psycopg2 connection.

    ``database_settings`` is a mapping with the keys ``'name'``, ``'user'``,
    ``'host'`` and ``'port'``. The connection itself is opened by ``start``;
    until then ``conn`` is ``None``.
    """

    def __init__(self, database_settings):
        self.name = database_settings['name']
        self.user = database_settings['user']
        self.host = database_settings['host']
        self.port = database_settings['port']
        self.settings = (f"dbname={self.name} user={self.user} host={self.host} port={self.port}")
        self.conn = None

    def start(self):
        """Open the connection and print the server version.

        A ``psycopg2.OperationalError`` is reported on stdout rather than
        raised; in that case ``conn`` keeps its previous value.
        """
        try:
            self.conn = psycopg2.connect(self.settings)
            self.postgres_version()
        except psycopg2.OperationalError as e:
            print(f"I am unable to connect to the database:{e}")

    def inspect(self):
        """Display database connection settings"""
        print(self.conn.get_dsn_parameters(), "\n")

    def postgres_version(self):
        """Display PostgreSQL version"""
        cursor = self.conn.cursor()
        try:
            cursor.execute('SELECT VERSION()')
            print(cursor.fetchone(), "\n")
        finally:
            cursor.close()

    def select_data(self, query):
        """Run ``query`` and return all rows with their column headers.

        Returns the same ``{'records': ..., 'headers': ...}`` dict as
        ``execute_query``.
        """
        # execute_query already echoes the statement, so it is not printed here.
        return self.execute_query("all", f"{query}")

    @staticmethod
    def print_table(results):
        """Display the ``results`` dict from a query in tabular format."""
        records = results['records']
        headers = results['headers']
        print(tabulate(records, headers=headers, tablefmt='psql'))

    def close_connection(self):
        """Close the connection to the database, if one was opened."""
        if self.conn is not None:
            self.conn.close()

    def abort_query(self):
        """Roll back the current transaction, if a connection is open."""
        if self.conn is not None:
            self.conn.rollback()

    def execute_query(self, option, query):
        """Execute ``query`` and fetch its result.

        ``option`` selects the fetch mode: ``'all'`` returns every row as a
        list, ``'one'`` returns only the first row (or ``None`` when there are
        no rows). Any other value raises ``KeyError``.

        Returns a dict ``{'records': <fetched rows>, 'headers': [column names]}``.
        Raises ``TypeError`` when the statement produced no result set.
        """
        cursor = self.conn.cursor()
        try:
            print(query)
            cursor.execute(query)
            if cursor.description is None:
                raise TypeError("query did not return a result set")
            fields_names = [column[0] for column in cursor.description]
            # Look the fetch method up first and call only the chosen one:
            # calling fetchall() eagerly would consume the rows and leave
            # fetchone() with nothing to return.
            fetch = {
                'all': cursor.fetchall,
                'one': cursor.fetchone,
            }[option]
            result = fetch()
        finally:
            cursor.close()
        return {'records': result, 'headers': fields_names}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment