Last active
November 19, 2016 18:54
-
-
Save mitya57/1cb49c876d094ca8be650187a3d1fdd4 to your computer and use it in GitHub Desktop.
My Oracle Shell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Alternative shell for Oracle database. | |
# Alpha version, do not use for production purposes. | |
# | |
# Author: 2016 Dmitry Shachnev <[email protected]> | |
import readline | |
import shutil | |
import sys | |
import cx_Oracle | |
def resize(string, length, fill_character=' '): | |
if length < len(string): | |
return string[:length] | |
return string + fill_character * (length - len(string)) | |
def format_headers(headers, lines): | |
columns = shutil.get_terminal_size().columns | |
header_names = list(zip(*headers))[0] | |
lines_stringified = [list(map(str, line)) for line in lines] | |
max_lengths = [max(map(len, column)) for column in zip(*[header_names] + lines_stringified)] | |
max_possible_sum = columns - 3 * (len(max_lengths) - 1) | |
lengths_sum = sum(max_lengths) | |
if lengths_sum > max_possible_sum: | |
max_lengths = [length * max_possible_sum // lengths_sum for length in max_lengths] | |
while sum(max_lengths) < max_possible_sum: | |
max_diff = 0 | |
index = None | |
for i, column in enumerate(zip(*lines_stringified)): | |
diff = max(map(len, column)) - max_lengths[i] | |
if diff > max_diff: | |
max_diff, index = diff, i | |
if index is not None: | |
max_lengths[index] += 1 | |
new_headers = (resize(*pair) for pair in zip(header_names, max_lengths)) | |
print(' | '.join(new_headers)) | |
print('-+-'.join('-' * length for length in max_lengths)) | |
for line in lines_stringified: | |
new_items = (resize(*pair) for pair in zip(line, max_lengths)) | |
print(' | '.join(new_items)) | |
def execute_query(query, query_args, cursor): | |
cursor.execute(query, query_args) | |
format_headers(cursor.description, cursor) | |
def process_query(line, cursor): | |
query = line | |
query_args = {} | |
if line.lower().startswith('desc '): | |
query = "select COLUMN_NAME, DATA_TYPE, NULLABLE from COLS where TABLE_NAME = :table_name" | |
query_args["table_name"] = line[5:].upper() | |
try: | |
execute_query(query, query_args, cursor) | |
except cx_Oracle.DatabaseError as exc: | |
error, = exc.args | |
print(error.message, file=sys.stderr) | |
def main(argv): | |
if len(argv) < 2: | |
sys.exit("usage: %s <logon string>" % sys.argv[0]) | |
try: | |
connection = cx_Oracle.Connection(argv[1]) | |
except cx_Oracle.DatabaseError as exc: | |
error, = exc.args | |
sys.exit(error.message) | |
cursor = connection.cursor() | |
while True: | |
try: | |
line = input('SQL> ') | |
except EOFError: | |
print() | |
break | |
process_query(line, cursor) | |
if __name__ == "__main__": | |
main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment