Skip to content

Instantly share code, notes, and snippets.

@syfun
Created July 1, 2017 04:17
Show Gist options
  • Save syfun/56feffcb0e5b772b45eca0e365b030e9 to your computer and use it in GitHub Desktop.
Save syfun/56feffcb0e5b772b45eca0e365b030e9 to your computer and use it in GitHub Desktop.
python async mysql redis http
import asyncio
import aiomysql
from aiomysql.cursors import DictCursor
class MysqlWrapper:
    """Lazily create and own a single aiomysql connection pool.

    The pool is built from the stored connection settings on the first call
    to :meth:`pool` and the same object is handed back on later calls.
    """

    def __init__(self, host, port, user, password, db):
        """Store the connection settings; no pool is created here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        # Filled in by pool(); None means there is no live pool.
        self._pool = None

    async def pool(self):
        """Return the cached pool, creating it on first use."""
        # Compare against None explicitly so the decision never depends on
        # the truthiness of the pool object itself.
        if self._pool is None:
            self._pool = await aiomysql.create_pool(
                host=self.host, port=self.port, user=self.user,
                password=self.password, db=self.db
            )
        return self._pool

    async def close(self):
        """Close the cached pool, if any, and wait until it has closed.

        The cached reference is cleared before closing so that a later
        :meth:`pool` call builds a fresh pool instead of returning one that
        has already been closed. Calling this without a pool is a no-op.
        """
        pool, self._pool = self._pool, None
        if pool is None:
            return
        pool.close()
        await pool.wait_closed()
# Module-level wrapper used by the MySQL example; constructing it only stores
# the settings, the pool itself is created by the first pool() call.
mysql = MysqlWrapper(
    host='localhost', port=3306, user='root',
    password='', db='mysql'
)
def mysql_context(wrapper):
    """Build a decorator that runs a coroutine with a pooled connection.

    Args:
        wrapper: Object whose awaitable ``pool()`` returns a pool that
            exposes ``acquire()`` as an async context manager.

    Returns:
        A decorator. The decorated coroutine function is called with the
        acquired connection passed as the ``conn`` keyword argument; its
        result is returned after ``conn.commit()`` has been awaited. If the
        decorated function raises, the commit is skipped and the exception
        propagates to the caller.
    """
    # Local import keeps this block self-contained.
    import functools

    def _mysql_context(func):
        # wraps() keeps the decorated function's __name__/__doc__ intact.
        @functools.wraps(func)
        async def __mysql_context(*args, **kwargs):
            pool = await wrapper.pool()
            async with pool.acquire() as conn:
                await conn.set_charset('utf8')
                result = await func(*args, conn=conn, **kwargs)
                # Only reached when func returned normally.
                await conn.commit()
                return result
        return __mysql_context
    return _mysql_context
@mysql_context(mysql)
async def mysql_test(conn=None):
    """Run a sample query on ``conn`` and print what comes back.

    Prints the cursor's ``rowcount`` first, then the result of
    ``fetchone()``, then the result of ``fetchall()``.
    """
    async with conn.cursor(DictCursor) as cursor:
        await cursor.execute('SELECT Host,User FROM user')
        print(cursor.rowcount)
        first_row = await cursor.fetchone()
        print(first_row)
        other_rows = await cursor.fetchall()
        print(other_rows)
async def close_pool():
    """Close the module-level ``mysql`` wrapper's pool and report it."""
    await mysql.close()
    print('Close mysql pool')
# Create the loop explicitly: relying on asyncio.get_event_loop() to supply one
# is deprecated, and it hands back an already-closed loop if earlier code in
# the same process has closed the current loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    try:
        loop.run_until_complete(mysql_test())
    finally:
        # Release the pool even when the example query fails.
        loop.run_until_complete(close_pool())
finally:
    loop.close()
import asyncio
import aioredis
class RedisWrapper:
    """Lazily create and own a single aioredis connection pool.

    The pool is built from the stored settings on the first call to
    :meth:`pool` and the same object is handed back on later calls.
    """

    def __init__(self, host, port, db):
        """Store the connection settings; no pool is created here."""
        self.host = host
        self.port = port
        self.db = db
        # Filled in by pool(); None means there is no live pool.
        self._pool = None

    async def pool(self):
        """Return the cached pool, creating it on first use."""
        # Compare against None explicitly so the decision never depends on
        # the truthiness of the pool object itself.
        if self._pool is None:
            self._pool = await aioredis.create_pool(
                (self.host, self.port), db=self.db
            )
        return self._pool

    async def close(self):
        """Close the cached pool, if any, and wait until it has closed.

        The cached reference is cleared before closing so that a later
        :meth:`pool` call builds a fresh pool instead of returning one that
        has already been closed. Calling this without a pool is a no-op.
        """
        pool, self._pool = self._pool, None
        if pool is None:
            return
        pool.close()
        await pool.wait_closed()
# Module-level wrapper used by the Redis example; constructing it only stores
# the settings, the pool itself is created by the first pool() call.
redis = RedisWrapper('localhost', 6379, 0)
def redis_context(wrapper):
    """Build a decorator that runs a coroutine with a pooled connection.

    Args:
        wrapper: Object whose awaitable ``pool()`` returns a pool that
            exposes ``get()`` as an async context manager.

    Returns:
        A decorator. The decorated coroutine function is called with the
        connection obtained from the pool passed as the ``conn`` keyword
        argument, and its result is returned unchanged.
    """
    # Local import keeps this block self-contained.
    import functools

    def _redis_context(func):
        # wraps() keeps the decorated function's __name__/__doc__ intact.
        @functools.wraps(func)
        async def __redis_context(*args, **kwargs):
            pool = await wrapper.pool()
            async with pool.get() as conn:
                return await func(*args, conn=conn, **kwargs)
        return __redis_context
    return _redis_context
@redis_context(redis)
async def redis_test(conn=None):
    """Print the result of ``conn.keys('*')``."""
    all_keys = await conn.keys('*')
    print(all_keys)
async def close_pool():
    """Close the module-level ``redis`` wrapper's pool and report it."""
    await redis.close()
    print('Close redis pool')
# Create the loop explicitly: relying on asyncio.get_event_loop() to supply one
# is deprecated, and it hands back an already-closed loop if earlier code in
# the same process has closed the current loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    try:
        loop.run_until_complete(redis_test())
    finally:
        # Release the pool even when the example command fails.
        loop.run_until_complete(close_pool())
finally:
    loop.close()
import asyncio
import aiohttp
class SessionWrapper:
    """Lazily create and own a single aiohttp ``ClientSession``."""

    def __init__(self):
        """Start without a session; one is created by :meth:`session`."""
        # Filled in by session(); None means there is no live session.
        self._session = None

    async def session(self):
        """Return the cached session, creating it on first use."""
        if self._session is None:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the cached session, if any.

        The cached reference is cleared first so that a later
        :meth:`session` call creates a new session instead of returning a
        closed one. Calling this without a session is a no-op.
        """
        # Local import keeps this block self-contained.
        import inspect

        session, self._session = self._session, None
        if session is None:
            return
        outcome = session.close()
        # In aiohttp 3.x close() is a coroutine function: calling it without
        # awaiting never closes the session. Older releases may return a
        # plain value, so only await when there is something to await.
        if inspect.isawaitable(outcome):
            await outcome
# Module-level wrapper used by the HTTP example; no ClientSession exists until
# session() is first awaited.
sess = SessionWrapper()
async def http_test():
    """GET http://www.cip.cc/ through the shared session and print the body."""
    client = await sess.session()
    async with client.get('http://www.cip.cc/') as reply:
        body = await reply.text()
        print(body)
async def close_session():
    """Close the module-level ``sess`` wrapper's session."""
    await sess.close()
# Create the loop explicitly: relying on asyncio.get_event_loop() to supply one
# is deprecated, and it hands back an already-closed loop if earlier code in
# the same process has closed the current loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    try:
        loop.run_until_complete(http_test())
    finally:
        # Close the session even when the example request fails.
        loop.run_until_complete(close_session())
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment