Guest User

Building a Robust Redis Client with redis.asyncio (formerly aioredis) and Tenacity

a guest
Jul 7th, 2025
24
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 14.00 KB | Software | 0 0
  1. import asyncio
  2. import logging
  3. import time
  4. from contextlib import asynccontextmanager
  5. from dataclasses import dataclass
  6. from dataclasses import field
  7. from functools import wraps
  8. from typing import Any
  9. from typing import Callable
  10. from typing import Dict
  11. from typing import Optional
  12. from typing import TypeVar
  13. from typing import Union
  14.  
  15. import redis.asyncio as redis
  16. from redis.asyncio import ConnectionPool
  17. from tenacity import AsyncRetrying
  18. from tenacity import retry_if_exception_type
  19. from tenacity import RetryError
  20. from tenacity import stop_after_attempt
  21. from tenacity import wait_exponential
  22.  
  23. from core.settings import app_env_settings as settings
  24.  
  25. logger = logging.getLogger(__name__)
  26.  
  27. T = TypeVar("T")
  28.  
  29.  
  30. @dataclass(frozen=True)
  31. class RedisRetryConfig:
  32.     """Immutable retry configuration."""
  33.  
  34.     max_attempts: int = settings.REDIS_MAX_ATTEMPTS
  35.     base_delay: float = settings.REDIS_BASE_DELAY
  36.     max_delay: float = settings.REDIS_MAX_DELAY
  37.     exponential_base: float = settings.REDIS_EXPONENTIAL_BASE
  38.     retry_exceptions: tuple = field(
  39.         default_factory=lambda: (
  40.             redis.ConnectionError,
  41.             redis.TimeoutError,
  42.             redis.RedisError,
  43.             ConnectionRefusedError,
  44.             TimeoutError,
  45.         )
  46.     )
  47.     circuit_breaker_threshold: int = settings.REDIS_CIRCUIT_BREAKER_THRESHOLD
  48.     circuit_breaker_timeout: float = settings.REDIS_CIRCUIT_BREAKER_TIMEOUT
  49.  
  50.  
  51. class CircuitBreaker:
  52.     """Simple circuit breaker for Redis operations."""
  53.  
  54.     def __init__(self, threshold: int, timeout: float):
  55.         self.threshold = threshold
  56.         self.timeout = timeout
  57.         self.failure_count = 0
  58.         self.last_failure_time = 0
  59.         self.state = "closed"  # closed, open, half-open
  60.         self._lock = asyncio.Lock()
  61.  
  62.     async def call(self, func: Callable, *args, **kwargs):
  63.         async with self._lock:
  64.             if self.state == "open":
  65.                 if time.time() - self.last_failure_time > self.timeout:
  66.                     self.state = "half-open"
  67.                     logger.info("Circuit breaker moving to half-open state")
  68.                 else:
  69.                     raise redis.ConnectionError("Circuit breaker is open")
  70.  
  71.             try:
  72.                 result = await func(*args, **kwargs)
  73.                 if self.state == "half-open":
  74.                     self.reset()
  75.                 return result
  76.             except Exception as e:
  77.                 self.record_failure()
  78.                 raise e
  79.  
  80.     def record_failure(self):
  81.         self.failure_count += 1
  82.         self.last_failure_time = time.time()
  83.  
  84.         if self.failure_count >= self.threshold and self.state == "closed":
  85.             self.state = "open"
  86.             logger.warning(
  87.                 f"Circuit breaker opened after {self.failure_count} failures"
  88.             )
  89.  
  90.     def reset(self):
  91.         self.failure_count = 0
  92.         self.state = "closed"
  93.         logger.info("Circuit breaker reset")
  94.  
  95.  
  96. def redis_retry(
  97.     config: Optional[RedisRetryConfig] = None,
  98.     operation_name: Optional[str] = None,
  99.     use_circuit_breaker: bool = True,
  100. ) -> Callable:
  101.     """
  102.    Decorator for Redis operations with retry logic and circuit breaker.
  103.    """
  104.     retry_config = config or RedisRetryConfig()
  105.  
  106.     def decorator(func: Callable[..., T]) -> Callable[..., T]:
  107.         circuit_breaker = (
  108.             CircuitBreaker(
  109.                 retry_config.circuit_breaker_threshold,
  110.                 retry_config.circuit_breaker_timeout,
  111.             )
  112.             if use_circuit_breaker
  113.             else None
  114.         )
  115.  
  116.         @wraps(func)
  117.         async def wrapper(*args, **kwargs) -> T:
  118.             op_name = operation_name or func.__name__
  119.  
  120.             async def execute_with_retry():
  121.                 try:
  122.                     async for attempt in AsyncRetrying(
  123.                         stop=stop_after_attempt(retry_config.max_attempts),
  124.                         wait=wait_exponential(
  125.                             multiplier=retry_config.base_delay,
  126.                             max=retry_config.max_delay,
  127.                             exp_base=retry_config.exponential_base,
  128.                         ),
  129.                         retry=retry_if_exception_type(retry_config.retry_exceptions),
  130.                         reraise=True,
  131.                     ):
  132.                         with attempt:
  133.                             return await func(*args, **kwargs)
  134.                 except RetryError as e:
  135.                     logger.error(
  136.                         f"[Redis][{op_name}] Failed after {retry_config.max_attempts} attempts: {e.last_attempt.exception()}"
  137.                     )
  138.                     raise e.last_attempt.exception() from e
  139.  
  140.             if circuit_breaker:
  141.                 return await circuit_breaker.call(execute_with_retry)
  142.             else:
  143.                 return await execute_with_retry()
  144.  
  145.         return wrapper
  146.  
  147.     return decorator
  148.  
  149.  
  150. class RedisRetryPipeline:
  151.     """Redis pipeline wrapper with retry logic."""
  152.  
  153.     def __init__(self, pipeline: redis.client.Pipeline, config: RedisRetryConfig):
  154.         self.pipeline = pipeline
  155.         self.config = config
  156.  
  157.     async def __aenter__(self):
  158.         return self
  159.  
  160.     async def __aexit__(self, exc_type, exc_val, exc_tb):
  161.         self.pipeline.reset()  # reset() is synchronous
  162.  
  163.     @property
  164.     def _retry_decorator(self):
  165.         return redis_retry(self.config, "pipeline_execute")
  166.  
  167.     async def execute(self) -> list:
  168.         """Execute pipeline with retry logic."""
  169.         decorated_execute = self._retry_decorator(self.pipeline.execute)
  170.         return await decorated_execute()
  171.  
  172.     def __getattr__(self, name: str) -> Any:
  173.         """Delegate all other attributes to the underlying pipeline."""
  174.         return getattr(self.pipeline, name)
  175.  
  176.  
  177. class RedisWithRetry(redis.Redis):
  178.     """
  179.    Enhanced Redis client with comprehensive retry logic and circuit breaker.
  180.    """
  181.  
  182.     def __init__(self, retry_config: Optional[RedisRetryConfig] = None, **kwargs):
  183.         # Set _core_methods before calling parent constructor to avoid circular dependency
  184.         self._core_methods = {
  185.             "ping",
  186.             "get",
  187.             "set",
  188.             "setex",
  189.             "delete",
  190.             "exists",
  191.             "expire",
  192.             "sadd",
  193.             "srem",
  194.             "smembers",
  195.             "sismember",
  196.             "scard",
  197.             "zadd",
  198.             "zrem",
  199.             "zrange",
  200.             "zcard",
  201.             "zrangebyscore",
  202.             "keys",
  203.             "info",
  204.             "flushdb",
  205.             "flushall",
  206.             "incr",
  207.             "decr",
  208.             "hget",
  209.             "hset",
  210.             "hgetall",
  211.             "hdel",
  212.             "hexists",
  213.             "hkeys",
  214.             "hvals",
  215.             "lpush",
  216.             "rpush",
  217.             "lpop",
  218.             "rpop",
  219.             "llen",
  220.             "lrange",
  221.             "lindex",
  222.             "publish",
  223.             "eval",
  224.             "evalsha",
  225.             "script_load",
  226.             "script_exists",
  227.         }
  228.         self.retry_config = retry_config or RedisRetryConfig()
  229.  
  230.         # Now call parent constructor
  231.         super().__init__(**kwargs)
  232.  
  233.     def __getattribute__(self, name: str) -> Any:
  234.         """
  235.        Override attribute access to automatically add retry logic to Redis methods.
  236.        """
  237.         # Handle special cases to avoid infinite recursion
  238.         if name in ["_core_methods", "retry_config"]:
  239.             return super().__getattribute__(name)
  240.  
  241.         attr = super().__getattribute__(name)
  242.  
  243.         # Only wrap async methods that are core Redis operations
  244.         if (
  245.             name in self._core_methods
  246.             and callable(attr)
  247.             and asyncio.iscoroutinefunction(attr)
  248.         ):
  249.  
  250.             retry_config = super().__getattribute__("retry_config")
  251.             return redis_retry(retry_config, name)(attr)
  252.  
  253.         return attr
  254.  
  255.     def pipeline(
  256.         self, transaction: bool = True, shard_hint: Optional[str] = None
  257.     ) -> RedisRetryPipeline:
  258.         """Create a pipeline with retry logic."""
  259.         pipeline = super().pipeline(transaction=transaction, shard_hint=shard_hint)
  260.         return RedisRetryPipeline(pipeline, self.retry_config)
  261.  
  262.     async def health_check(self) -> Dict[str, Any]:
  263.         """Comprehensive health check for Redis connection."""
  264.         try:
  265.             start_time = time.time()
  266.             await self.ping()
  267.             ping_time = time.time() - start_time
  268.  
  269.             info = await self.info()
  270.  
  271.             return {
  272.                 "status": "healthy",
  273.                 "ping_time_ms": round(ping_time * 1000, 2),
  274.                 "connected_clients": info.get("connected_clients", 0),
  275.                 "used_memory": info.get("used_memory", 0),
  276.                 "used_memory_human": info.get("used_memory_human", "0B"),
  277.                 "redis_version": info.get("redis_version", "unknown"),
  278.             }
  279.         except Exception as e:
  280.             return {
  281.                 "status": "unhealthy",
  282.                 "error": str(e),
  283.                 "error_type": type(e).__name__,
  284.             }
  285.  
  286.  
  287. class RedisClientManager:
  288.     """Thread-safe singleton manager for Redis client."""
  289.  
  290.     def __init__(self):
  291.         self._client: Optional[RedisWithRetry] = None
  292.         self._connection_pool: Optional[ConnectionPool] = None
  293.         self._lock = asyncio.Lock()
  294.         self._retry_config = RedisRetryConfig()
  295.  
  296.     async def get_client(
  297.         self, retry_config: Optional[RedisRetryConfig] = None
  298.     ) -> RedisWithRetry:
  299.         """Get singleton Redis client with retry logic and connection pooling."""
  300.         if self._client is None:
  301.             async with self._lock:
  302.                 if self._client is None:
  303.                     await self._initialize_client(retry_config)
  304.         return self._client
  305.  
  306.     async def _initialize_client(self, retry_config: Optional[RedisRetryConfig] = None):
  307.         """Initialize Redis client with connection pooling."""
  308.         self._retry_config = retry_config or RedisRetryConfig()
  309.  
  310.         try:
  311.             # Create connection pool with optimized settings
  312.             self._connection_pool = ConnectionPool.from_url(
  313.                 settings.REDIS_URL,
  314.                 max_connections=getattr(settings, "REDIS_CONNECTION_POOL", 20),
  315.                 retry_on_timeout=True,
  316.                 socket_keepalive=True,
  317.                 socket_keepalive_options={},
  318.                 health_check_interval=30,
  319.                 socket_connect_timeout=5,
  320.                 socket_timeout=5,
  321.             )
  322.  
  323.             # Create Redis client with connection pool
  324.             self._client = RedisWithRetry(
  325.                 connection_pool=self._connection_pool, retry_config=self._retry_config
  326.             )
  327.  
  328.             # Test connection
  329.             await self._client.ping()
  330.             logger.info(
  331.                 f"Redis client initialized successfully with retry config: {self._retry_config}"
  332.             )
  333.  
  334.         except Exception as e:
  335.             logger.error(f"Failed to initialize Redis client: {str(e)}")
  336.             await self._cleanup()
  337.             raise
  338.  
  339.     async def close(self):
  340.         """Close Redis client and connection pool."""
  341.         async with self._lock:
  342.             await self._cleanup()
  343.  
  344.     async def _cleanup(self):
  345.         """Internal cleanup method."""
  346.         try:
  347.             if self._client:
  348.                 await self._client.close()
  349.                 self._client = None
  350.                 logger.info("Redis client closed")
  351.  
  352.             if self._connection_pool:
  353.                 await self._connection_pool.disconnect()
  354.                 self._connection_pool = None
  355.                 logger.info("Redis connection pool closed")
  356.         except Exception as e:
  357.             logger.error(f"Error during Redis cleanup: {str(e)}")
  358.  
  359.     async def health_check(self) -> Dict[str, Any]:
  360.         """Perform health check on Redis connection."""
  361.         try:
  362.             client = await self.get_client()
  363.             return await client.health_check()
  364.         except Exception as e:
  365.             return {
  366.                 "status": "unhealthy",
  367.                 "error": str(e),
  368.                 "error_type": type(e).__name__,
  369.             }
  370.  
  371.     @asynccontextmanager
  372.     async def get_pipeline(self):
  373.         """Context manager for Redis pipeline operations."""
  374.         client = await self.get_client()
  375.         async with client.pipeline() as pipeline:
  376.             yield pipeline
  377.  
  378.  
  379. # Global singleton instance
  380. _redis_manager = RedisClientManager()
  381.  
  382.  
  383. # Public API functions
  384. async def get_redis_client(
  385.     retry_config: Optional[RedisRetryConfig] = None,
  386. ) -> RedisWithRetry:
  387.     """
  388.    Get singleton Redis client with retry logic and connection pooling.
  389.  
  390.    Args:
  391.        retry_config: Optional custom retry configuration
  392.  
  393.    Returns:
  394.        RedisWithRetry: Redis client with built-in retry logic
  395.    """
  396.     return await _redis_manager.get_client(retry_config)
  397.  
  398.  
  399. async def close_redis_client() -> None:
  400.     """Close the global Redis client and connection pool."""
  401.     await _redis_manager.close()
  402.  
  403.  
  404. async def redis_health_check() -> Dict[str, Any]:
  405.     """Perform comprehensive health check on Redis connection."""
  406.     return await _redis_manager.health_check()
  407.  
  408.  
  409. @asynccontextmanager
  410. async def redis_pipeline():
  411.     """Context manager for Redis pipeline operations with retry logic."""
  412.     async with _redis_manager.get_pipeline() as pipeline:
  413.         yield pipeline
  414.  
  415.  
  416. def create_redis_client(
  417.     redis_url: str, retry_config: Optional[RedisRetryConfig] = None, **kwargs
  418. ) -> RedisWithRetry:
  419.     """
  420.    Create a new Redis client instance (not singleton).
  421.  
  422.    Args:
  423.        redis_url: Redis connection URL
  424.        retry_config: Optional retry configuration
  425.        **kwargs: Additional Redis client arguments
  426.  
  427.    Returns:
  428.        RedisWithRetry: New Redis client instance
  429.    """
  430.     return RedisWithRetry.from_url(
  431.         redis_url, retry_config=retry_config or RedisRetryConfig(), **kwargs
  432.     )
  433.  
Advertisement
Add Comment
Please, Sign In to add comment