Skip to content

Instantly share code, notes, and snippets.

@mvandermeulen
Forked from cmin764/async-redis-cache.py
Created August 17, 2025 03:28
Show Gist options
  • Save mvandermeulen/9a6bc04db5a449edea08dde28edae928 to your computer and use it in GitHub Desktop.
Save mvandermeulen/9a6bc04db5a449edea08dde28edae928 to your computer and use it in GitHub Desktop.
Async Redis caching (with cluster mode support)
import os
from redis.asyncio import Redis, RedisCluster
from servicelib.logging import get_our_logger
logger = get_our_logger(__name__)
class RedisCache:
"""Async Redis-based cache with cluster mode support."""
SOCKET_TIMEOUT = 3 # seconds to connect
def __init__(self, redis_url: str):
self._redis_url = redis_url
self._client: Redis | RedisCluster | None = None
self._available: bool | None = None
@classmethod
async def create(cls, redis_url: str) -> "RedisCache":
redis_cache = RedisCache(redis_url)
await redis_cache.initialize()
return redis_cache
@staticmethod
def _enabled() -> bool:
# Takes into account any env var update during reloads.
return os.getenv("STRATEGY_CACHE_ENABLED", "false").lower() == "true"
@property
def enabled(self) -> bool:
"""Returns `True` when caching is both enabled and available."""
return self._enabled() and self._available
async def initialize(self):
if not self._enabled():
logger.warning("Redis caching not enabled!")
return
# Ensure the right Redis client at initialization time. (fix tests)
cluster_mode = os.getenv("CACHE_CLUSTER_MODE", "true").lower() == "true"
if cluster_mode:
RedisClient = RedisCluster
redis_type = "Redis-cluster"
else:
RedisClient = Redis
redis_type = "Redis-simple"
try:
self._client = RedisClient.from_url(
self._redis_url, socket_timeout=self.SOCKET_TIMEOUT
)
# Test the connection.
await self._client.ping()
except Exception as exc:
logger.error("Could not connect to %s: %s", redis_type, exc)
self._client = None
self._available = False
else:
logger.info("%s caching enabled and available.", redis_type)
self._available = True
async def get(self, key: str) -> str | None:
if not self.enabled:
return None
value = await self._client.get(key)
return value.decode("utf-8") if value else None
async def set(self, key: str, value: str, expire_time: int = 300):
if self.enabled:
await self._client.setex(key, expire_time, value)
async def clear(self):
if self.enabled:
await self._client.flushdb()
async def close(self):
if self._client:
await self._client.aclose()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment