Last active
December 22, 2020 09:21
-
-
Save toast254/8b8a1f162299e56c1b1b05facb4f7608 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import csv
import os

import psycopg2
from psycopg2 import sql
# Connection details.
# Each value can be overridden through the matching libpq-style environment
# variable; the literals are only fallbacks, so behaviour is unchanged when no
# variable is set.
# NOTE(review): a real password should not be committed to source control --
# set PGPASSWORD in the environment instead of editing the fallback below.
dbname = os.environ.get('PGDATABASE', 'sampledb')
user = os.environ.get('PGUSER', 'postgres')
host = os.environ.get('PGHOST', '127.0.0.1')
password = os.environ.get('PGPASSWORD', '123456789')
def get_tables_databases() -> list:
    """Return the names of all user tables in the configured database.

    Only ``BASE TABLE`` entries whose schema is neither ``pg_catalog`` nor
    ``information_schema`` are listed.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only commits/rolls back the transaction; the
        # connection itself is closed in ``finally`` so it is not leaked.
        with conn, conn.cursor() as cur:
            cur.execute(
                "select table_name from INFORMATION_SCHEMA.TABLES "
                "where table_type='BASE TABLE' "
                "and table_schema not in ('pg_catalog', 'information_schema');"
            )
            return [name for (name,) in cur]
    finally:
        conn.close()
def get_column_table(table_name: str) -> list:
    """Return the column names of ``table_name`` in table-definition order.

    The lookup matches on ``table_name`` only, so same-named tables in
    different schemas contribute their columns too.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only ends the transaction; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            # The name is bound as a parameter instead of being spliced into
            # the statement, and the order is made deterministic.
            cur.execute(
                'select column_name from INFORMATION_SCHEMA.COLUMNS '
                'where table_name = %s order by ordinal_position;',
                (table_name,),
            )
            return [name for (name,) in cur]
    finally:
        conn.close()
def get_column_details_table(table_name: str) -> list:
    """Return ``(column_name, data_type, character_maximum_length)`` rows.

    One tuple is returned per column of ``table_name``, in table-definition
    order. The lookup matches on ``table_name`` only (no schema filter).
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only ends the transaction; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            # Bind the table name as a parameter rather than concatenating it.
            cur.execute(
                'select column_name, data_type, character_maximum_length '
                'from INFORMATION_SCHEMA.COLUMNS '
                'where table_name = %s order by ordinal_position;',
                (table_name,),
            )
            return cur.fetchall()
    finally:
        conn.close()
def get_length_table(table_name: str) -> int:
    """Return the exact row count of ``table_name`` (``select count(*)``).

    ``table_name`` is treated as a single, case-sensitive identifier.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only ends the transaction; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            # sql.Identifier quotes the name and escapes embedded double
            # quotes, which the previous hand-written '"' + name + '"' did not.
            cur.execute(
                sql.SQL('select count(*) from {};').format(sql.Identifier(table_name))
            )
            return cur.fetchone()[0]
    finally:
        conn.close()
def get_aprox_length_table(table_name: str):
    """Return the row-count estimate stored in ``pg_class.reltuples``.

    The first ``pg_class`` row whose ``relname`` equals ``table_name`` is used.
    Raises ``TypeError`` when no such relation exists, because ``fetchone()``
    then returns ``None``.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only ends the transaction; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            # Bind the relation name as a parameter rather than concatenating it.
            cur.execute(
                'select reltuples as estimate from pg_class where relname = %s;',
                (table_name,),
            )
            return cur.fetchone()[0]
    finally:
        conn.close()
def get_size_database() -> str:
    """Return the size of the configured database as formatted by
    ``pg_size_pretty``.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only ends the transaction; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            # The database name is bound as a parameter so names containing
            # quotes cannot break the statement.
            cur.execute(
                'select pg_size_pretty(pg_database_size(pg_database.datname)) as size '
                'from pg_database where pg_database.datname = %s;',
                (dbname,),
            )
            return cur.fetchone()[0]
    finally:
        conn.close()
def get_rows_table(table_name: str,) -> list:
    """Return every row of ``table_name`` as a list of tuples.

    The whole result is loaded into memory. ``table_name`` is treated as a
    single, case-sensitive identifier (the same convention get_length_table
    uses), so names returned by the catalog work even when they are
    mixed-case or reserved words.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` only ends the transaction; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            cur.execute(sql.SQL('select * from {};').format(sql.Identifier(table_name)))
            return cur.fetchall()
    finally:
        conn.close()
def export_table_csv(table_name: str, file_path: str = None, with_title: bool = False):
    """Dump ``table_name`` to ``<file_path>/<table_name>.csv``.

    Fields are separated by a single space and quoted with ``|`` when needed.

    :param table_name: table to export; treated as one case-sensitive identifier.
    :param file_path: target directory (created if missing); defaults to ``'.'``.
    :param with_title: when true, write the column names as the first row.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.csv')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    # newline='' hands line-ending control to the csv module, as its
    # documentation requires; without it rows end in \r\r\n on Windows.
    with open(file_name, mode="w", encoding="UTF-8", newline='') as csvfile:
        csvwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # ``with conn`` only ends the transaction; ``finally`` closes the
            # connection so it is not leaked.
            with conn:
                # column names
                if with_title:
                    with conn.cursor() as cur:
                        # ordinal_position keeps the header aligned with the
                        # column order ``select *`` produces below.
                        cur.execute(
                            'select column_name from INFORMATION_SCHEMA.COLUMNS '
                            'where table_name = %s order by ordinal_position;',
                            (table_name,),
                        )
                        csvwriter.writerow([name for (name,) in cur])
                # content, streamed through a named (server-side) cursor
                with conn.cursor('server-side-cursor') as cur:
                    cur.itersize = 100000  # how many records to buffer on the client
                    cur.execute(
                        sql.SQL('select * from {};').format(sql.Identifier(table_name))
                    )
                    csvwriter.writerows(cur)
        finally:
            conn.close()
def export_database_csv(file_path: str = None, with_title: bool = False):
    """Export every table reported by get_tables_databases via export_table_csv.

    Files go into ``file_path``; when it is empty or omitted, a directory
    named after the configured database is used. The directory is created if
    it does not exist. ``with_title`` is forwarded unchanged.
    """
    target_dir = file_path or dbname
    os.makedirs(target_dir, exist_ok=True)
    for table_name in get_tables_databases():
        export_table_csv(table_name, target_dir, with_title)
def export_table_copy(table_name: str, file_path: str = None, with_title: bool = False):
    """Dump ``table_name`` to ``<file_path>/<table_name>.copy`` using COPY.

    Columns are separated by ``|``.

    :param table_name: table to export.
    :param file_path: target directory (created if missing); defaults to ``'.'``.
    :param with_title: when true, write a ``|``-joined line of column names first.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.copy')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, mode="w", encoding="UTF-8") as copyfile:
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # ``with conn`` only ends the transaction; ``finally`` closes the
            # connection so it is not leaked.
            with conn, conn.cursor() as cur:
                # column names
                if with_title:
                    # ordinal_position keeps the header aligned with the
                    # column order COPY emits.
                    cur.execute(
                        'select column_name from INFORMATION_SCHEMA.COLUMNS '
                        'where table_name = %s order by ordinal_position;',
                        (table_name,),
                    )
                    copyfile.write('|'.join(name for (name,) in cur) + '\n')
                # content
                cur.copy_to(copyfile, table_name, sep="|")
        finally:
            conn.close()
def export_database_copy(file_path: str = None, with_title: bool = False):
    """Export every table reported by get_tables_databases via export_table_copy.

    Files go into ``file_path``; when it is empty or omitted, a directory
    named after the configured database is used. The directory is created if
    it does not exist. ``with_title`` is forwarded unchanged.
    """
    target_dir = file_path or dbname
    os.makedirs(target_dir, exist_ok=True)
    for table_name in get_tables_databases():
        export_table_copy(table_name, target_dir, with_title)
def _sql_literal(value):
    """Render one Python value from a result row as a single-line SQL literal."""
    if value is None:
        return 'NULL'
    if isinstance(value, (bool, int, float)):
        return repr(value)
    # Everything else (str, dates, Decimal, ...) becomes a quoted string.
    # NOTE(review): binary (bytea) values are not rendered meaningfully here,
    # as was already the case before -- confirm whether they need support.
    text = str(value).replace("'", "''")
    if '\\' in text or '\n' in text or '\r' in text:
        # import_sql_file executes the dump line by line, so a literal must
        # never span lines: use a PostgreSQL escape string (E'...') instead.
        text = text.replace('\\', '\\\\').replace('\n', '\\n').replace('\r', '\\r')
        return "E'" + text + "'"
    return "'" + text + "'"
def export_table_sql(table_name: str, file_path: str = None):
    """Dump ``table_name`` to ``<file_path>/<table_name>.sql``.

    The file holds one ``create table`` statement followed by one
    ``insert into`` statement per row, each on its own line.

    :param table_name: table to export; treated as one case-sensitive
        identifier when it is read.
    :param file_path: target directory (created if missing); defaults to ``'.'``.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.sql')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, mode="w", encoding="UTF-8") as sqlfile:
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # ``with conn`` only ends the transaction; ``finally`` closes the
            # connection so it is not leaked.
            with conn:
                # table
                with conn.cursor() as cur:
                    # ordinal_position keeps the column list aligned with the
                    # value order ``select *`` produces below.
                    cur.execute(
                        'select column_name, data_type, character_maximum_length '
                        'from INFORMATION_SCHEMA.COLUMNS '
                        'where table_name = %s order by ordinal_position;',
                        (table_name,),
                    )
                    columns = []
                    for cname, ctype, clen in cur:
                        if ctype in {'character varying', 'varchar', 'character', 'char', 'text'}:
                            ctype = 'varchar'
                        elif ctype == 'money':
                            ctype = 'double precision'
                        elif ctype == 'bytea':
                            ctype = 'BLOB'
                        # only varchar columns with a known length get "(n)"
                        clen = '(' + str(clen) + ')' if clen and ctype == 'varchar' else ''
                        columns.append(cname + ' ' + ctype + clen)
                sqlfile.write('create table ' + table_name + ' (' + ', '.join(columns) + ');\n')
                # rows, streamed through a named (server-side) cursor
                with conn.cursor('server-side-cursor') as cur:
                    cur.itersize = 100000  # how many records to buffer on the client
                    cur.execute(
                        sql.SQL('select * from {};').format(sql.Identifier(table_name))
                    )
                    for row in cur:
                        values = ', '.join(_sql_literal(value) for value in row)
                        sqlfile.write('insert into ' + table_name + ' values (' + values + ');\n')
        finally:
            conn.close()
def export_database_sql(file_path: str = None):
    """Export every table reported by get_tables_databases via export_table_sql.

    Files go into ``file_path``; when it is empty or omitted, a directory
    named after the configured database is used. The directory is created if
    it does not exist.
    """
    target_dir = file_path or dbname
    os.makedirs(target_dir, exist_ok=True)
    for table_name in get_tables_databases():
        export_table_sql(table_name, target_dir)
def erase_database():
    """Drop every table reported by get_tables_databases, then ``vacuum full``.

    Tables are dropped with ``cascade`` inside one transaction.
    """
    tables = get_tables_databases()
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` commits the drops on success; ``finally`` closes the
        # connection so it is not leaked.
        with conn, conn.cursor() as cur:
            for table in tables:
                # Quote the name so mixed-case or reserved-word tables drop too.
                cur.execute(
                    sql.SQL('drop table if exists {} cascade;').format(sql.Identifier(table))
                )
    finally:
        conn.close()
    # The original called full_vacuum(), which is not defined in this module;
    # vacuum(full=True) is the existing equivalent. It runs after the drops
    # are committed because it uses its own connection.
    vacuum(full=True)
def create_database(dbname: str):
    """Create database ``dbname`` while connected to the ``postgres`` database.

    ``dbname`` is quoted as an identifier, so it is created with exactly the
    given spelling and case.
    """
    conn = psycopg2.connect(dbname='postgres', user=user, host=host, password=password)
    try:
        # CREATE DATABASE cannot run inside a transaction block. A
        # ``with conn`` block must be avoided here: since psycopg2 2.9 it
        # opens a transaction even on an autocommit connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(sql.SQL('create database {};').format(sql.Identifier(dbname)))
    finally:
        conn.close()
def delete_database(dbname: str):
    """Drop database ``dbname`` while connected to the ``postgres`` database.

    ``dbname`` is quoted as an identifier, so it must match the stored name
    exactly, including case.
    """
    conn = psycopg2.connect(dbname='postgres', user=user, host=host, password=password)
    try:
        # DROP DATABASE cannot run inside a transaction block. A
        # ``with conn`` block must be avoided here: since psycopg2 2.9 it
        # opens a transaction even on an autocommit connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(sql.SQL('drop database {};').format(sql.Identifier(dbname)))
    finally:
        conn.close()
def vacuum(full: bool = False, freeze: bool = False, verbose: bool = False, analyze: bool = False, table: str = None, column: str = None):
    """Run ``VACUUM`` on the configured database.

    :param full: add the ``full`` keyword.
    :param freeze: add the ``freeze`` keyword.
    :param verbose: add the ``verbose`` keyword.
    :param analyze: add the ``analyze`` keyword.
    :param table: restrict the command to this table (quoted as an
        identifier); the whole database is processed when omitted.
    :param column: restrict analysis to this column of ``table``; ignored
        when ``table`` is not given. PostgreSQL accepts a column list only
        together with ``analyze``.
    """
    parts = [sql.SQL('vacuum')]
    for enabled, keyword in ((full, 'full'), (freeze, 'freeze'),
                             (verbose, 'verbose'), (analyze, 'analyze')):
        if enabled:
            parts.append(sql.SQL(keyword))
    if table:
        target = sql.Identifier(table)
        if column:
            # The column list must be parenthesised: ``table (column)``.
            target = sql.SQL('{} ({})').format(target, sql.Identifier(column))
        parts.append(target)
    statement = sql.SQL(' ').join(parts) + sql.SQL(';')
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # VACUUM cannot run inside a transaction block. A ``with conn`` block
        # must be avoided here: since psycopg2 2.9 it opens a transaction
        # even on an autocommit connection.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(statement)
    finally:
        conn.close()
def analyze(verbose: bool = False, table: str = None, column: str = None):
    """Run ``ANALYZE`` on the configured database.

    :param verbose: add the ``verbose`` keyword.
    :param table: restrict the command to this table (quoted as an
        identifier); the whole database is analysed when omitted.
    :param column: restrict analysis to this column of ``table``; ignored
        when ``table`` is not given.
    """
    parts = [sql.SQL('analyze')]
    if verbose:
        parts.append(sql.SQL('verbose'))
    if table:
        target = sql.Identifier(table)
        if column:
            # The column list must be parenthesised: ``table (column)``.
            target = sql.SQL('{} ({})').format(target, sql.Identifier(column))
        parts.append(target)
    statement = sql.SQL(' ').join(parts) + sql.SQL(';')
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # Autocommit is kept from the original; no ``with conn`` block is
        # used because it would wrap the statement in a transaction, and the
        # connection is closed explicitly so it is not leaked.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(statement)
    finally:
        conn.close()
def import_sql_file(file_name: str):
    """Execute the SQL in ``file_name`` line by line in one transaction.

    Every non-blank line is sent as its own statement, so each statement in
    the file must fit on a single line. Any error rolls the whole import back.
    """
    with open(file_name, mode="r", encoding="UTF-8") as sqlfile:
        conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        try:
            # ``with conn`` commits on success / rolls back on error;
            # ``finally`` closes the connection so it is not leaked.
            with conn, conn.cursor() as cur:
                for line in sqlfile:
                    if not line.strip():
                        # psycopg2 raises ProgrammingError on an empty query
                        continue
                    cur.execute(line)
        finally:
            conn.close()
def execute_query(query: str):
    """Execute ``query`` and return its rows, if it produced any result set.

    :returns: a list of row tuples for statements that return rows
        (possibly empty), otherwise ``None``.
    """
    conn = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
    try:
        # ``with conn`` commits on success / rolls back on error;
        # ``finally`` closes the connection so it is not leaked.
        with conn, conn.cursor() as cur:
            cur.execute(query)
            # ``description`` is None when the statement returned no result
            # set. The previous ``cur.rownumber`` test was always 0 (falsy)
            # right after execute, so rows were never returned.
            if cur.description is not None:
                return cur.fetchall()
            return None
    finally:
        conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment