Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import logging
- import time
- from contextlib import asynccontextmanager
- from dataclasses import dataclass
- from dataclasses import field
- from functools import wraps
- from typing import Any
- from typing import Callable
- from typing import Dict
- from typing import Optional
- from typing import TypeVar
- from typing import Union
- import redis.asyncio as redis
- from redis.asyncio import ConnectionPool
- from tenacity import AsyncRetrying
- from tenacity import retry_if_exception_type
- from tenacity import RetryError
- from tenacity import stop_after_attempt
- from tenacity import wait_exponential
- from core.settings import app_env_settings as settings
- logger = logging.getLogger(__name__)
- T = TypeVar("T")
- @dataclass(frozen=True)
- class RedisRetryConfig:
- """Immutable retry configuration."""
- max_attempts: int = settings.REDIS_MAX_ATTEMPTS
- base_delay: float = settings.REDIS_BASE_DELAY
- max_delay: float = settings.REDIS_MAX_DELAY
- exponential_base: float = settings.REDIS_EXPONENTIAL_BASE
- retry_exceptions: tuple = field(
- default_factory=lambda: (
- redis.ConnectionError,
- redis.TimeoutError,
- redis.RedisError,
- ConnectionRefusedError,
- TimeoutError,
- )
- )
- circuit_breaker_threshold: int = settings.REDIS_CIRCUIT_BREAKER_THRESHOLD
- circuit_breaker_timeout: float = settings.REDIS_CIRCUIT_BREAKER_TIMEOUT
- class CircuitBreaker:
- """Simple circuit breaker for Redis operations."""
- def __init__(self, threshold: int, timeout: float):
- self.threshold = threshold
- self.timeout = timeout
- self.failure_count = 0
- self.last_failure_time = 0
- self.state = "closed" # closed, open, half-open
- self._lock = asyncio.Lock()
- async def call(self, func: Callable, *args, **kwargs):
- async with self._lock:
- if self.state == "open":
- if time.time() - self.last_failure_time > self.timeout:
- self.state = "half-open"
- logger.info("Circuit breaker moving to half-open state")
- else:
- raise redis.ConnectionError("Circuit breaker is open")
- try:
- result = await func(*args, **kwargs)
- if self.state == "half-open":
- self.reset()
- return result
- except Exception as e:
- self.record_failure()
- raise e
- def record_failure(self):
- self.failure_count += 1
- self.last_failure_time = time.time()
- if self.failure_count >= self.threshold and self.state == "closed":
- self.state = "open"
- logger.warning(
- f"Circuit breaker opened after {self.failure_count} failures"
- )
- def reset(self):
- self.failure_count = 0
- self.state = "closed"
- logger.info("Circuit breaker reset")
- def redis_retry(
- config: Optional[RedisRetryConfig] = None,
- operation_name: Optional[str] = None,
- use_circuit_breaker: bool = True,
- ) -> Callable:
- """
- Decorator for Redis operations with retry logic and circuit breaker.
- """
- retry_config = config or RedisRetryConfig()
- def decorator(func: Callable[..., T]) -> Callable[..., T]:
- circuit_breaker = (
- CircuitBreaker(
- retry_config.circuit_breaker_threshold,
- retry_config.circuit_breaker_timeout,
- )
- if use_circuit_breaker
- else None
- )
- @wraps(func)
- async def wrapper(*args, **kwargs) -> T:
- op_name = operation_name or func.__name__
- async def execute_with_retry():
- try:
- async for attempt in AsyncRetrying(
- stop=stop_after_attempt(retry_config.max_attempts),
- wait=wait_exponential(
- multiplier=retry_config.base_delay,
- max=retry_config.max_delay,
- exp_base=retry_config.exponential_base,
- ),
- retry=retry_if_exception_type(retry_config.retry_exceptions),
- reraise=True,
- ):
- with attempt:
- return await func(*args, **kwargs)
- except RetryError as e:
- logger.error(
- f"[Redis][{op_name}] Failed after {retry_config.max_attempts} attempts: {e.last_attempt.exception()}"
- )
- raise e.last_attempt.exception() from e
- if circuit_breaker:
- return await circuit_breaker.call(execute_with_retry)
- else:
- return await execute_with_retry()
- return wrapper
- return decorator
- class RedisRetryPipeline:
- """Redis pipeline wrapper with retry logic."""
- def __init__(self, pipeline: redis.client.Pipeline, config: RedisRetryConfig):
- self.pipeline = pipeline
- self.config = config
- async def __aenter__(self):
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- self.pipeline.reset() # reset() is synchronous
- @property
- def _retry_decorator(self):
- return redis_retry(self.config, "pipeline_execute")
- async def execute(self) -> list:
- """Execute pipeline with retry logic."""
- decorated_execute = self._retry_decorator(self.pipeline.execute)
- return await decorated_execute()
- def __getattr__(self, name: str) -> Any:
- """Delegate all other attributes to the underlying pipeline."""
- return getattr(self.pipeline, name)
- class RedisWithRetry(redis.Redis):
- """
- Enhanced Redis client with comprehensive retry logic and circuit breaker.
- """
- def __init__(self, retry_config: Optional[RedisRetryConfig] = None, **kwargs):
- # Set _core_methods before calling parent constructor to avoid circular dependency
- self._core_methods = {
- "ping",
- "get",
- "set",
- "setex",
- "delete",
- "exists",
- "expire",
- "sadd",
- "srem",
- "smembers",
- "sismember",
- "scard",
- "zadd",
- "zrem",
- "zrange",
- "zcard",
- "zrangebyscore",
- "keys",
- "info",
- "flushdb",
- "flushall",
- "incr",
- "decr",
- "hget",
- "hset",
- "hgetall",
- "hdel",
- "hexists",
- "hkeys",
- "hvals",
- "lpush",
- "rpush",
- "lpop",
- "rpop",
- "llen",
- "lrange",
- "lindex",
- "publish",
- "eval",
- "evalsha",
- "script_load",
- "script_exists",
- }
- self.retry_config = retry_config or RedisRetryConfig()
- # Now call parent constructor
- super().__init__(**kwargs)
- def __getattribute__(self, name: str) -> Any:
- """
- Override attribute access to automatically add retry logic to Redis methods.
- """
- # Handle special cases to avoid infinite recursion
- if name in ["_core_methods", "retry_config"]:
- return super().__getattribute__(name)
- attr = super().__getattribute__(name)
- # Only wrap async methods that are core Redis operations
- if (
- name in self._core_methods
- and callable(attr)
- and asyncio.iscoroutinefunction(attr)
- ):
- retry_config = super().__getattribute__("retry_config")
- return redis_retry(retry_config, name)(attr)
- return attr
- def pipeline(
- self, transaction: bool = True, shard_hint: Optional[str] = None
- ) -> RedisRetryPipeline:
- """Create a pipeline with retry logic."""
- pipeline = super().pipeline(transaction=transaction, shard_hint=shard_hint)
- return RedisRetryPipeline(pipeline, self.retry_config)
- async def health_check(self) -> Dict[str, Any]:
- """Comprehensive health check for Redis connection."""
- try:
- start_time = time.time()
- await self.ping()
- ping_time = time.time() - start_time
- info = await self.info()
- return {
- "status": "healthy",
- "ping_time_ms": round(ping_time * 1000, 2),
- "connected_clients": info.get("connected_clients", 0),
- "used_memory": info.get("used_memory", 0),
- "used_memory_human": info.get("used_memory_human", "0B"),
- "redis_version": info.get("redis_version", "unknown"),
- }
- except Exception as e:
- return {
- "status": "unhealthy",
- "error": str(e),
- "error_type": type(e).__name__,
- }
- class RedisClientManager:
- """Thread-safe singleton manager for Redis client."""
- def __init__(self):
- self._client: Optional[RedisWithRetry] = None
- self._connection_pool: Optional[ConnectionPool] = None
- self._lock = asyncio.Lock()
- self._retry_config = RedisRetryConfig()
- async def get_client(
- self, retry_config: Optional[RedisRetryConfig] = None
- ) -> RedisWithRetry:
- """Get singleton Redis client with retry logic and connection pooling."""
- if self._client is None:
- async with self._lock:
- if self._client is None:
- await self._initialize_client(retry_config)
- return self._client
- async def _initialize_client(self, retry_config: Optional[RedisRetryConfig] = None):
- """Initialize Redis client with connection pooling."""
- self._retry_config = retry_config or RedisRetryConfig()
- try:
- # Create connection pool with optimized settings
- self._connection_pool = ConnectionPool.from_url(
- settings.REDIS_URL,
- max_connections=getattr(settings, "REDIS_CONNECTION_POOL", 20),
- retry_on_timeout=True,
- socket_keepalive=True,
- socket_keepalive_options={},
- health_check_interval=30,
- socket_connect_timeout=5,
- socket_timeout=5,
- )
- # Create Redis client with connection pool
- self._client = RedisWithRetry(
- connection_pool=self._connection_pool, retry_config=self._retry_config
- )
- # Test connection
- await self._client.ping()
- logger.info(
- f"Redis client initialized successfully with retry config: {self._retry_config}"
- )
- except Exception as e:
- logger.error(f"Failed to initialize Redis client: {str(e)}")
- await self._cleanup()
- raise
- async def close(self):
- """Close Redis client and connection pool."""
- async with self._lock:
- await self._cleanup()
- async def _cleanup(self):
- """Internal cleanup method."""
- try:
- if self._client:
- await self._client.close()
- self._client = None
- logger.info("Redis client closed")
- if self._connection_pool:
- await self._connection_pool.disconnect()
- self._connection_pool = None
- logger.info("Redis connection pool closed")
- except Exception as e:
- logger.error(f"Error during Redis cleanup: {str(e)}")
- async def health_check(self) -> Dict[str, Any]:
- """Perform health check on Redis connection."""
- try:
- client = await self.get_client()
- return await client.health_check()
- except Exception as e:
- return {
- "status": "unhealthy",
- "error": str(e),
- "error_type": type(e).__name__,
- }
- @asynccontextmanager
- async def get_pipeline(self):
- """Context manager for Redis pipeline operations."""
- client = await self.get_client()
- async with client.pipeline() as pipeline:
- yield pipeline
- # Global singleton instance
- _redis_manager = RedisClientManager()
- # Public API functions
- async def get_redis_client(
- retry_config: Optional[RedisRetryConfig] = None,
- ) -> RedisWithRetry:
- """
- Get singleton Redis client with retry logic and connection pooling.
- Args:
- retry_config: Optional custom retry configuration
- Returns:
- RedisWithRetry: Redis client with built-in retry logic
- """
- return await _redis_manager.get_client(retry_config)
- async def close_redis_client() -> None:
- """Close the global Redis client and connection pool."""
- await _redis_manager.close()
- async def redis_health_check() -> Dict[str, Any]:
- """Perform comprehensive health check on Redis connection."""
- return await _redis_manager.health_check()
- @asynccontextmanager
- async def redis_pipeline():
- """Context manager for Redis pipeline operations with retry logic."""
- async with _redis_manager.get_pipeline() as pipeline:
- yield pipeline
- def create_redis_client(
- redis_url: str, retry_config: Optional[RedisRetryConfig] = None, **kwargs
- ) -> RedisWithRetry:
- """
- Create a new Redis client instance (not singleton).
- Args:
- redis_url: Redis connection URL
- retry_config: Optional retry configuration
- **kwargs: Additional Redis client arguments
- Returns:
- RedisWithRetry: New Redis client instance
- """
- return RedisWithRetry.from_url(
- redis_url, retry_config=retry_config or RedisRetryConfig(), **kwargs
- )
Advertisement
Add Comment
Please, Sign In to add comment