Skip to content

Instantly share code, notes, and snippets.

@jaf7
Forked from larry1001/libmysql.py
Created February 26, 2021 16:54
Show Gist options
  • Save jaf7/74ddf846ddd29c2c5a4876b86ac6075b to your computer and use it in GitHub Desktop.
Save jaf7/74ddf846ddd29c2c5a4876b86ac6075b to your computer and use it in GitHub Desktop.
MySQL wrapper in Python
import pymysql
import os.path
import sys
from configparser import ConfigParser
from pymysql import (escape_sequence, cursors)
# Append this file's own directory to the import search path.
sys.path.append(os.path.dirname(__file__))

# The settings file is expected at <parent of this file's dir>/config/config.ini.
base_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
file_path = os.path.abspath(os.path.join(base_path, 'config', 'config.ini'))

# ConfigParser.read() skips files it cannot open, so a missing config.ini is
# not reported here; it only surfaces later when an option is looked up.
config = ConfigParser()
config.read(file_path)

__all__ = ['MysqlConnection']
class MysqlConnection(object):
    """Thin convenience wrapper around a single PyMySQL connection.

    Connection settings come from the ``[mysql]`` section of the
    module-level ``config`` parser.  The connection is created with
    ``cursors.DictCursor``.

    Only *values* are sent as query parameters.  Table names, field names,
    ``order`` / ``limit`` fragments and string conditions are interpolated
    into the SQL text verbatim, so they must never contain untrusted input.
    """

    _instance = None  # not referenced anywhere in this class

    def __init__(self):
        """Open the database connection immediately."""
        self.create_connection()

    def create_connection(self):
        """(Re)create ``self.connection`` from the ``[mysql]`` config section.

        Reads ``host``, ``user``, ``pass`` and ``database``.  ``port`` is
        optional and falls back to 3306 when it is not configured.
        """
        self.connection = pymysql.connect(
            host=config.get('mysql', 'host'),
            user=config.get('mysql', 'user'),
            passwd=config.get('mysql', 'pass'),
            db=config.get('mysql', 'database'),
            port=config.getint('mysql', 'port', fallback=3306),
            charset='utf8',
            cursorclass=cursors.DictCursor,
        )

    def get_conn(self):
        """Return the connection after checking that it still responds."""
        self.ping_connect()
        return self.connection

    def ping_connect(self):
        """Ping the server and open a fresh connection if the ping fails."""
        try:
            self.connection.ping()
        except Exception:
            # Any failure (including a missing ``connection`` attribute) means
            # the connection is unusable.  ``Exception`` rather than a bare
            # ``except`` so KeyboardInterrupt / SystemExit still propagate.
            self.create_connection()

    def _build_where(self, condition):
        """Return ``(where_sql, params)`` for a *condition* argument.

        A falsy condition matches every row (``WHERE 1``), a dict becomes
        ``AND``-joined equality tests with its values as parameters, and
        anything else is used verbatim as the WHERE text.
        """
        if not condition:
            return '1', []
        if isinstance(condition, dict):
            where = self.join_field_value(condition, ' AND ')
            return where, list(condition.values())
        return condition, []

    @staticmethod
    def _execute(cursor, sql, params):
        """Run *sql* on *cursor*, passing *params* only when there are any.

        ``None`` (not an empty tuple) is passed when there are no parameters
        so that a statement without placeholders is sent as-is instead of
        being %-formatted by the driver.
        """
        return cursor.execute(sql, tuple(params) if params else None)

    def insert(self, table, data):
        """Insert one row with ``INSERT IGNORE`` and commit.

        :param table: table name, interpolated verbatim.
        :param data: dict mapping column name to value.
        :return: ``self.connection.insert_id()`` read after the statement.
        """
        sql = "INSERT IGNORE INTO {table} SET {params}".format(
            table=table, params=self.join_field_value(data))
        with self.connection.cursor() as cursor:
            cursor.execute(sql, tuple(data.values()))
            last_id = self.connection.insert_id()
            self.connection.commit()
            return last_id

    def bulk_insert(self, table, data):
        """Insert several rows with one ``INSERT IGNORE`` statement and commit.

        :param table: table name, interpolated verbatim.
        :param data: non-empty list of dicts that all have the same keys.
            Column order is taken from the first row; every row's values are
            looked up by column name, so key order inside a row is irrelevant.
        :return: ``self.connection.insert_id()`` read after the statement.
        :raises AssertionError: if *data* is not a non-empty list.
        :raises ValueError: if a row's keys differ from the first row's.
        """
        # Raised explicitly instead of via ``assert`` so the check survives
        # ``python -O``; the exception type is kept for existing callers.
        if not isinstance(data, list) or not data:
            raise AssertionError("data format is error")

        fields = list(data[0])
        expected = set(fields)
        values = []
        for row in data:
            if set(row) != expected:
                raise ValueError(
                    "every row must have the same fields as the first row")
            values.extend(row[name] for name in fields)

        # One "(%s, %s, ...)" group per row; the driver escapes the values.
        row_placeholder = '({})'.format(', '.join(['%s'] * len(fields)))
        sql = u"INSERT IGNORE INTO {table} ({fields}) VALUES {values}".format(
            table=table,
            fields=', '.join('`{}`'.format(name) for name in fields),
            values=', '.join([row_placeholder] * len(data)))

        with self.connection.cursor() as cursor:
            cursor.execute(sql, tuple(values))
            last_id = self.connection.insert_id()
            self.connection.commit()
            return last_id

    def update(self, table, data, condition=None):
        """Run ``UPDATE IGNORE`` on *table* and commit.

        :param table: table name, interpolated verbatim.
        :param data: dict mapping column name to its new value.
        :param condition: dict of equality tests (joined with ``AND``), a raw
            WHERE string, or a falsy value to update every row.
        :return: whatever ``cursor.execute`` returns for the statement.
        """
        where, where_params = self._build_where(condition)
        sql = "UPDATE IGNORE {table} SET {params} WHERE {where}".format(
            table=table, params=self.join_field_value(data), where=where)
        params = list(data.values()) + where_params
        with self.connection.cursor() as cursor:
            result = self._execute(cursor, sql, params)
            self.connection.commit()
            return result

    def count(self, table, condition=None):
        """Return ``COUNT(*)`` for the rows of *table* matching *condition*.

        :param table: table name, interpolated verbatim.
        :param condition: dict of equality tests (joined with ``AND``), a raw
            WHERE string, or a falsy value to count every row.
        """
        where, params = self._build_where(condition)
        sql = "SELECT COUNT(*) as cnt FROM {table} WHERE {where}".format(
            table=table, where=where)
        with self.connection.cursor() as cursor:
            self._execute(cursor, sql, params)
            self.connection.commit()
            return cursor.fetchone().get('cnt')

    def fetch_rows(self, table, fields=None, condition=None, order=None, limit=None, fetchone=False):
        """Run a ``SELECT`` on *table* and return its rows.

        :param table: table name, interpolated verbatim.
        :param fields: list/tuple of column names (each backtick-quoted), a
            raw field string, or a falsy value for ``*``.
        :param condition: dict of equality tests (joined with ``AND``), a raw
            WHERE string, or a falsy value for every row.
        :param order: text placed after ``ORDER BY``; omitted when falsy.
        :param limit: text placed after ``LIMIT``; omitted when falsy.
        :param fetchone: return ``cursor.fetchone()`` instead of
            ``cursor.fetchall()``.
        """
        if not fields:
            fields = '*'
        elif isinstance(fields, (tuple, list)):
            fields = '`{0}`'.format('`, `'.join(fields))

        where, params = self._build_where(condition)
        orderby = 'ORDER BY {order}'.format(order=order) if order else ''
        limits = "LIMIT {limit}".format(limit=limit) if limit else ""
        sql = "SELECT {fields} FROM {table} WHERE {where} {orderby} {limits}".format(
            fields=fields, table=table, where=where, orderby=orderby, limits=limits)

        with self.connection.cursor() as cursor:
            self._execute(cursor, sql, params)
            self.connection.commit()
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def query(self, sql, fetchone=False, execute=False, params=None):
        """Execute a custom SQL statement and commit.

        :param sql: statement text; may contain ``%s`` placeholders when
            *params* is given.
        :param fetchone: return ``cursor.fetchone()`` instead of
            ``cursor.fetchall()``.
        :param execute: when true, return ``None`` without fetching rows.
        :param params: optional parameters passed to ``cursor.execute``;
            prefer this over formatting values into *sql*.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
            self.connection.commit()
            if execute:
                return None
            return cursor.fetchone() if fetchone else cursor.fetchall()

    def join_field_value(self, data, glue=', '):
        """Return one "`key` = %s" fragment per key of *data*, joined by *glue*.

        Example: ``{'a': 1, 'b': 2}`` gives "`a` = %s, `b` = %s".  Only the
        keys are used; an empty mapping gives an empty string.
        """
        return glue.join("`{}` = %s".format(key) for key in data)

    def close(self):
        """Close the connection if there is one and it is still open.

        Safe to call repeatedly and on an instance whose connection was
        never created.
        """
        connection = getattr(self, 'connection', None)
        if not connection:
            return None
        # PyMySQL raises if close() is called on a closed connection, so skip
        # it when the connection reports that it is no longer open.
        if not getattr(connection, 'open', True):
            return None
        return connection.close()

    def __del__(self):
        """Best-effort close when the wrapper is garbage collected."""
        try:
            self.close()
        except Exception:
            # Errors raised in a finalizer cannot reach the caller and would
            # only be printed as noise, e.g. during interpreter shutdown.
            pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment