-
-
Save jaf7/74ddf846ddd29c2c5a4876b86ac6075b to your computer and use it in GitHub Desktop.
MySQL wrapper in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pymysql | |
import os.path | |
import sys | |
from configparser import ConfigParser | |
from pymysql import (escape_sequence, cursors) | |
# Put this module's own directory on the import path.
sys.path.append(os.path.dirname(__file__))

# Settings are read from <parent of this module's directory>/config/config.ini.
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
file_path = os.path.abspath(os.path.join(base_path, 'config', 'config.ini'))

# NOTE: ConfigParser.read() skips files it cannot open instead of raising, so
# a missing config.ini only surfaces later, on the first config.get() call.
config = ConfigParser()
config.read(file_path)

# Public API of this module.
__all__ = ['MysqlConnection']
class MysqlConnection(object):
    """Small convenience wrapper around a single PyMySQL connection.

    The connection is opened in ``__init__`` from the ``[mysql]`` section of
    the module-level ``config`` and uses ``DictCursor``.

    Values supplied through dicts are always sent as bound ``%s`` parameters.
    Table names, string ``condition`` / ``fields`` arguments, ``order`` and
    ``limit`` are placed into the SQL text verbatim, so they must come from
    trusted code, never from user input.
    """

    # Not used anywhere inside this class; kept so existing external
    # references to the attribute keep working.
    _instance = None

    def __init__(self):
        """Open the database connection immediately."""
        self.create_connection()

    def create_connection(self):
        """(Re)create ``self.connection`` from the ``[mysql]`` config section.

        Reads ``host``, ``user``, ``pass`` and ``database``; ``port`` is
        optional and falls back to 3306.
        """
        self.connection = pymysql.connect(
            host=config.get('mysql', 'host'),
            user=config.get('mysql', 'user'),
            # `password` / `database` replace the deprecated `passwd` / `db`
            # aliases.
            password=config.get('mysql', 'pass'),
            database=config.get('mysql', 'database'),
            port=config.getint('mysql', 'port', fallback=3306),
            charset='utf8',
            cursorclass=cursors.DictCursor,
        )

    def get_conn(self):
        """Return the underlying connection after checking it is alive."""
        self.ping_connect()
        return self.connection

    def ping_connect(self):
        """Ping the server and open a fresh connection if the ping fails."""
        try:
            self.connection.ping()
        except Exception:
            # `Exception` rather than a bare `except:` so KeyboardInterrupt
            # and SystemExit are not swallowed by the reconnect logic.
            self.create_connection()

    @staticmethod
    def _quote_identifier(name):
        """Wrap ``name`` in backticks, doubling any backtick it contains."""
        return '`{}`'.format(str(name).replace('`', '``'))

    def _build_where(self, condition):
        """Turn ``condition`` into a ``(where_sql, bound_values)`` pair.

        A falsy condition matches every row (``'1'``), a dict becomes
        ``col = %s`` terms joined by ``AND``, and anything else is used
        verbatim as the WHERE text with no bound values.
        """
        if not condition:
            return '1', []
        if isinstance(condition, dict):
            where = self.join_field_value(condition, ' AND ')
            return where, list(condition.values())
        return condition, []

    def insert(self, table, data):
        """Insert one row with ``INSERT IGNORE`` and commit.

        Args:
            table: Table name (inserted verbatim into the SQL).
            data: Dict mapping column name to value.

        Returns:
            Whatever ``connection.insert_id()`` reports after the statement.
        """
        sql = "INSERT IGNORE INTO {table} SET {params}".format(
            table=table, params=self.join_field_value(data))
        with self.connection.cursor() as cursor:
            cursor.execute(sql, tuple(data.values()))
            last_id = self.connection.insert_id()
        self.connection.commit()
        return last_id

    def bulk_insert(self, table, data):
        """Insert several rows with one ``INSERT IGNORE`` statement and commit.

        Args:
            table: Table name (inserted verbatim into the SQL).
            data: Non-empty list of dicts that all share the same keys. The
                column order is taken from the first row; the key order of
                the other rows does not matter.

        Returns:
            Whatever ``connection.insert_id()`` reports after the statement.

        Raises:
            AssertionError: ``data`` is not a non-empty list.
            ValueError: A row's keys differ from those of the first row.
        """
        # Raised explicitly (instead of `assert`) so the check also runs
        # under `python -O`; the exception type is unchanged for callers.
        if not (isinstance(data, list) and data):
            raise AssertionError("data format is error")

        columns = list(data[0])
        flat_values = []
        for row in data:
            if len(row) != len(columns) or any(col not in row for col in columns):
                raise ValueError(
                    "every row in data must have the same keys as the first row")
            # Read values by column name so that every row lines up with the
            # column list regardless of its own key order.
            flat_values.extend(row[col] for col in columns)

        fields = ', '.join(self._quote_identifier(col) for col in columns)
        row_placeholder = '({})'.format(', '.join(['%s'] * len(columns)))
        values = ', '.join([row_placeholder] * len(data))
        sql = "INSERT IGNORE INTO {table} ({fields}) VALUES {values}".format(
            table=table, fields=fields, values=values)

        with self.connection.cursor() as cursor:
            cursor.execute(sql, tuple(flat_values))
            last_id = self.connection.insert_id()
        self.connection.commit()
        return last_id

    def update(self, table, data, condition=None):
        """Run ``UPDATE IGNORE`` on ``table`` and commit.

        Args:
            table: Table name (inserted verbatim into the SQL).
            data: Dict mapping column name to new value.
            condition: ``None`` (all rows), a dict of equality tests joined
                with ``AND``, or a raw WHERE string used verbatim.

        Returns:
            The return value of ``cursor.execute``.
        """
        where, where_values = self._build_where(condition)
        prepared = list(data.values()) + where_values
        sql = "UPDATE IGNORE {table} SET {params} WHERE {where}".format(
            table=table, params=self.join_field_value(data), where=where)
        with self.connection.cursor() as cursor:
            # Only pass arguments when there are some to bind.
            result = cursor.execute(sql, tuple(prepared) if prepared else None)
        self.connection.commit()
        return result

    def count(self, table, condition=None):
        """Return ``COUNT(*)`` for the rows of ``table`` matching ``condition``.

        ``condition`` accepts the same forms as in :meth:`update`.
        """
        where, prepared = self._build_where(condition)
        sql = "SELECT COUNT(*) as cnt FROM {table} WHERE {where}".format(
            table=table, where=where)
        with self.connection.cursor() as cursor:
            cursor.execute(sql, tuple(prepared) if prepared else None)
            # NOTE(review): committing after a read presumably ends the open
            # transaction so later reads see fresh data -- confirm.
            self.connection.commit()
            return cursor.fetchone().get('cnt')

    def fetch_rows(self, table, fields=None, condition=None, order=None, limit=None, fetchone=False):
        """Run a ``SELECT`` on ``table`` and return the result.

        Args:
            table: Table name (inserted verbatim into the SQL).
            fields: ``None`` for ``*``, a list/tuple of column names (each is
                backtick-quoted), or a raw string used verbatim.
            condition: Same forms as in :meth:`update`.
            order: Optional text placed after ``ORDER BY``.
            limit: Optional text or number placed after ``LIMIT``; falsy
                values (including ``0``) mean no LIMIT clause.
            fetchone: Return ``cursor.fetchone()`` instead of
                ``cursor.fetchall()``.
        """
        if not fields:
            fields = '*'
        elif isinstance(fields, (tuple, list)):
            fields = ', '.join(self._quote_identifier(name) for name in fields)

        where, prepared = self._build_where(condition)
        clauses = ["SELECT {fields} FROM {table} WHERE {where}".format(
            fields=fields, table=table, where=where)]
        if order:
            clauses.append('ORDER BY {order}'.format(order=order))
        if limit:
            clauses.append("LIMIT {limit}".format(limit=limit))
        sql = ' '.join(clauses)

        with self.connection.cursor() as cursor:
            cursor.execute(sql, tuple(prepared) if prepared else None)
            # NOTE(review): see count() -- commit after read kept as-is.
            self.connection.commit()
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def query(self, sql, fetchone=False, execute=False, args=None):
        """Execute a custom SQL statement and commit.

        Args:
            sql: SQL text, optionally containing ``%s`` placeholders.
            fetchone: Return ``cursor.fetchone()`` instead of
                ``cursor.fetchall()``.
            execute: When true, return ``None`` without fetching.
            args: Optional values bound to the placeholders in ``sql``;
                prefer this over formatting values into ``sql`` yourself.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql, args)
            self.connection.commit()
            if execute:
                return None
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def join_field_value(self, data, glue=', '):
        """Build ``"`col` = %s"`` terms for the keys of ``data``.

        Args:
            data: Dict whose keys are column names.
            glue: Separator placed between terms (``', '`` for SET lists,
                ``' AND '`` for WHERE clauses).

        Returns:
            The joined terms, or ``''`` when ``data`` is empty.
        """
        return glue.join(
            '{} = %s'.format(self._quote_identifier(key)) for key in data.keys())

    def close(self):
        """Close the connection if there is one and it is still open.

        Safe to call more than once and on an instance whose ``__init__``
        failed before ``self.connection`` was set.
        """
        connection = getattr(self, 'connection', None)
        # `open` is the connection's "still connected" flag; objects without
        # it are treated as open so they still get closed.
        if not connection or not getattr(connection, 'open', True):
            return None
        return connection.close()

    def __del__(self):
        """Best-effort cleanup when the wrapper is garbage-collected."""
        try:
            self.close()
        except Exception:
            # Deliberately ignored: a finalizer must never raise, and there
            # is nothing useful to do if closing fails at this point.
            pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment