Resilience
Build robust LLM applications with circuit breakers, retry policies, rate limiting, and failover mechanisms. These features help you handle failures gracefully and maintain service availability in production environments.
Rate Limiting
Prevent API rate limit violations by controlling request and token throughput. Parsec includes a token bucket algorithm that smoothly throttles requests to stay within provider limits.
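A token bucket refills at a steady rate and lets a request proceed only when it can withdraw the tokens it needs, which permits short bursts while still enforcing the long-term average. A minimal illustrative sketch of the idea (not Parsec's internal implementation):
import time

# Illustrative token bucket: tokens refill continuously at `rate` per second,
# capped at `capacity`; a request proceeds only if it can withdraw its tokens.
class TokenBucket:
    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # refill rate (tokens/second)
        self.capacity = capacity  # burst ceiling
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self, needed: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= needed:
            self.tokens -= needed
            return True
        return False  # caller should wait and retry

# 60 requests/minute = 1 token/second, with bursts of up to 10
bucket = TokenBucket(rate=1.0, capacity=10)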
Basic Rate Limiting
from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter
from parsec.validators import PydanticValidator
from parsec.resilience import RateLimiter
# Create rate limiter with requests and token limits
rate_limiter = RateLimiter(
    requests_per_minute=60,    # OpenAI tier 1 limit
    tokens_per_minute=90_000   # OpenAI tier 1 limit
)
# Create engine with rate limiting
adapter = OpenAIAdapter(api_key="your-key", model="gpt-4o-mini")
validator = PydanticValidator()
engine = EnforcementEngine(
    adapter,
    validator,
    rate_limiter=rate_limiter
)
# Requests are automatically throttled
result = await engine.enforce(prompt, schema)
Per-Provider Rate Limiting
When using multiple providers, each can have independent rate limits:
from parsec.resilience import PerProviderRateLimiter, PROVIDER_LIMITS
# Create per-provider rate limiter
rate_limiter = PerProviderRateLimiter()
# Configure OpenAI with tier 1 limits
openai_config = PROVIDER_LIMITS['openai']['tier_1']
rate_limiter.set_provider_limits(
    'openai',
    requests_per_minute=openai_config.requests_per_minute,  # 60
    tokens_per_minute=openai_config.tokens_per_minute       # 90,000
)
# Configure Anthropic with tier 1 limits
anthropic_config = PROVIDER_LIMITS['anthropic']['tier_1']
rate_limiter.set_provider_limits(
    'anthropic',
    requests_per_minute=anthropic_config.requests_per_minute,  # 50
    tokens_per_minute=anthropic_config.tokens_per_minute       # 40,000
)
# Each provider respects its own limits
openai_engine = EnforcementEngine(openai_adapter, validator, rate_limiter=rate_limiter)
anthropic_engine = EnforcementEngine(anthropic_adapter, validator, rate_limiter=rate_limiter)
Predefined Provider Limits
Parsec includes built-in rate limits for common providers:
from parsec.resilience import PROVIDER_LIMITS
# OpenAI limits
PROVIDER_LIMITS['openai']['tier_1'] # 60 req/min, 90K tokens/min
PROVIDER_LIMITS['openai']['tier_2'] # 500 req/min, 450K tokens/min
# Anthropic limits
PROVIDER_LIMITS['anthropic']['tier_1'] # 50 req/min, 40K tokens/min
PROVIDER_LIMITS['anthropic']['tier_2'] # 1000 req/min, 80K tokens/min
# Gemini limits
PROVIDER_LIMITS['gemini']['free'] # 15 req/min, 32K tokens/min
PROVIDER_LIMITS['gemini']['paid']  # 1000 req/min, 4M tokens/min
Multi-Dimensional Rate Limits
You can set limits across multiple time windows:
rate_limiter = RateLimiter(
    requests_per_minute=60,
    tokens_per_minute=90_000,
    requests_per_day=10_000,   # Daily request cap
    tokens_per_day=2_000_000   # Daily token cap
)
Monitoring Rate Limits
Track usage and capacity in real-time:
# Get statistics
stats = rate_limiter.get_stats()
print(f"Total requests: {stats['total_requests']}")
print(f"Total tokens: {stats['total_tokens']}")
# Check available capacity
capacity = stats['available_capacity']
print(f"Request capacity: {capacity['requests_per_minute']['available']}")
print(f"Token capacity: {capacity['tokens_per_minute']['available']}")
print(f"Utilization: {capacity['requests_per_minute']['utilization']}")Circuit Breakers
Prevent cascade failures by temporarily blocking requests to failing services. Circuit breakers detect when a service is unhealthy and stop sending requests until it recovers.
Basic Usage
from parsec import EnforcementEngine
from parsec.resilience import CircuitBreakerConfig
# Enable circuit breaker on the engine
engine = EnforcementEngine(
    adapter,
    validator,
    use_circuit_breaker=True,
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout=60.0
    )
)
# Circuit breaker protects automatically
result = await engine.enforce(prompt, schema)
Manual Circuit Breaker
from parsec.resilience import CircuitBreaker, CircuitBreakerState, CircuitBreakerConfig
# Configure circuit breaker
config = CircuitBreakerConfig(
    failure_threshold=5,  # Open after 5 failures
    success_threshold=2,  # Close after 2 successes in half-open
    timeout=60.0          # Wait 60 seconds before trying again
)
circuit = CircuitBreaker(name="openai_circuit", config=config)
# Use with function calls
async def call_llm():
    return await adapter.generate(prompt, schema)
result = await circuit.call(call_llm)
# Check circuit state
state = circuit.get_state()
print(f"Circuit state: {state['state']}")
print(f"Failure count: {state['failure_count']}")
Circuit States
The circuit breaker transitions through three states:
- CLOSED: Normal operation - all requests pass through
- OPEN: Too many failures detected - requests are blocked
- HALF_OPEN: Testing recovery - limited requests allowed
CLOSED --[5 failures]--> OPEN
OPEN --[60 seconds]--> HALF_OPEN
HALF_OPEN --[2 successes]--> CLOSED
HALF_OPEN --[any failure]--> OPEN
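A short sketch of driving these transitions with the CircuitBreaker shown above; the failing call is simulated, and since the exact exception raised for blocked requests isn't specified here, the loop catches Exception broadly:
circuit = CircuitBreaker(name="demo", config=CircuitBreakerConfig(failure_threshold=5))

async def flaky_call():
    raise ConnectionError("provider unreachable")  # simulated outage

# Drive the circuit from CLOSED to OPEN with repeated failures
for _ in range(5):
    try:
        await circuit.call(flaky_call)
    except Exception:
        pass  # each failure increments the failure count

print(circuit.get_state()['state'])  # OPEN - further calls are now blocked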
Resetting Circuit Breakers
Manually reset a circuit breaker when needed:
# Reset the circuit breaker
await circuit.reset()
# Check state after reset
print(circuit.get_state()['state'])  # CLOSED
Retry Policies
Automatically retry failed operations with exponential backoff. The enforcement engine has built-in retry logic with configurable policies.
Exponential Backoff
from parsec.resilience import ExponentialBackoff
# Create backoff strategy
backoff = ExponentialBackoff(
    base=1.0,        # Start with 1 second
    max_delay=30.0,  # Cap at 30 seconds
    jitter=True      # Add randomness to prevent thundering herd
)
# Calculate delay for attempt
delay = backoff.calculate(attempt=0)  # ~1.0 seconds
delay = backoff.calculate(attempt=1)  # ~2.0 seconds
delay = backoff.calculate(attempt=2)  # ~4.0 seconds
# Use with async sleep
await backoff.sleep(attempt)
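The schedule above follows the standard exponential formula; a reference sketch of the math (the +/-50% jitter spread is an assumption, and Parsec's exact jitter strategy may differ):
import random

def backoff_delay(attempt: int, base: float = 1.0, max_delay: float = 30.0,
                  jitter: bool = True) -> float:
    # Exponential growth capped at max_delay: 1s, 2s, 4s, 8s, ...
    delay = min(base * (2 ** attempt), max_delay)
    if jitter:
        delay *= random.uniform(0.5, 1.5)  # assumed +/-50% spread
    return delay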
Built-in Retry Policies
The enforcement engine uses retry policies for different operation types:
from parsec.resilience import RetryPolicy, OperationType, DEFAULT_POLICIES
# Generation operations (LLM API calls)
gen_policy = DEFAULT_POLICIES[OperationType.GENERATION]
# - max_attempts: 3
# - base_delay: 1.0s
# - max_delay: 30.0s
# - timeout: 120.0s
# - retryable_exceptions: (TimeoutError, ConnectionError, OSError)
# Validation operations
val_policy = DEFAULT_POLICIES[OperationType.VALIDATION]
# - max_attempts: 1 (no retry)
# - timeout: 5.0s
Custom Retry Policy
Create your own retry policy for specific needs:
from parsec.resilience import RetryPolicy
custom_policy = RetryPolicy(
    max_attempts=5,
    base_delay=2.0,
    max_delay=60.0,
    retryable_exceptions=(TimeoutError, ConnectionError),
    timeout=180.0
)
# Use with engine
engine = EnforcementEngine(
    adapter,
    validator,
    retry_policy=custom_policy
)
Failover
Automatically switch between providers when the primary fails. Failover chains try adapters in sequence until one succeeds.
Basic Failover
from parsec.resilience import FailoverChain
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter
from parsec import EnforcementEngine
# Create adapters for different providers
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")
# Create failover chain - tries adapters in order
failover = FailoverChain([openai, anthropic])
# Use failover as the adapter
engine = EnforcementEngine(failover, validator)
# Automatically tries OpenAI first, falls back to Anthropic on failure
result = await engine.enforce(prompt, schema)
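If every adapter in the chain fails, the final error propagates to the caller. A minimal guard; the exact exception type depends on the last failing adapter, so this catches broadly:
import logging
logger = logging.getLogger(__name__)

try:
    result = await engine.enforce(prompt, schema)
except Exception as exc:  # exact type depends on the last failing adapter
    # Both providers failed - degrade gracefully instead of crashing
    logger.error("All providers in the failover chain failed: %s", exc)
    result = None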
Multi-Provider Failover
from parsec.resilience import FailoverChain
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter, GeminiAdapter
# Create multiple adapters
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")
gemini = GeminiAdapter(api_key="key3", model="gemini-1.5-flash")
# Create failover chain with all providers
failover = FailoverChain([openai, anthropic, gemini])
# Engine will try each adapter until one succeeds
engine = EnforcementEngine(failover, validator)
result = await engine.enforce(prompt, schema)
Failover with Caching
The failover chain generates a composite model identifier for caching:
from parsec.cache import InMemoryCache
# Cache key includes all adapters in the chain
cache = InMemoryCache()
engine = EnforcementEngine(failover, validator, cache=cache)
# Model identifier: "failover[openai:gpt-4o-mini,anthropic:claude-3-5-haiku-20241022]"
print(failover.model)
Complete Production Example
Combine all resilience features for a production-ready setup:
from parsec import EnforcementEngine
from parsec.resilience import (
    FailoverChain,
    CircuitBreakerConfig,
    PerProviderRateLimiter,
    PROVIDER_LIMITS,
    RetryPolicy
)
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter
from parsec.validators import PydanticValidator
from parsec.cache import InMemoryCache
from pydantic import BaseModel

# Define schema
class SentimentAnalysis(BaseModel):
    sentiment: str
    confidence: float
    summary: str

# Create adapters
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")

# Set up per-provider rate limiting
rate_limiter = PerProviderRateLimiter()
rate_limiter.set_provider_limits(
    'openai',
    **PROVIDER_LIMITS['openai']['tier_1'].__dict__
)
rate_limiter.set_provider_limits(
    'anthropic',
    **PROVIDER_LIMITS['anthropic']['tier_1'].__dict__
)

# Create failover chain
failover = FailoverChain([openai, anthropic])

# Configure circuit breaker
circuit_config = CircuitBreakerConfig(
    failure_threshold=5,
    success_threshold=2,
    timeout=60.0
)

# Custom retry policy
retry_policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0
)

# Create engine with all resilience features
cache = InMemoryCache(max_size=100, default_ttl=3600)
validator = PydanticValidator()
engine = EnforcementEngine(
    adapter=failover,
    validator=validator,
    cache=cache,
    rate_limiter=rate_limiter,
    retry_policy=retry_policy,
    use_circuit_breaker=True,
    circuit_breaker_config=circuit_config
)

# Use with full production resilience
result = await engine.enforce(
    "Analyze sentiment: Great product, highly recommend!",
    SentimentAnalysis
)
print(result.data)
# SentimentAnalysis(sentiment='positive', confidence=0.95, summary='...')
Best Practices
Rate Limiting
- Set realistic limits: Match your API tier limits exactly
- Monitor usage: Track stats to avoid hitting limits (see the sketch after this list)
- Handle bursts: Token bucket allows short bursts while maintaining long-term limits
- Per-provider limits: Use PerProviderRateLimiter when working with multiple providers
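For example, a pre-flight check built on the get_stats() shape documented above (this assumes utilization is reported as a 0-1 fraction; the 0.9 threshold is an arbitrary choice):
stats = rate_limiter.get_stats()
utilization = stats['available_capacity']['requests_per_minute']['utilization']
if utilization > 0.9:  # arbitrary threshold - tune for your workload
    print("Approaching the request limit; defer low-priority work")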
Circuit Breaker Configuration
# Production-ready settings
config = CircuitBreakerConfig(
    failure_threshold=5,  # 5-10 failures is typical
    success_threshold=2,  # 2-3 successes for recovery
    timeout=60.0          # 30-120 seconds typical
)
Retry Configuration
# Recommended for LLM calls
retry = RetryPolicy(
    max_attempts=3,  # 3-5 attempts typical
    base_delay=1.0,  # Start with 1 second
    max_delay=30.0,  # Cap at 30 seconds
    timeout=120.0    # 2 minutes for LLM calls
)
Failover Strategy
- Order by speed/cost: Put fastest and cheapest providers first
- Monitor failover rate: High failover indicates primary issues
- Consider cost: Backup providers may have different pricing
- Use consistent schemas: Ensure all providers support your schema format
Error Handling
Retryable vs Non-Retryable Errors
The default retry policy distinguishes between errors that should and shouldn’t be retried:
# Retryable errors (temporary issues)
RETRYABLE_ERRORS = (
    TimeoutError,
    ConnectionError,
    OSError
)
# Non-retryable errors (permanent issues)
# - ValueError
# - AuthenticationError
# - InvalidRequestError
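You can check up front how a policy will classify an error using is_retryable (documented in the API reference below):
from parsec.resilience import DEFAULT_POLICIES, OperationType

policy = DEFAULT_POLICIES[OperationType.GENERATION]
print(policy.is_retryable(TimeoutError()))  # True - transient, retried
print(policy.is_retryable(ValueError()))    # False - permanent, fails fast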
Custom Error Handling
# Define custom retryable exceptions
custom_policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=(
        TimeoutError,
        ConnectionError,
        RuntimeError  # Add custom exception
    )
)
API Reference
RateLimiter
class RateLimiter:
    def __init__(
        self,
        requests_per_minute: Optional[int] = None,
        tokens_per_minute: Optional[int] = None,
        requests_per_day: Optional[int] = None,
        tokens_per_day: Optional[int] = None
    )

    async def acquire(self, estimated_tokens: int = 0) -> None
    def get_stats(self) -> dict
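For manual throttling outside the engine, await acquire() before each call; a short sketch (the 500-token estimate is illustrative):
# Wait for capacity before a hand-rolled API call
await rate_limiter.acquire(estimated_tokens=500)  # estimate is illustrative
response = await adapter.generate(prompt, schema)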
PerProviderRateLimiter
class PerProviderRateLimiter:
    def __init__(self)

    def set_provider_limits(
        self,
        provider: str,
        requests_per_minute: Optional[int] = None,
        tokens_per_minute: Optional[int] = None,
        requests_per_day: Optional[int] = None,
        tokens_per_day: Optional[int] = None
    ) -> None
    async def acquire(self, provider: str, estimated_tokens: int = 0) -> None
    def get_stats(self) -> dict
CircuitBreaker
class CircuitBreaker:
    def __init__(
        self,
        name: str,
        config: Optional[CircuitBreakerConfig] = None
    )

    async def call(self, func: Callable, *args, **kwargs) -> Any
    async def reset(self) -> None
    def get_state(self) -> dict
CircuitBreakerConfig
@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout: float = 60.0
ExponentialBackoff
class ExponentialBackoff:
    def __init__(
        self,
        base: float = 1.0,
        max_delay: float = 60.0,
        jitter: bool = True
    )

    def calculate(self, attempt: int) -> float
    async def sleep(self, attempt: int) -> None
FailoverChain
class FailoverChain:
    def __init__(self, adapters: List[BaseLLMAdapter])

    @property
    def model(self) -> str

    async def generate(
        self,
        prompt: str,
        schema: Optional[dict] = None,
        **kwargs
    ) -> GenerationResponse
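Because FailoverChain exposes the same generate() interface as a single adapter, it can also be called directly; a quick sketch (the prompt and JSON-schema dict are illustrative):
# Call the chain directly, outside an EnforcementEngine
response = await failover.generate(
    "Summarize this ticket in one sentence.",
    schema={"type": "object", "properties": {"summary": {"type": "string"}}}
)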
RetryPolicy
@dataclass
class RetryPolicy:
    max_attempts: int
    base_delay: float
    max_delay: float
    retryable_exceptions: tuple = (Exception,)
    timeout: Optional[float] = None

    def is_retryable(self, exception: Exception) -> bool