Created
July 18, 2013 01:43
-
-
Save openinx/6026089 to your computer and use it in GitHub Desktop.
A MySQL client terminal implemented in pure Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import pymysql | |
import prettytable | |
import itertools | |
import traceback | |
import ConfigParser | |
from nepo.common import utils | |
import nepo.web.constant as web_const | |
# Line editing at the prompt is optional: readline is not available on every
# platform, so only a failed import is tolerated here. Any other error is
# allowed to surface instead of being silently swallowed.
try:
    import readline
except ImportError:
    print('Cannot ``import readline`` package.')
else:
    readline.parse_and_bind('tab: complete')
    readline.parse_and_bind('set editing-mode emacs')
# Parser shared by every get_cfg() call; created on first use.
cfg_parser = None


def get_cfg(section, key):
    """Return the value stored under ``key`` in ``section`` of nepo.conf.

    The file is located with ``utils.fetch_cfgpath`` and parsed once; later
    calls reuse the cached parser, so neither the path lookup nor the parse
    is repeated.

    Lookup errors raised by ``ConfigParser.get`` (missing section or option)
    propagate to the caller.
    """
    global cfg_parser
    if cfg_parser is None:
        path = utils.fetch_cfgpath('nepo.conf', locpath=__file__, count=2)
        parser = ConfigParser.ConfigParser()
        parser.read(path)
        # Publish the parser only after it has been populated, so a failure
        # above leaves the cache empty and the next call retries.
        cfg_parser = parser
    return cfg_parser.get(section, key)
class cfg:
    """Connection settings for the two known databases, resolved at import.

    ``mysql_*`` values are read from the ``[mysql]`` section through
    ``get_cfg``; ``webdb_*`` values are copied from
    ``nepo.web.constant.DBCONFIG``.
    """

    # Database described by the [mysql] section of the config file.
    mysql_host = get_cfg('mysql', 'host')
    mysql_port = int(get_cfg('mysql', 'port'))
    mysql_user = get_cfg('mysql', 'user')
    mysql_password = get_cfg('mysql', 'password')
    mysql_database = get_cfg('mysql', 'db_name')

    # Database described by the web constants module.
    webdb_host = web_const.DBCONFIG.CON_DB_HOST
    webdb_port = web_const.DBCONFIG.CON_DB_PORT
    webdb_user = web_const.DBCONFIG.CON_DB_USER
    webdb_password = web_const.DBCONFIG.CON_DB_PASSWD
    webdb_database = web_const.DBCONFIG.CON_DB
# Connection keyword arguments for each selectable database, keyed by database
# name. connect_db() adds the ``db`` argument itself. If both names are equal,
# the later (web) entry replaces the earlier one.
databases = {
    cfg.mysql_database: {
        'host': cfg.mysql_host,
        'port': cfg.mysql_port,
        'user': cfg.mysql_user,
        'passwd': cfg.mysql_password,
        'charset': 'utf8',
    },
    cfg.webdb_database: {
        'host': cfg.webdb_host,
        'port': cfg.webdb_port,
        'user': cfg.webdb_user,
        'passwd': cfg.webdb_password,
        'charset': 'utf8',
    },
}

# Database used until a ``use <name>`` statement selects another one. It is
# named explicitly rather than taken from the first dict key, because dict
# ordering is arbitrary and would make the startup database unpredictable.
CURRENT_DATABASE = cfg.mysql_database
class sql_type:
    """Classify a SQL statement by its leading keyword.

    The type names double as the names of the ``Method`` handlers that the
    prompt loop dispatches to.
    """

    QUERY = 'query'
    INSERT = 'insert'
    UPDATE = 'update'
    DELETE = 'delete'
    CHANGE_DB = 'change_db'
    UNKNOWN = 'unknown'

    # Statement type -> leading keywords (lower case) that select it.
    _map = {
        'query': ['select', 'desc', 'show'],
        'insert': ['insert'],
        'update': ['update'],
        'delete': ['delete'],
        'change_db': ['use'],
    }

    @staticmethod
    def decide(sql):
        """Return the statement type of ``sql``, or ``UNKNOWN``.

        Only the first whitespace-delimited word is examined, ignoring case.
        Leading whitespace and any whitespace separator (space, tab, newline)
        are accepted; empty or blank input yields ``UNKNOWN``.
        """
        words = sql.split(None, 1)
        if not words:
            return sql_type.UNKNOWN
        head = words[0].lower()
        for typ, keywords in sql_type._map.items():
            if head in keywords:
                return typ
        return sql_type.UNKNOWN
def connect_db():
    """Open and return a new pymysql connection to ``CURRENT_DATABASE``.

    The keyword arguments are the entry of ``databases`` for the current
    database, plus ``db`` set to that database's name. A current database
    with no entry in ``databases`` raises KeyError.
    """
    params = {'db': CURRENT_DATABASE}
    # Per-database settings are layered on top, so they win on any key clash.
    params.update(databases[CURRENT_DATABASE])
    return pymysql.Connect(**params)
def print_list(objs, fields, sortby_index=None):
    """Print ``objs`` as a left-aligned text table with one column per field.

    objs: iterable of rows. Dict-based rows (such as ``Row``) are read by
        key, any other object by attribute; a missing field becomes None.
    fields: sequence of column names, used as the table header.
    sortby_index: optional index into ``fields`` of the column to sort by.
        When None, no sort column is passed to the table.
    """
    sortby = None if sortby_index is None else fields[sortby_index]
    table = prettytable.PrettyTable(list(fields), caching=False)
    table.align = 'l'
    for obj in objs:
        if isinstance(obj, dict):
            # Key lookup, not getattr: a column named like a dict method
            # ("keys", "items", "update", ...) would otherwise resolve to
            # the bound method instead of the stored value.
            row = [obj.get(field) for field in fields]
        else:
            row = [getattr(obj, field, None) for field in fields]
        table.add_row(row)
    print(table.get_string(sortby=sortby))
def print_dict(d, dict_property="Property"):
    """Print mapping ``d`` as a two-column table sorted by its key column.

    d: mapping whose items become the table rows.
    dict_property: header of the key column; the value column is headed
        ``Value``.
    """
    table = prettytable.PrettyTable([dict_property, 'Value'], caching=False)
    table.align = 'l'
    # A plain loop: the rows are added for their side effect only.
    for key, value in d.items():
        table.add_row([key, value])
    print(table.get_string(sortby=dict_property))
class Row(dict):
    """Dictionary whose keys can also be read as attributes (``row.name``)."""

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails, so real dict
        # attributes and methods still take precedence over keys.
        if name in self:
            return self[name]
        raise AttributeError(name)
class Method:
    """Statement handlers, one per ``sql_type`` value.

    The prompt loop fetches a handler by statement-type name and calls it as
    ``handler(sql, is_row)``, so every public handler accepts both arguments
    even when it ignores ``is_row``.
    """

    def _release(self, conn, cursor):
        """Close whichever of ``cursor`` / ``conn`` was actually opened."""
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()

    def query(self, sql, is_row=True):
        """Run a result-returning statement and print its rows.

        sql: the statement text.
        is_row: True prints all rows as one table; False prints each row as
            its own property/value table under a ``Row N`` banner.

        Any failure, including a failure to connect, is reported as a
        traceback instead of being raised, so the prompt keeps running.
        """
        conn = cursor = None
        try:
            conn = connect_db()
            cursor = conn.cursor()
            start_t = time.time()
            cursor.execute(sql)
            column_names = [d[0] for d in cursor.description]
            rows = [Row(zip(column_names, row)) for row in cursor]
            if is_row:
                print_list(rows, column_names)
            else:
                stars = '*' * 35
                for index, row in enumerate(rows, 1):
                    print('%s Row %d %s' % (stars, index, stars))
                    print_dict(row)
            end_t = time.time()
            print('%s row in set (%0.2f sec)' % (len(rows), end_t - start_t))
        except Exception:
            # Exception, not a bare except: Ctrl-C and SystemExit must still
            # be able to interrupt a long-running statement.
            traceback.print_exc()
        finally:
            self._release(conn, cursor)

    def _execute(self, sql):
        """Run a data-modifying statement, commit it and report the row count.

        Success is reported only after the commit has gone through, and the
        reported time includes the commit. Failures are printed as a
        traceback instead of being raised.
        """
        conn = cursor = None
        try:
            conn = connect_db()
            cursor = conn.cursor()
            start_t = time.time()
            rows = cursor.execute(sql)
            conn.commit()
            end_t = time.time()
            print('Query OK, %d rows affected (%0.2f sec)' % (
                rows, end_t - start_t))
        except Exception:
            traceback.print_exc()
        finally:
            self._release(conn, cursor)

    def delete(self, sql, is_row=True):
        """Handle a DELETE statement."""
        self._execute(sql)

    def update(self, sql, is_row=True):
        """Handle an UPDATE statement."""
        self._execute(sql)

    def insert(self, sql, is_row=True):
        """Handle an INSERT statement."""
        self._execute(sql)

    def unknown(self, sql, is_row=True):
        """Handle a statement whose type was not recognised: ignore it."""
        print('Unknown SQL sentences, Ignore.')

    def change_db(self, sql, is_row=True):
        """Handle ``use <name>``: select the database used by later statements.

        The name is the last word of the statement, with surrounding
        backticks removed. Only names present in ``databases`` are accepted,
        because connection settings exist for no others; anything else
        leaves the current database unchanged.
        """
        global CURRENT_DATABASE
        words = sql.split()
        name = words[-1].strip('`') if len(words) > 1 else ''
        if name not in databases:
            print("Unknown database '%s'" % name)
            return
        CURRENT_DATABASE = name
        print('Database changed')
def _G(sql): | |
sql = sql.strip() | |
if sql[-1] == ';': | |
sql = sql[:-1] | |
sql = sql.strip() | |
if len(sql) >= 2: | |
tail = sql[-2:] | |
if tail == '\G' or tail == '\g': | |
return sql[:-2], False | |
return sql, True | |
if __name__ == '__main__':
    # Interactive prompt: read one statement per line, classify it and hand
    # it to the Method handler named after its statement type.
    method_instance = Method()
    while True:
        try:
            sql = raw_input('pysql> ').strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt: finish the line and exit cleanly
            # instead of dying with a traceback.
            print('')
            break
        if not sql:
            # Nothing typed; show the prompt again.
            continue
        stype = sql_type.decide(sql)
        method = getattr(method_instance, stype)
        sql, is_row = _G(sql)
        method(sql, is_row)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment