
Enforcement Engine

The EnforcementEngine is the core orchestration component that coordinates LLM generation, validation, retries, caching, and resilience features to ensure you get valid structured output.

Overview

The enforcement engine:

  • Calls LLM adapters to generate responses
  • Validates output against your schema
  • Retries automatically with feedback when validation fails
  • Caches successful results to reduce costs
  • Applies rate limiting to prevent API violations
  • Uses circuit breakers to handle provider failures gracefully
  • Collects training data for future fine-tuning

Quick Start

from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter
from parsec.validators import PydanticValidator
from pydantic import BaseModel

# Define schema
class Person(BaseModel):
    name: str
    age: int
    email: str

# Create adapter and validator
adapter = OpenAIAdapter(api_key="your-key", model="gpt-4o-mini")
validator = PydanticValidator()

# Create enforcement engine
engine = EnforcementEngine(
    adapter=adapter,
    validator=validator,
    max_retries=3
)

# Generate structured output
result = await engine.enforce(
    "Extract: John Doe is 30, contact at john@example.com",
    Person
)

print(result.data)
# Person(name='John Doe', age=30, email='john@example.com')

Constructor Parameters

EnforcementEngine(
    adapter: Union[BaseLLMAdapter, FailoverChain],
    validator: BaseValidator,
    max_retries: int = 3,
    collector: Optional[DatasetCollector] = None,
    cache: Optional[BaseCache] = None,
    retry_policy: Optional[RetryPolicy] = None,
    use_circuit_breaker: bool = False,
    circuit_breaker_config: Optional[CircuitBreakerConfig] = None,
    rate_limiter: Optional[Union[RateLimiter, PerProviderRateLimiter]] = None
)

Required Parameters

  • adapter (BaseLLMAdapter or FailoverChain): LLM adapter for generation (OpenAI, Anthropic, Gemini, etc.)
  • validator (BaseValidator): Validator for checking output (JSONValidator or PydanticValidator)

Optional Parameters

  • max_retries (int, default: 3): Maximum retry attempts for validation failures
  • collector (DatasetCollector, default: None): Collects successful generations for training datasets
  • cache (BaseCache, default: None): Cache for storing successful results
  • retry_policy (RetryPolicy, default: generation policy): Custom retry configuration with backoff and timeout
  • use_circuit_breaker (bool, default: False): Enable circuit breaker for fault tolerance
  • circuit_breaker_config (CircuitBreakerConfig, default: default config): Circuit breaker thresholds and timeout
  • rate_limiter (RateLimiter or PerProviderRateLimiter, default: None): Rate limiter to prevent API violations

The enforce() Method

The main method for generating structured output:

result = await engine.enforce(
    prompt: str,
    schema: Any,
    **kwargs
)  # -> EnforcedOutput

Parameters

  • prompt (str): The input prompt for the LLM
  • schema (Any): Pydantic model or JSON schema for validation
  • kwargs: Additional parameters passed to the LLM (temperature, max_tokens, etc.)

Returns

EnforcedOutput object containing:

class EnforcedOutput:
    data: Any                       # Validated, parsed output
    generation: GenerationResponse  # Raw LLM response details
    validation: ValidationResult    # Validation details
    retry_count: int                # Number of retries needed
    success: bool                   # Whether enforcement succeeded

How It Works

The enforcement engine follows this flow:

1. Check cache (if enabled)
2. Apply rate limiting (if enabled)
3. Generate LLM output (with circuit breaker if enabled)
4. Validate output against schema
5. Valid? → Store in cache → Return success
6. Invalid? → Retry with validation feedback → Repeat from step 3
7. Max retries reached → Return failure
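The flow above can be sketched in plain Python. This is not the library's actual implementation: `generate`, `validate`, the tuple-based cache, and the feedback wording are all stand-ins, and rate limiting and circuit breaking are omitted for brevity.

```python
async def enforce_sketch(prompt, schema, generate, validate, cache, max_retries=3):
    # Step 1: check the cache first
    key = (prompt, str(schema))
    if key in cache:
        return cache[key]

    attempt_prompt = prompt
    for attempt in range(max_retries + 1):
        raw = await generate(attempt_prompt)      # step 3: generate LLM output
        ok, data, errors = validate(raw, schema)  # step 4: validate against schema
        if ok:
            cache[key] = (data, attempt)          # step 5: store and return success
            return data, attempt
        # Step 6: fold the validation errors back into the prompt and retry
        attempt_prompt = f"{prompt}\nYour previous response was invalid. Errors: {errors}"

    # Step 7: max retries reached
    raise RuntimeError("Enforcement failed after max retries")
```

The key idea is that each retry carries the previous attempt's validation errors, so the model gets progressively more specific feedback rather than a blind re-roll.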

Basic Usage

Simple Enforcement

from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter
from parsec.validators import JSONValidator

adapter = OpenAIAdapter(api_key="key", model="gpt-4o-mini")
validator = JSONValidator()
engine = EnforcementEngine(adapter, validator)

schema = {
    "type": "object",
    "properties": {
        "sentiment": {"type": "string"},
        "confidence": {"type": "number"}
    },
    "required": ["sentiment", "confidence"]
}

result = await engine.enforce(
    "Analyze sentiment: This product is amazing!",
    schema
)

print(result.data)         # {'sentiment': 'positive', 'confidence': 0.95}
print(result.retry_count)  # 0 (succeeded on first try)

With Pydantic Schema

from pydantic import BaseModel, Field
from parsec.validators import PydanticValidator

class SentimentAnalysis(BaseModel):
    sentiment: str = Field(description="positive, negative, or neutral")
    confidence: float = Field(ge=0.0, le=1.0)
    summary: str

validator = PydanticValidator()
engine = EnforcementEngine(adapter, validator)

result = await engine.enforce(
    "Analyze: This product exceeded all my expectations!",
    SentimentAnalysis
)

print(result.data)
# SentimentAnalysis(sentiment='positive', confidence=0.92, summary='...')

With LLM Parameters

Pass additional parameters to control generation:

result = await engine.enforce(
    prompt,
    schema,
    temperature=0.2,  # Lower temperature for more deterministic output
    max_tokens=500,   # Limit response length
    top_p=0.9         # Nucleus sampling parameter
)

Production Configuration

Combine all features for production-ready enforcement:

from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter
from parsec.validators import PydanticValidator
from parsec.cache import InMemoryCache
from parsec.resilience import (
    FailoverChain,
    CircuitBreakerConfig,
    PerProviderRateLimiter,
    PROVIDER_LIMITS,
    RetryPolicy
)
from parsec.training.collector import DatasetCollector

# Set up adapters with failover
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")
failover = FailoverChain([openai, anthropic])

# Configure rate limiting
rate_limiter = PerProviderRateLimiter()
rate_limiter.set_provider_limits('openai', **PROVIDER_LIMITS['openai']['tier_1'].__dict__)
rate_limiter.set_provider_limits('anthropic', **PROVIDER_LIMITS['anthropic']['tier_1'].__dict__)

# Set up circuit breaker
circuit_config = CircuitBreakerConfig(
    failure_threshold=5,
    success_threshold=2,
    timeout=60.0
)

# Custom retry policy
retry_policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    timeout=120.0
)

# Create cache
cache = InMemoryCache(max_size=100, default_ttl=3600)

# Set up dataset collector
collector = DatasetCollector(output_dir="./training_data")

# Create production engine
engine = EnforcementEngine(
    adapter=failover,
    validator=PydanticValidator(),
    max_retries=3,
    cache=cache,
    rate_limiter=rate_limiter,
    retry_policy=retry_policy,
    use_circuit_breaker=True,
    circuit_breaker_config=circuit_config,
    collector=collector
)

# Use with full production resilience
result = await engine.enforce(prompt, schema)

Retry Behavior

Automatic Retries

When validation fails, the engine automatically retries with feedback:

# First attempt: Invalid output
# Engine sends: "Your previous response was invalid. Errors: [...]"

# Second attempt: Still invalid
# Engine sends updated feedback with specific error details

# Third attempt: Valid!
# Returns successful result
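A feedback prompt of this shape might be assembled roughly as follows. The exact wording the engine uses is internal; the function name and format here are purely illustrative.

```python
def build_retry_prompt(original_prompt, errors):
    # Fold the validator's error messages back into the prompt so the
    # model can correct its previous attempt
    error_lines = "\n".join(f"- {e}" for e in errors)
    return (
        f"{original_prompt}\n\n"
        "Your previous response was invalid. Fix these errors:\n"
        f"{error_lines}"
    )
```

Surfacing the concrete error messages (rather than a generic "try again") is what makes retries converge: the model sees exactly which field failed and why.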

Retry Policy

Control retry behavior:

from parsec.resilience import RetryPolicy

custom_policy = RetryPolicy(
    max_attempts=5,         # More retries
    base_delay=2.0,         # Start with 2 second delay
    max_delay=60.0,         # Cap at 60 seconds
    timeout=180.0,          # 3 minute timeout per attempt
    retryable_exceptions=(  # Which errors to retry
        TimeoutError,
        ConnectionError
    )
)

engine = EnforcementEngine(
    adapter,
    validator,
    retry_policy=custom_policy
)

Exponential Backoff

Retries use exponential backoff with jitter:

Attempt 1: Immediate
Attempt 2: ~1 second delay
Attempt 3: ~2 second delay
Attempt 4: ~4 second delay
...
Max delay: 30 seconds (configurable)
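This schedule is standard exponential backoff. A rough sketch of the delay computation follows; the exact jitter formula the library applies is an assumption, and the parameter names only mirror `RetryPolicy`.

```python
import random

def backoff_delay(attempt, base_delay=1.0, max_delay=30.0, jitter=0.25):
    # Attempt 1 fires immediately; attempt n waits ~base_delay * 2**(n-2),
    # capped at max_delay, with ±25% jitter so concurrent clients desynchronize
    if attempt <= 1:
        return 0.0
    raw = min(base_delay * 2 ** (attempt - 2), max_delay)
    return raw * random.uniform(1 - jitter, 1 + jitter)
```

The jitter matters under load: without it, many clients that failed at the same moment would all retry at the same moment, hammering the provider in synchronized waves.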

Error Handling

Handling Enforcement Failures

result = await engine.enforce(prompt, schema)

if result.success:
    print(f"Success! Data: {result.data}")
    print(f"Retries needed: {result.retry_count}")
else:
    print(f"Failed after {result.retry_count} retries")
    print(f"Errors: {result.validation.errors}")
    # Handle failure - log, alert, fallback, etc.

Retryable vs Non-Retryable Errors

The engine distinguishes between temporary and permanent errors:

# Retryable errors (temporary issues):
#   - TimeoutError
#   - ConnectionError
#   - OSError
# → Engine will retry

# Non-retryable errors (permanent issues):
#   - ValueError
#   - AuthenticationError
#   - InvalidRequestError
# → Engine fails immediately
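A minimal version of this classification might look like the following. The actual list is controlled by `retryable_exceptions` on RetryPolicy; the tuple below simply mirrors the defaults listed above.

```python
RETRYABLE = (TimeoutError, ConnectionError, OSError)

def should_retry(exc: Exception) -> bool:
    # Transient network/timeout errors are worth retrying;
    # anything else (bad input, bad credentials) should fail fast
    return isinstance(exc, RETRYABLE)
```

Failing fast on permanent errors matters: retrying an invalid API key five times only adds latency and cost without any chance of success.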

Caching Integration

The engine automatically handles caching:

from parsec.cache import InMemoryCache

cache = InMemoryCache(max_size=100, default_ttl=3600)
engine = EnforcementEngine(adapter, validator, cache=cache)

# First call - cache miss, calls API
result1 = await engine.enforce(prompt, schema)

# Second identical call - cache hit, no API call
result2 = await engine.enforce(prompt, schema)

# Check cache performance
print(cache.get_stats())
# {'hits': 1, 'misses': 1, 'hit_rate': '50.00%'}

Cache keys are automatically generated from:

  • Prompt text
  • Model identifier
  • Schema definition
  • Generation parameters (temperature, etc.)
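One plausible way such a key could be derived is a deterministic digest over all four inputs. The library's actual hashing scheme is internal; this sketch only shows the idea.

```python
import hashlib
import json

def make_cache_key(prompt, model, schema, params):
    # Deterministic serialization: identical requests produce identical keys,
    # while any change to prompt, model, schema, or parameters changes the key
    payload = json.dumps(
        {"prompt": prompt, "model": model, "schema": schema, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Including generation parameters in the key is important: a cached result produced at temperature 0.2 should not be returned for a request made at temperature 0.9.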

Dataset Collection

Collect successful generations for training:

from parsec.training.collector import DatasetCollector

collector = DatasetCollector(output_dir="./training_data")
engine = EnforcementEngine(adapter, validator, collector=collector)

# Generate outputs
for prompt in prompts:
    result = await engine.enforce(prompt, schema)
    # Successful results are automatically collected

# Export collected data
collector.export_to_jsonl("dataset.jsonl")

Best Practices

1. Use Appropriate Retry Limits

# For critical operations
engine = EnforcementEngine(adapter, validator, max_retries=5)

# For non-critical operations
engine = EnforcementEngine(adapter, validator, max_retries=1)

2. Enable Caching for Repeated Queries

# High-traffic applications with repeated queries
cache = InMemoryCache(max_size=1000, default_ttl=3600)
engine = EnforcementEngine(adapter, validator, cache=cache)

3. Use Circuit Breakers in Production

# Prevent cascade failures
engine = EnforcementEngine(
    adapter,
    validator,
    use_circuit_breaker=True,
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout=60.0
    )
)

4. Apply Rate Limiting

from parsec.resilience import RateLimiter

# Match your API tier limits
rate_limiter = RateLimiter(
    requests_per_minute=60,
    tokens_per_minute=90_000
)
engine = EnforcementEngine(adapter, validator, rate_limiter=rate_limiter)

5. Monitor Retry Counts

result = await engine.enforce(prompt, schema)

if result.retry_count > 2:
    logger.warning(f"High retry count: {result.retry_count} for prompt: {prompt[:50]}")
    # Consider revising prompt or schema

API Reference

EnforcementEngine

class EnforcementEngine:
    def __init__(
        self,
        adapter: Union[BaseLLMAdapter, FailoverChain],
        validator: BaseValidator,
        max_retries: int = 3,
        collector: Optional[DatasetCollector] = None,
        cache: Optional[BaseCache] = None,
        retry_policy: Optional[RetryPolicy] = None,
        use_circuit_breaker: bool = False,
        circuit_breaker_config: Optional[CircuitBreakerConfig] = None,
        rate_limiter: Optional[Union[RateLimiter, PerProviderRateLimiter]] = None
    )

    async def enforce(
        self,
        prompt: str,
        schema: Any,
        **kwargs
    ) -> EnforcedOutput

EnforcedOutput

class EnforcedOutput(BaseModel):
    data: Any                       # Validated output
    generation: GenerationResponse  # LLM response metadata
    validation: ValidationResult    # Validation details
    retry_count: int                # Number of retries
    success: bool                   # Whether enforcement succeeded

GenerationResponse

class GenerationResponse(BaseModel):
    output: str          # Raw LLM output text
    provider: str        # Provider name (openai, anthropic, etc.)
    model: str           # Model identifier
    tokens_used: int     # Tokens consumed
    latency_ms: float    # Generation latency in milliseconds
    timestamp: datetime  # When generation occurred

ValidationResult

class ValidationResult(BaseModel):
    status: ValidationStatus       # VALID, INVALID, etc.
    parsed_output: Optional[Any]   # Parsed data if valid
    errors: List[ValidationError]  # Validation errors if invalid
    raw_output: str                # Original text
    repair_attempted: bool         # Whether repair was tried
    repair_successful: bool        # Whether repair succeeded

Ready to learn about validators? Check out Validators →
