Skip to content

Instantly share code, notes, and snippets.

@toast254
Last active December 22, 2020 09:21
Show Gist options
  • Save toast254/8b8a1f162299e56c1b1b05facb4f7608 to your computer and use it in GitHub Desktop.
Save toast254/8b8a1f162299e56c1b1b05facb4f7608 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import csv
import psycopg2
# Connection details: module-level settings passed to psycopg2.connect() by
# the helper functions in this file.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a config file before using this anywhere
# other than a local sandbox.
dbname = 'sampledb'
user = 'postgres'
host = '127.0.0.1'
password = '123456789'
def get_tables_databases():
    """Return the names of all user tables in the configured database.

    Only base tables are listed, and the ``pg_catalog`` and
    ``information_schema`` schemas are excluded.

    Returns:
        list: Table names (without schema qualification).
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only commits/rolls back the transaction; the connection
        # itself must be closed explicitly or it leaks.
        with conn, conn.cursor() as cur:
            cur.execute(
                "select table_name from INFORMATION_SCHEMA.TABLES "
                "where table_type='BASE TABLE' "
                "and table_schema not in ('pg_catalog', 'information_schema');"
            )
            return [name for (name,) in cur]
    finally:
        conn.close()
def get_column_table(table_name: str):
    """Return the column names of a table, in table-definition order.

    Args:
        table_name: Exact (unqualified) table name as stored in the catalog.

    Returns:
        list: Column names; empty if no table with that name exists.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; close the connection ourselves.
        with conn, conn.cursor() as cur:
            # The name is passed as a bound parameter instead of being spliced
            # into the SQL text; ordinal_position keeps the result aligned
            # with the order `select *` uses.
            cur.execute(
                'select column_name from INFORMATION_SCHEMA.COLUMNS '
                'where table_name = %s order by ordinal_position;',
                (table_name,),
            )
            return [name for (name,) in cur]
    finally:
        conn.close()
def get_column_details_table(table_name: str):
    """Return name, data type and maximum length for each column of a table.

    Args:
        table_name: Exact (unqualified) table name as stored in the catalog.

    Returns:
        list: ``(column_name, data_type, character_maximum_length)`` tuples in
        table-definition order; empty if no table with that name exists.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; close the connection ourselves.
        with conn, conn.cursor() as cur:
            # Bound parameter instead of string concatenation.
            cur.execute(
                'select column_name, data_type, character_maximum_length '
                'from INFORMATION_SCHEMA.COLUMNS '
                'where table_name = %s order by ordinal_position;',
                (table_name,),
            )
            return cur.fetchall()
    finally:
        conn.close()
def get_length_table(table_name: str):
    """Return the exact number of rows in a table (``count(*)``).

    Args:
        table_name: Exact (unqualified) table name; it is quoted as a single
            SQL identifier.

    Returns:
        int: The row count.
    """
    from psycopg2 import sql

    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; close the connection ourselves.
        with conn, conn.cursor() as cur:
            # sql.Identifier quotes the name like the original '"' + name + '"'
            # did, but also escapes any embedded double quote.
            cur.execute(
                sql.SQL('select count(*) from {};').format(sql.Identifier(table_name))
            )
            return cur.fetchone()[0]
    finally:
        conn.close()
def get_aprox_length_table(table_name: str):
    """Return the estimated row count of a table from ``pg_class.reltuples``.

    Args:
        table_name: Exact relation name to look up in ``pg_class``.

    Returns:
        The ``reltuples`` value of the first matching relation.

    Raises:
        TypeError: If no relation with that name exists (no row to index).
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; close the connection ourselves.
        with conn, conn.cursor() as cur:
            # Bound parameter instead of string concatenation.
            cur.execute(
                'select reltuples as estimate from pg_class where relname = %s;',
                (table_name,),
            )
            return cur.fetchone()[0]
    finally:
        conn.close()
def get_size_database():
    """Return the on-disk size of the configured database as pretty text.

    Returns:
        str: Output of ``pg_size_pretty(pg_database_size(...))`` for the
        database named by the module-level ``dbname``.

    Raises:
        TypeError: If ``pg_database`` has no row for ``dbname``.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; close the connection ourselves.
        with conn, conn.cursor() as cur:
            # Bound parameter instead of string concatenation.
            cur.execute(
                'select pg_size_pretty(pg_database_size(pg_database.datname)) as size '
                'from pg_database where pg_database.datname = %s;',
                (dbname,),
            )
            return cur.fetchone()[0]
    finally:
        conn.close()
def get_rows_table(table_name: str):
    """Fetch every row of a table into memory.

    Args:
        table_name: Exact (unqualified) table name; it is quoted as a single
            SQL identifier, consistent with ``get_length_table``, so
            mixed-case and reserved-word names work.

    Returns:
        list: One tuple per row.
    """
    from psycopg2 import sql

    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; close the connection ourselves.
        with conn, conn.cursor() as cur:
            cur.execute(
                sql.SQL('select * from {};').format(sql.Identifier(table_name))
            )
            return cur.fetchall()
    finally:
        conn.close()
def export_table_csv(table_name: str, file_path: str = None, with_title: bool = False):
    """Dump one table to ``<file_path>/<table_name>.csv``.

    The file is written with a space delimiter and ``|`` as quote character.

    Args:
        table_name: Exact (unqualified) table name; quoted as a single SQL
            identifier when queried.
        file_path: Target directory (created if missing). Defaults to the
            current directory.
        with_title: When true, write the column names as the first row.
    """
    from psycopg2 import sql

    file_name = os.path.join(file_path if file_path else '.', table_name + '.csv')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    # newline='' lets the csv module control line endings (avoids blank lines
    # on platforms that translate '\n').
    with open(file_name, mode="w", encoding="UTF-8", newline="") as csvfile:
        csvwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # `with conn` only ends the transaction; the connection is closed
            # in the `finally` below.
            with conn:
                # column names
                if with_title:
                    with conn.cursor() as cur:
                        # ordinal_position keeps the header aligned with the
                        # column order of `select *`.
                        cur.execute(
                            'select column_name from INFORMATION_SCHEMA.COLUMNS '
                            'where table_name = %s order by ordinal_position;',
                            (table_name,),
                        )
                        csvwriter.writerow([name for (name,) in cur])
                # content
                with conn.cursor('server-side-cursor') as cur:
                    cur.itersize = 100000  # how many records to buffer on the client
                    cur.execute(
                        sql.SQL('select * from {};').format(sql.Identifier(table_name))
                    )
                    csvwriter.writerows(cur)
        finally:
            conn.close()
def export_database_csv(file_path: str = None, with_title: bool = False):
    """Export every table returned by ``get_tables_databases`` as CSV.

    Args:
        file_path: Target directory (created if missing). Defaults to a
            directory named after the module-level ``dbname``.
        with_title: Forwarded to ``export_table_csv`` for each table.
    """
    target_dir = file_path or dbname
    os.makedirs(target_dir, exist_ok=True)
    for table_name in get_tables_databases():
        export_table_csv(table_name, target_dir, with_title)
def export_table_copy(table_name: str, file_path: str = None, with_title: bool = False):
    """Dump one table to ``<file_path>/<table_name>.copy`` using COPY.

    Fields are separated by ``|``.

    Args:
        table_name: Exact (unqualified) table name.
        file_path: Target directory (created if missing). Defaults to the
            current directory.
        with_title: When true, write a ``|``-joined line of column names
            before the data.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.copy')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, mode="w", encoding="UTF-8") as copyfile:
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # `with conn` only ends the transaction; the connection is closed
            # in the `finally` below.
            with conn, conn.cursor() as cur:
                # column names
                if with_title:
                    # Bound parameter instead of string concatenation;
                    # ordinal_position keeps the header aligned with the data.
                    cur.execute(
                        'select column_name from INFORMATION_SCHEMA.COLUMNS '
                        'where table_name = %s order by ordinal_position;',
                        (table_name,),
                    )
                    copyfile.write('|'.join(name for (name,) in cur) + '\n')
                # content
                cur.copy_to(copyfile, table_name, sep="|")
        finally:
            conn.close()
def export_database_copy(file_path: str = None, with_title: bool = False):
    """Export every table returned by ``get_tables_databases`` in COPY format.

    Args:
        file_path: Target directory (created if missing). Defaults to a
            directory named after the module-level ``dbname``.
        with_title: Forwarded to ``export_table_copy`` for each table.
    """
    target_dir = file_path or dbname
    os.makedirs(target_dir, exist_ok=True)
    for table_name in get_tables_databases():
        export_table_copy(table_name, target_dir, with_title)
def export_table_sql(table_name: str, file_path: str = None):
    """Dump one table to ``<file_path>/<table_name>.sql`` as SQL statements.

    The file holds one ``create table`` statement followed by one
    ``insert into ... values (...)`` statement per row, each on its own line.
    Textual column types are written as ``varchar``, ``money`` as
    ``double precision`` and ``bytea`` as ``BLOB``.

    Args:
        table_name: Exact (unqualified) table name; quoted as a single SQL
            identifier when queried.
        file_path: Target directory (created if missing). Defaults to the
            current directory.
    """
    from psycopg2 import sql

    file_name = os.path.join(file_path if file_path else '.', table_name + '.sql')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, mode="w", encoding="UTF-8") as sqlfile:
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # `with conn` only ends the transaction; the connection is closed
            # in the `finally` below.
            with conn:
                # table
                with conn.cursor() as cur:
                    # ordinal_position matters: the insert statements are
                    # positional, so the declared column order must match the
                    # order `select *` returns.
                    cur.execute(
                        'select column_name, data_type, character_maximum_length '
                        'from INFORMATION_SCHEMA.COLUMNS '
                        'where table_name = %s order by ordinal_position;',
                        (table_name,),
                    )
                    columns = []
                    for cname, ctype, clen in cur:
                        if ctype in {'character varying', 'varchar', 'character', 'char', 'text'}:
                            ctype = 'varchar'
                        elif ctype == 'money':
                            ctype = 'double precision'
                        elif ctype == 'bytea':
                            # NOTE(review): BLOB is not a PostgreSQL type;
                            # presumably the dump targets another engine.
                            ctype = 'BLOB'
                        clen = '(' + str(clen) + ')' if clen and ctype == 'varchar' else ''
                        columns.append(cname + ' ' + ctype + clen)
                sqlfile.write('create table ' + table_name + ' (' + ', '.join(columns) + ');\n')
                # rows
                with conn.cursor('server-side-cursor') as cur:
                    cur.itersize = 100000  # how many records to buffer on the client
                    cur.execute(
                        sql.SQL('select * from {};').format(sql.Identifier(table_name))
                    )
                    for row in cur:
                        values = ', '.join(_sql_literal(value) for value in row)
                        sqlfile.write('insert into ' + table_name + ' values (' + values + ');\n')
        finally:
            conn.close()


def _sql_literal(value):
    """Render one fetched Python value as a SQL literal."""
    import math

    if value is None:
        # SQL NULL arrives as Python None, not as the string 'null'.
        return 'NULL'
    if isinstance(value, bool):  # must precede the int check (bool is an int)
        return 'TRUE' if value else 'FALSE'
    if isinstance(value, int) or (isinstance(value, float) and math.isfinite(value)):
        return repr(value)
    if isinstance(value, (bytes, bytearray, memoryview)):
        # Hex-encoded binary string, PostgreSQL bytea style.
        return "'\\x" + bytes(value).hex() + "'"
    # Strings, decimals, dates, non-finite floats, ...: quoted text with
    # embedded single quotes doubled. Newlines are escaped so that each
    # statement stays on one line.
    text = str(value).replace("'", "''")
    if '\\' in text or '\n' in text or '\r' in text:
        text = text.replace('\\', '\\\\').replace('\n', '\\n').replace('\r', '\\r')
        return "E'" + text + "'"
    return "'" + text + "'"
def export_database_sql(file_path: str = None):
    """Export every table returned by ``get_tables_databases`` as SQL files.

    Args:
        file_path: Target directory (created if missing). Defaults to a
            directory named after the module-level ``dbname``.
    """
    target_dir = file_path or dbname
    os.makedirs(target_dir, exist_ok=True)
    for table_name in get_tables_databases():
        export_table_sql(table_name, target_dir)
def erase_database():
    """Drop every user table of the configured database, then VACUUM FULL.

    Tables are dropped with ``cascade`` in a single transaction. The vacuum
    runs afterwards on its own connection.
    """
    from psycopg2 import sql

    tables = get_tables_databases()
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; the connection is closed
        # in the `finally` below.
        with conn, conn.cursor() as cur:
            for table in tables:
                # Quote the name so mixed-case / reserved-word tables drop too.
                cur.execute(
                    sql.SQL('drop table if exists {} cascade;').format(sql.Identifier(table))
                )
    finally:
        conn.close()
    # The original called an undefined `full_vacuum()`; `vacuum(full=True)` is
    # the equivalent helper in this module. It runs only after the drops are
    # committed, so it does not wait on locks held by this function.
    vacuum(full=True)
def create_database(dbname: str):
    """Create a new database, connecting through the ``postgres`` database.

    Args:
        dbname: Name of the database to create. It is quoted as an SQL
            identifier, so the database is created with exactly this
            spelling.
    """
    from psycopg2 import sql

    conn = psycopg2.connect(dbname='postgres', user=user, host=host, password=password)
    try:
        # CREATE DATABASE cannot run inside a transaction block. Avoid
        # `with conn:` here: on psycopg2 >= 2.9 it opens a transaction even
        # on an autocommit connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(sql.SQL('create database {};').format(sql.Identifier(dbname)))
    finally:
        conn.close()
def delete_database(dbname: str):
    """Drop a database, connecting through the ``postgres`` database.

    Args:
        dbname: Name of the database to drop. It is quoted as an SQL
            identifier, so it must match the stored name exactly.
    """
    from psycopg2 import sql

    conn = psycopg2.connect(dbname='postgres', user=user, host=host, password=password)
    try:
        # DROP DATABASE cannot run inside a transaction block. Avoid
        # `with conn:` here: on psycopg2 >= 2.9 it opens a transaction even
        # on an autocommit connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(sql.SQL('drop database {};').format(sql.Identifier(dbname)))
    finally:
        conn.close()
def vacuum(full: bool = False, freeze: bool = False, verbose: bool = False, analyze: bool = False, table: str = None, column: str = None):
    """Run VACUUM on the configured database or on a single table.

    Args:
        full: Add the ``FULL`` option.
        freeze: Add the ``FREEZE`` option.
        verbose: Add the ``VERBOSE`` option.
        analyze: Add the ``ANALYZE`` option.
        table: Exact table name to vacuum (quoted as an identifier). When
            omitted, the statement has no target.
        column: Single column of ``table`` to analyze; ignored without
            ``table``. PostgreSQL only accepts a column list together with
            ``analyze=True``.
    """
    from psycopg2 import sql

    parts = [sql.SQL('vacuum')]
    if full:
        parts.append(sql.SQL('full'))
    if freeze:
        parts.append(sql.SQL('freeze'))
    if verbose:
        parts.append(sql.SQL('verbose'))
    if analyze:
        parts.append(sql.SQL('analyze'))
    if table:
        target = sql.Identifier(table)
        if column:
            # The column list must be parenthesised: table (column).
            target = sql.SQL('{} ({})').format(target, sql.Identifier(column))
        parts.append(target)
    statement = sql.SQL(' ').join(parts)

    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # VACUUM cannot run inside a transaction block. Avoid `with conn:`
        # here: on psycopg2 >= 2.9 it opens a transaction even on an
        # autocommit connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(statement)
    finally:
        conn.close()
def analyze(verbose: bool = False, table: str = None, column: str = None):
    """Run ANALYZE on the configured database or on a single table.

    Args:
        verbose: Add the ``VERBOSE`` option.
        table: Exact table name to analyze (quoted as an identifier). When
            omitted, the statement has no target.
        column: Single column of ``table`` to analyze; ignored without
            ``table``.
    """
    from psycopg2 import sql

    parts = [sql.SQL('analyze')]
    if verbose:
        parts.append(sql.SQL('verbose'))
    if table:
        target = sql.Identifier(table)
        if column:
            # The column list must be parenthesised: table (column).
            target = sql.SQL('{} ({})').format(target, sql.Identifier(column))
        parts.append(target)
    statement = sql.SQL(' ').join(parts)

    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # Autocommit, as in the original; `with conn:` is avoided because it
        # does not close the connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(statement)
    finally:
        conn.close()
def import_sql_file(file_name: str):
    """Execute a SQL file line by line inside a single transaction.

    Each non-blank line is sent to the server as one statement, so every
    statement must fit on a single line. If any statement fails, the whole
    import is rolled back.

    Args:
        file_name: Path of the UTF-8 encoded SQL file.
    """
    with open(file_name, mode="r", encoding="UTF-8") as sqlfile:
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # `with conn` commits on success / rolls back on error; the
            # connection itself is closed in the `finally` below.
            with conn, conn.cursor() as cur:
                for line in sqlfile:
                    statement = line.strip()
                    # psycopg2 rejects empty queries, so skip blank lines
                    # (e.g. a trailing newline at the end of the file).
                    if statement:
                        cur.execute(statement)
        finally:
            conn.close()
def execute_query(query: str):
    """Execute an arbitrary SQL statement against the configured database.

    Args:
        query: SQL text, sent to the server as-is.

    Returns:
        list | None: All result rows when the statement produces a result
        set (possibly an empty list), otherwise ``None``.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # `with conn` only ends the transaction; the connection is closed
        # in the `finally` below.
        with conn, conn.cursor() as cur:
            cur.execute(query)
            # `cur.rownumber` is 0 right after execute, so testing it never
            # returned rows; `description` is set only when the statement
            # produced a result set.
            if cur.description is not None:
                return cur.fetchall()
            return None
    finally:
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment