Skip to content

Instantly share code, notes, and snippets.

@h1code2
Last active March 11, 2024 08:14
Show Gist options
  • Save h1code2/f3bba962a694c126ae320a5d87cd5875 to your computer and use it in GitHub Desktop.
Save h1code2/f3bba962a694c126ae320a5d87cd5875 to your computer and use it in GitHub Desktop.
fastapi操作异步redis模块aioredis #fastapi #aioredis

fastapi操作异步redis模块aioredis

import aioredis
from fastapi import FastAPI
from api.views import router as api_router
from ws.views import router as wss_router
from config import redis_config
async def redis_pool(db: int = 0):
"""
redis连接池
:return:
"""
redis = await aioredis.create_redis_pool(
f"redis://:{redis_config.get('password')}@{redis_config.get('host')}/{db}?encoding=utf-8"
)
return redis
def create_app():
application = FastAPI()
application.include_router(api_router, prefix="/api")
application.include_router(wss_router, prefix="/ws")
return application
app = create_app()
@app.on_event("startup")
async def create_redis():
app.state.redis = await redis_pool()
@app.on_event("shutdown")
async def close_redis():
await app.state.redis.close()
from fastapi import APIRouter, Request
from .response import Response
router = APIRouter()
@router.get("/")
async def api_index():
return "/api"
@router.get("/online_devices", response_model=Response)
async def online_devices(request: Request):
redis_client = request.app.state.redis
keys = await redis_client.hgetall("online_devices")
return Response(message="successfully", data=keys)
@yhyu13
Copy link

yhyu13 commented Feb 6, 2024

已经不需要redis pool了把,2.0版本之后

@yongk
Copy link

yongk commented Mar 11, 2024

有pub/sub的例子么

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment