Created
July 1, 2017 04:17
-
-
Save syfun/56feffcb0e5b772b45eca0e365b030e9 to your computer and use it in GitHub Desktop.
python async mysql redis http
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiomysql | |
from aiomysql.cursors import DictCursor | |
class MysqlWrapper: | |
def __init__(self, host, port, user, password, db): | |
self.host = host | |
self.port = port | |
self.user = user | |
self.password = password | |
self.db = db | |
self._pool = None | |
async def pool(self): | |
if not self._pool: | |
self._pool = await aiomysql.create_pool( | |
host=self.host, port=self.port, user=self.user, | |
password=self.password, db=self.db | |
) | |
return self._pool | |
async def close(self): | |
if not self._pool: | |
return | |
self._pool.close() | |
await self._pool.wait_closed() | |
mysql = MysqlWrapper( | |
host='localhost', port=3306, user='root', | |
password='', db='mysql' | |
) | |
def mysql_context(wrapper): | |
def _mysql_context(func): | |
async def __mysql_context(*args, **kwargs): | |
pool = await wrapper.pool() | |
async with pool.acquire() as conn: | |
await conn.set_charset('utf8') | |
r = await func(conn=conn, *args, **kwargs) | |
await conn.commit() | |
return r | |
return __mysql_context | |
return _mysql_context | |
@mysql_context(mysql) | |
async def mysql_test(conn=None): | |
async with conn.cursor(DictCursor) as cur: | |
await cur.execute('SELECT Host,User FROM user') | |
print(cur.rowcount) | |
print(await cur.fetchone()) | |
print(await cur.fetchall()) | |
async def close_pool(): | |
await mysql.close() | |
print('Close mysql pool') | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(mysql_test()) | |
loop.run_until_complete(close_pool()) | |
loop.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aioredis | |
class RedisWrapper: | |
def __init__(self, host, port, db): | |
self.host = host | |
self.port = port | |
self.db = db | |
self._pool = None | |
async def pool(self): | |
if not self._pool: | |
self._pool = await aioredis.create_pool((self.host, self.port), db=self.db) | |
return self._pool | |
async def close(self): | |
if not self._pool: | |
return | |
self._pool.close() | |
await self._pool.wait_closed() | |
redis = RedisWrapper('localhost', 6379, 0) | |
def redis_context(wrapper): | |
def _redis_context(func): | |
async def __redis_context(*args, **kwargs): | |
pool = await wrapper.pool() | |
async with pool.get() as conn: | |
r = await func(conn=conn, *args, **kwargs) | |
return r | |
return __redis_context | |
return _redis_context | |
@redis_context(redis) | |
async def redis_test(conn=None): | |
print(await conn.keys('*')) | |
async def close_pool(): | |
await redis.close() | |
print('Close redis pool') | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(redis_test()) | |
loop.run_until_complete(close_pool()) | |
loop.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiohttp | |
class SessionWrapper: | |
def __init__(self): | |
self._session = None | |
async def session(self): | |
if not self._session: | |
self._session = aiohttp.ClientSession() | |
return self._session | |
async def close(self): | |
if not self._session: | |
return | |
self._session.close() | |
sess = SessionWrapper() | |
async def http_test(): | |
session = await sess.session() | |
async with session.get('http://www.cip.cc/') as resp: | |
print(await resp.text()) | |
async def close_session(): | |
await sess.close() | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(http_test()) | |
loop.run_until_complete(close_session()) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment