Resilience

Build robust LLM applications with circuit breakers, retry policies, rate limiting, and failover mechanisms. These features help you handle failures gracefully and maintain service availability in production environments.

Rate Limiting

Prevent API rate limit violations by controlling request and token throughput. Parsec includes a token bucket algorithm that smoothly throttles requests to stay within provider limits.
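
For intuition, the core idea can be sketched in a few lines. This is an illustrative token bucket only, not Parsec's actual implementation: the bucket refills at a fixed rate, each request withdraws tokens, and callers wait whenever the bucket runs dry.

import asyncio
import time

class TokenBucket:
    """Illustrative token bucket: refills at `rate` tokens/sec up to `capacity`."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()

    async def acquire(self, cost: float = 1.0) -> None:
        while True:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens >= cost:
                self.tokens -= cost  # spend and proceed
                return
            # Wait just long enough for the missing tokens to refill
            await asyncio.sleep((cost - self.tokens) / self.rate)

# 60 requests/minute == 1 token/second, with bursts of up to 60
bucket = TokenBucket(rate=1.0, capacity=60)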

Basic Rate Limiting

from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter
from parsec.validators import PydanticValidator
from parsec.resilience import RateLimiter

# Create rate limiter with requests and token limits
rate_limiter = RateLimiter(
    requests_per_minute=60,    # OpenAI tier 1 limit
    tokens_per_minute=90_000   # OpenAI tier 1 limit
)

# Create engine with rate limiting
adapter = OpenAIAdapter(api_key="your-key", model="gpt-4o-mini")
validator = PydanticValidator()
engine = EnforcementEngine(
    adapter,
    validator,
    rate_limiter=rate_limiter
)

# Requests are automatically throttled
result = await engine.enforce(prompt, schema)
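
The engine invokes the limiter for you, but nothing stops you from awaiting it directly around your own calls; the acquire signature is documented in the API reference at the bottom of this page. A minimal sketch:

# Throttle an arbitrary call outside the engine: acquire() returns
# once there is capacity for one request plus the estimated tokens
await rate_limiter.acquire(estimated_tokens=1_500)
response = await adapter.generate(prompt, schema)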

Per-Provider Rate Limiting

When you work with multiple providers, each one can have its own independent rate limits:

from parsec.resilience import PerProviderRateLimiter, PROVIDER_LIMITS

# Create per-provider rate limiter
rate_limiter = PerProviderRateLimiter()

# Configure OpenAI with tier 1 limits
openai_config = PROVIDER_LIMITS['openai']['tier_1']
rate_limiter.set_provider_limits(
    'openai',
    requests_per_minute=openai_config.requests_per_minute,  # 60
    tokens_per_minute=openai_config.tokens_per_minute       # 90,000
)

# Configure Anthropic with tier 1 limits
anthropic_config = PROVIDER_LIMITS['anthropic']['tier_1']
rate_limiter.set_provider_limits(
    'anthropic',
    requests_per_minute=anthropic_config.requests_per_minute,  # 50
    tokens_per_minute=anthropic_config.tokens_per_minute       # 40,000
)

# Each provider respects its own limits
openai_engine = EnforcementEngine(openai_adapter, validator, rate_limiter=rate_limiter)
anthropic_engine = EnforcementEngine(anthropic_adapter, validator, rate_limiter=rate_limiter)
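
The per-provider limiter can likewise be used manually; its acquire method takes the provider name as the first argument (see the API reference below). A short sketch:

# Reserve capacity against one provider's bucket before calling it
await rate_limiter.acquire('openai', estimated_tokens=2_000)
response = await openai_adapter.generate(prompt, schema)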

Predefined Provider Limits

Parsec includes built-in rate limits for common providers:

from parsec.resilience import PROVIDER_LIMITS

# OpenAI limits
PROVIDER_LIMITS['openai']['tier_1']   # 60 req/min, 90K tokens/min
PROVIDER_LIMITS['openai']['tier_2']   # 500 req/min, 450K tokens/min

# Anthropic limits
PROVIDER_LIMITS['anthropic']['tier_1']  # 50 req/min, 40K tokens/min
PROVIDER_LIMITS['anthropic']['tier_2']  # 1000 req/min, 80K tokens/min

# Gemini limits
PROVIDER_LIMITS['gemini']['free']  # 15 req/min, 32K tokens/min
PROVIDER_LIMITS['gemini']['paid']  # 1000 req/min, 4M tokens/min
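
Assuming PROVIDER_LIMITS is an ordinary nested dict (provider, then tier, mapping to a limit object as used above), you can enumerate the built-in tiers, for example to log them at startup:

# List every bundled provider/tier and its limits
for provider, tiers in PROVIDER_LIMITS.items():
    for tier, limits in tiers.items():
        print(f"{provider}/{tier}: "
              f"{limits.requests_per_minute} req/min, "
              f"{limits.tokens_per_minute} tokens/min")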

Multi-Dimensional Rate Limits

You can set limits across multiple time windows:

rate_limiter = RateLimiter(
    requests_per_minute=60,
    tokens_per_minute=90_000,
    requests_per_day=10_000,    # Daily request cap
    tokens_per_day=2_000_000    # Daily token cap
)

Monitoring Rate Limits

Track usage and capacity in real-time:

# Get statistics
stats = rate_limiter.get_stats()
print(f"Total requests: {stats['total_requests']}")
print(f"Total tokens: {stats['total_tokens']}")

# Check available capacity
capacity = stats['available_capacity']
print(f"Request capacity: {capacity['requests_per_minute']['available']}")
print(f"Token capacity: {capacity['tokens_per_minute']['available']}")
print(f"Utilization: {capacity['requests_per_minute']['utilization']}")
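
For continuous visibility, one option is a small background task that samples get_stats() on an interval. A sketch, assuming utilization is reported as a fraction between 0 and 1 (the 80% threshold is an arbitrary example):

import asyncio

async def watch_rate_limits(rate_limiter, interval: float = 10.0):
    """Periodically sample limiter stats and warn near saturation."""
    while True:
        capacity = rate_limiter.get_stats()['available_capacity']
        utilization = capacity['requests_per_minute']['utilization']
        if utilization > 0.8:  # assumed to be a 0..1 fraction
            print(f"WARNING: request budget {utilization:.0%} used")
        await asyncio.sleep(interval)

# Run alongside your workload:
# asyncio.create_task(watch_rate_limits(rate_limiter))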

Circuit Breakers

Prevent cascade failures by temporarily blocking requests to failing services. Circuit breakers detect when a service is unhealthy and stop sending requests until it recovers.

Basic Usage

from parsec import EnforcementEngine
from parsec.resilience import CircuitBreakerConfig

# Enable circuit breaker on the engine
engine = EnforcementEngine(
    adapter,
    validator,
    use_circuit_breaker=True,
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout=60.0
    )
)

# Circuit breaker protects automatically
result = await engine.enforce(prompt, schema)

Manual Circuit Breaker

from parsec.resilience import CircuitBreaker, CircuitBreakerState, CircuitBreakerConfig

# Configure circuit breaker
config = CircuitBreakerConfig(
    failure_threshold=5,   # Open after 5 failures
    success_threshold=2,   # Close after 2 successes in half-open
    timeout=60.0           # Wait 60 seconds before trying again
)
circuit = CircuitBreaker(name="openai_circuit", config=config)

# Use with function calls
async def call_llm():
    return await adapter.generate(prompt, schema)

result = await circuit.call(call_llm)

# Check circuit state
state = circuit.get_state()
print(f"Circuit state: {state['state']}")
print(f"Failure count: {state['failure_count']}")

Circuit States

The circuit breaker transitions through three states:

  • CLOSED: Normal operation - all requests pass through
  • OPEN: Too many failures detected - requests are blocked
  • HALF_OPEN: Testing recovery - limited requests allowed
CLOSED    --[5 failures]-->  OPEN
OPEN      --[60 seconds]-->  HALF_OPEN
HALF_OPEN --[2 successes]--> CLOSED
HALF_OPEN --[any failure]--> OPEN
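
These transitions are observable through get_state(). A sketch that logs the state around each guarded call; the except clause is deliberately broad because the exact exception raised for a blocked (open) circuit isn't documented on this page:

async def guarded_call(circuit, func):
    """Run func through the breaker and log the resulting state."""
    try:
        return await circuit.call(func)
    except Exception as exc:
        # Either the wrapped call failed or the open circuit rejected it
        print(f"call failed: {exc!r}")
        return None
    finally:
        state = circuit.get_state()
        print(f"circuit={state['state']} failures={state['failure_count']}")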

Resetting Circuit Breakers

Manually reset a circuit breaker when needed:

# Reset the circuit breaker
await circuit.reset()

# Check state after reset
print(circuit.get_state()['state'])  # CLOSED

Retry Policies

Automatically retry failed operations with exponential backoff. The enforcement engine has built-in retry logic with configurable policies.

Exponential Backoff

from parsec.resilience import ExponentialBackoff

# Create backoff strategy
backoff = ExponentialBackoff(
    base=1.0,        # Start with 1 second
    max_delay=30.0,  # Cap at 30 seconds
    jitter=True      # Add randomness to prevent thundering herd
)

# Calculate delay for attempt
delay = backoff.calculate(attempt=0)  # ~1.0 seconds
delay = backoff.calculate(attempt=1)  # ~2.0 seconds
delay = backoff.calculate(attempt=2)  # ~4.0 seconds

# Use with async sleep
await backoff.sleep(attempt)
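
If you need backoff outside the engine's built-in retries, you can drive the strategy yourself. A minimal sketch using the calculate/sleep methods above; the retry loop itself is illustrative, not a Parsec API:

async def call_with_backoff(func, max_attempts: int = 3):
    """Retry func on transient errors with exponential backoff."""
    backoff = ExponentialBackoff(base=1.0, max_delay=30.0, jitter=True)
    for attempt in range(max_attempts):
        try:
            return await func()
        except (TimeoutError, ConnectionError):
            if attempt == max_attempts - 1:
                raise  # out of attempts, propagate the error
            await backoff.sleep(attempt)  # ~1s, ~2s, ~4s, ...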

Built-in Retry Policies

The enforcement engine uses retry policies for different operation types:

from parsec.resilience import RetryPolicy, OperationType, DEFAULT_POLICIES

# Generation operations (LLM API calls)
gen_policy = DEFAULT_POLICIES[OperationType.GENERATION]
# - max_attempts: 3
# - base_delay: 1.0s
# - max_delay: 30.0s
# - timeout: 120.0s
# - retryable_exceptions: (TimeoutError, ConnectionError, OSError)

# Validation operations
val_policy = DEFAULT_POLICIES[OperationType.VALIDATION]
# - max_attempts: 1 (no retry)
# - timeout: 5.0s

Custom Retry Policy

Create your own retry policy for specific needs:

from parsec.resilience import RetryPolicy

custom_policy = RetryPolicy(
    max_attempts=5,
    base_delay=2.0,
    max_delay=60.0,
    retryable_exceptions=(TimeoutError, ConnectionError),
    timeout=180.0
)

# Use with engine
engine = EnforcementEngine(
    adapter,
    validator,
    retry_policy=custom_policy
)

Failover

Automatically switch between providers when the primary fails. Failover chains try adapters in sequence until one succeeds.

Basic Failover

from parsec.resilience import FailoverChain
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter
from parsec import EnforcementEngine

# Create adapters for different providers
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")

# Create failover chain - tries adapters in order
failover = FailoverChain([openai, anthropic])

# Use failover as the adapter
engine = EnforcementEngine(failover, validator)

# Automatically tries OpenAI first, falls back to Anthropic on failure
result = await engine.enforce(prompt, schema)

Multi-Provider Failover

from parsec.resilience import FailoverChain
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter, GeminiAdapter

# Create multiple adapters
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")
gemini = GeminiAdapter(api_key="key3", model="gemini-1.5-flash")

# Create failover chain with all providers
failover = FailoverChain([openai, anthropic, gemini])

# Engine will try each adapter until one succeeds
engine = EnforcementEngine(failover, validator)
result = await engine.enforce(prompt, schema)

Failover with Caching

The failover chain generates a composite model identifier for caching:

from parsec.cache import InMemoryCache

# Cache key includes all adapters in the chain
cache = InMemoryCache()
engine = EnforcementEngine(failover, validator, cache=cache)

# Model identifier: "failover[openai:gpt-4o-mini,anthropic:claude-3-5-haiku-20241022]"
print(failover.model)

Complete Production Example

Combine all resilience features for a production-ready setup:

from parsec import EnforcementEngine
from parsec.resilience import (
    FailoverChain,
    CircuitBreakerConfig,
    PerProviderRateLimiter,
    PROVIDER_LIMITS,
    RetryPolicy
)
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter
from parsec.validators import PydanticValidator
from parsec.cache import InMemoryCache
from pydantic import BaseModel

# Define schema
class SentimentAnalysis(BaseModel):
    sentiment: str
    confidence: float
    summary: str

# Create adapters
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")

# Set up per-provider rate limiting
rate_limiter = PerProviderRateLimiter()
rate_limiter.set_provider_limits(
    'openai',
    **PROVIDER_LIMITS['openai']['tier_1'].__dict__
)
rate_limiter.set_provider_limits(
    'anthropic',
    **PROVIDER_LIMITS['anthropic']['tier_1'].__dict__
)

# Create failover chain
failover = FailoverChain([openai, anthropic])

# Configure circuit breaker
circuit_config = CircuitBreakerConfig(
    failure_threshold=5,
    success_threshold=2,
    timeout=60.0
)

# Custom retry policy
retry_policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0
)

# Create engine with all resilience features
cache = InMemoryCache(max_size=100, default_ttl=3600)
validator = PydanticValidator()
engine = EnforcementEngine(
    adapter=failover,
    validator=validator,
    cache=cache,
    rate_limiter=rate_limiter,
    retry_policy=retry_policy,
    use_circuit_breaker=True,
    circuit_breaker_config=circuit_config
)

# Use with full production resilience
result = await engine.enforce(
    "Analyze sentiment: Great product, highly recommend!",
    SentimentAnalysis
)
print(result.data)
# SentimentAnalysis(sentiment='positive', confidence=0.95, summary='...')

Best Practices

Rate Limiting

  • Set realistic limits: Match your API tier limits exactly
  • Monitor usage: Track stats to avoid hitting limits (see the sketch after this list)
  • Handle bursts: The token bucket allows short bursts while staying within long-term limits
  • Per-provider limits: Use PerProviderRateLimiter when working with multiple providers
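
If you drive batches yourself, you can size them from the limiter's reported capacity. A sketch, assuming 'available' is the integer number of requests left in the current one-minute window:

async def submit_batch(engine, rate_limiter, jobs):
    """Submit only as many (prompt, schema) jobs as the window can absorb."""
    capacity = rate_limiter.get_stats()['available_capacity']
    remaining = capacity['requests_per_minute']['available']
    for prompt, schema in jobs[:remaining]:
        await engine.enforce(prompt, schema)
    return jobs[remaining:]  # leftovers for the next window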

Circuit Breaker Configuration

# Production-ready settings
config = CircuitBreakerConfig(
    failure_threshold=5,   # 5-10 failures is typical
    success_threshold=2,   # 2-3 successes for recovery
    timeout=60.0           # 30-120 seconds typical
)

Retry Configuration

# Recommended for LLM calls
retry = RetryPolicy(
    max_attempts=3,   # 3-5 attempts typical
    base_delay=1.0,   # Start with 1 second
    max_delay=30.0,   # Cap at 30 seconds
    timeout=120.0     # 2 minutes for LLM calls
)

Failover Strategy

  • Order by speed/cost: Put the fastest and cheapest providers first
  • Monitor failover rate: A high failover rate indicates problems with the primary provider (one way to measure it is sketched below)
  • Consider cost: Backup providers may have different pricing
  • Use consistent schemas: Ensure all providers support your schema format
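
One way to measure the failover rate is a hypothetical counting proxy around the primary adapter; this wrapper is not part of Parsec, it simply mirrors the adapter interface shown in the API reference below:

class CountingAdapter:
    """Hypothetical proxy that counts calls and failures on an adapter."""

    def __init__(self, inner):
        self.inner = inner
        self.calls = 0
        self.failures = 0

    @property
    def model(self) -> str:
        return self.inner.model

    async def generate(self, prompt, schema=None, **kwargs):
        self.calls += 1
        try:
            return await self.inner.generate(prompt, schema=schema, **kwargs)
        except Exception:
            self.failures += 1
            raise

primary = CountingAdapter(openai)
failover = FailoverChain([primary, anthropic])
# Later: primary.failures / max(primary.calls, 1) is the failover rate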

Error Handling

Retryable vs Non-Retryable Errors

The default retry policy distinguishes between errors that should and shouldn’t be retried:

# Retryable errors (temporary issues)
RETRYABLE_ERRORS = (
    TimeoutError,
    ConnectionError,
    OSError
)

# Non-retryable errors (permanent issues)
# - ValueError
# - AuthenticationError
# - InvalidRequestError
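
You can check how a policy classifies a given exception with RetryPolicy.is_retryable (signature in the API reference at the bottom of this page):

from parsec.resilience import RetryPolicy

policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=(TimeoutError, ConnectionError, OSError)
)

policy.is_retryable(TimeoutError("read timed out"))  # True  -> retry
policy.is_retryable(ValueError("bad schema"))        # False -> fail fast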

Custom Error Handling

# Define custom retryable exceptions
custom_policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=(
        TimeoutError,
        ConnectionError,
        RuntimeError   # Add custom exception
    )
)

API Reference

RateLimiter

class RateLimiter:
    def __init__(
        self,
        requests_per_minute: Optional[int] = None,
        tokens_per_minute: Optional[int] = None,
        requests_per_day: Optional[int] = None,
        tokens_per_day: Optional[int] = None
    )

    async def acquire(self, estimated_tokens: int = 0) -> None

    def get_stats(self) -> dict

PerProviderRateLimiter

class PerProviderRateLimiter:
    def __init__(self)

    def set_provider_limits(
        self,
        provider: str,
        requests_per_minute: Optional[int] = None,
        tokens_per_minute: Optional[int] = None,
        requests_per_day: Optional[int] = None,
        tokens_per_day: Optional[int] = None
    ) -> None

    async def acquire(self, provider: str, estimated_tokens: int = 0) -> None

    def get_stats(self) -> dict

CircuitBreaker

class CircuitBreaker:
    def __init__(
        self,
        name: str,
        config: Optional[CircuitBreakerConfig] = None
    )

    async def call(self, func: Callable, *args, **kwargs) -> Any

    async def reset(self) -> None

    def get_state(self) -> dict

CircuitBreakerConfig

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout: float = 60.0

ExponentialBackoff

class ExponentialBackoff:
    def __init__(
        self,
        base: float = 1.0,
        max_delay: float = 60.0,
        jitter: bool = True
    )

    def calculate(self, attempt: int) -> float

    async def sleep(self, attempt: int) -> None

FailoverChain

class FailoverChain:
    def __init__(self, adapters: List[BaseLLMAdapter])

    @property
    def model(self) -> str

    async def generate(
        self,
        prompt: str,
        schema: Optional[dict] = None,
        **kwargs
    ) -> GenerationResponse

RetryPolicy

@dataclass
class RetryPolicy:
    max_attempts: int
    base_delay: float
    max_delay: float
    retryable_exceptions: tuple = (Exception,)
    timeout: Optional[float] = None

    def is_retryable(self, exception: Exception) -> bool