Enforcement Engine
The EnforcementEngine is the core orchestration component that coordinates LLM generation, validation, retries, caching, and resilience features to ensure you get valid structured output.
Overview
The enforcement engine:
- Calls LLM adapters to generate responses
- Validates output against your schema
- Retries automatically with feedback when validation fails
- Caches successful results to reduce costs
- Applies rate limiting to prevent API violations
- Uses circuit breakers to handle provider failures gracefully
- Collects training data for future fine-tuning
Quick Start
from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter
from parsec.validators import PydanticValidator
from pydantic import BaseModel
# Define schema
class Person(BaseModel):
    name: str
    age: int
    email: str
# Create adapter and validator
adapter = OpenAIAdapter(api_key="your-key", model="gpt-4o-mini")
validator = PydanticValidator()
# Create enforcement engine
engine = EnforcementEngine(
    adapter=adapter,
    validator=validator,
    max_retries=3
)
# Generate structured output
result = await engine.enforce(
"Extract: John Doe is 30, contact at john@example.com",
Person
)
print(result.data)
# Person(name='John Doe', age=30, email='john@example.com')
Constructor Parameters
EnforcementEngine(
    adapter: Union[BaseLLMAdapter, FailoverChain],
    validator: BaseValidator,
    max_retries: int = 3,
    collector: Optional[DatasetCollector] = None,
    cache: Optional[BaseCache] = None,
    retry_policy: Optional[RetryPolicy] = None,
    use_circuit_breaker: bool = False,
    circuit_breaker_config: Optional[CircuitBreakerConfig] = None,
    rate_limiter: Optional[Union[RateLimiter, PerProviderRateLimiter]] = None
)
Required Parameters
| Parameter | Type | Description |
|---|---|---|
| adapter | BaseLLMAdapter or FailoverChain | LLM adapter for generation (OpenAI, Anthropic, Gemini, etc.) |
| validator | BaseValidator | Validator for checking output (JSONValidator or PydanticValidator) |
Optional Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_retries | int | 3 | Maximum retry attempts for validation failures |
| collector | DatasetCollector | None | Collects successful generations for training datasets |
| cache | BaseCache | None | Cache for storing successful results |
| retry_policy | RetryPolicy | Default generation policy | Custom retry configuration with backoff and timeout |
| use_circuit_breaker | bool | False | Enable circuit breaker for fault tolerance |
| circuit_breaker_config | CircuitBreakerConfig | Default config | Circuit breaker thresholds and timeout |
| rate_limiter | RateLimiter or PerProviderRateLimiter | None | Rate limiter to prevent API violations |
The enforce() Method
The main method for generating structured output:
result = await engine.enforce(
    prompt: str,
    schema: Any,
    **kwargs
) -> EnforcedOutput
Parameters
- prompt (str): The input prompt for the LLM
- schema (Any): Pydantic model or JSON schema for validation
- kwargs: Additional parameters passed to the LLM (temperature, max_tokens, etc.)
Returns
EnforcedOutput object containing:
class EnforcedOutput:
    data: Any                       # Validated, parsed output
    generation: GenerationResponse  # Raw LLM response details
    validation: ValidationResult    # Validation details
    retry_count: int                # Number of retries needed
    success: bool                   # Whether enforcement succeeded
How It Works
The enforcement engine follows this flow:
1. Check cache (if enabled)
↓
2. Apply rate limiting (if enabled)
↓
3. Generate LLM output (with circuit breaker if enabled)
↓
4. Validate output against schema
↓
5. Valid? → Store in cache → Return success
↓
6. Invalid? → Retry with validation feedback → Repeat from step 3
↓
7. Max retries reached → Return failure
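In code form, steps 3–7 reduce to a validate-and-retry loop like the sketch below. This is only an illustration: caching, rate limiting, and circuit breaking (steps 1–2) are left out, and the adapter/validator method names (generate, validate) are assumptions rather than the library's exact internals.
# Simplified sketch of the validate-and-retry loop (steps 3-7 only).
# adapter.generate() and validator.validate() are illustrative names,
# not the library's confirmed API.
async def enforce_sketch(adapter, validator, prompt, schema, max_retries=3, **kwargs):
    attempt_prompt = prompt
    for attempt in range(max_retries + 1):
        generation = await adapter.generate(attempt_prompt, **kwargs)  # step 3: call the LLM
        validation = validator.validate(generation.output, schema)     # step 4: check against the schema
        if not validation.errors:                                      # step 5: valid -> done
            return validation.parsed_output
        # step 6: retry, feeding the validation errors back to the model
        attempt_prompt = (
            f"{prompt}\n\nYour previous response was invalid. "
            f"Errors: {validation.errors}"
        )
    return None                                                        # step 7: give up after max retries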
Basic Usage
Simple Enforcement
from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter
from parsec.validators import JSONValidator
adapter = OpenAIAdapter(api_key="key", model="gpt-4o-mini")
validator = JSONValidator()
engine = EnforcementEngine(adapter, validator)
schema = {
"type": "object",
"properties": {
"sentiment": {"type": "string"},
"confidence": {"type": "number"}
},
"required": ["sentiment", "confidence"]
}
result = await engine.enforce(
"Analyze sentiment: This product is amazing!",
schema
)
print(result.data)
# {'sentiment': 'positive', 'confidence': 0.95}
print(result.retry_count)  # 0 (succeeded on first try)
With Pydantic Schema
from pydantic import BaseModel, Field
from parsec.validators import PydanticValidator
class SentimentAnalysis(BaseModel):
    sentiment: str = Field(description="positive, negative, or neutral")
    confidence: float = Field(ge=0.0, le=1.0)
    summary: str
validator = PydanticValidator()
engine = EnforcementEngine(adapter, validator)
result = await engine.enforce(
"Analyze: This product exceeded all my expectations!",
SentimentAnalysis
)
print(result.data)
# SentimentAnalysis(sentiment='positive', confidence=0.92, summary='...')
With LLM Parameters
Pass additional parameters to control generation:
result = await engine.enforce(
    prompt,
    schema,
    temperature=0.2,  # Lower temperature for more deterministic output
    max_tokens=500,   # Limit response length
    top_p=0.9         # Nucleus sampling parameter
)
Production Configuration
Combine all features for production-ready enforcement:
from parsec import EnforcementEngine
from parsec.models.adapters import OpenAIAdapter, AnthropicAdapter
from parsec.validators import PydanticValidator
from parsec.cache import InMemoryCache
from parsec.resilience import (
    FailoverChain,
    CircuitBreakerConfig,
    PerProviderRateLimiter,
    PROVIDER_LIMITS,
    RetryPolicy
)
from parsec.training.collector import DatasetCollector
# Set up adapters with failover
openai = OpenAIAdapter(api_key="key1", model="gpt-4o-mini")
anthropic = AnthropicAdapter(api_key="key2", model="claude-3-5-haiku-20241022")
failover = FailoverChain([openai, anthropic])
# Configure rate limiting
rate_limiter = PerProviderRateLimiter()
rate_limiter.set_provider_limits('openai', **PROVIDER_LIMITS['openai']['tier_1'].__dict__)
rate_limiter.set_provider_limits('anthropic', **PROVIDER_LIMITS['anthropic']['tier_1'].__dict__)
# Set up circuit breaker
circuit_config = CircuitBreakerConfig(
    failure_threshold=5,
    success_threshold=2,
    timeout=60.0
)
# Custom retry policy
retry_policy = RetryPolicy(
    max_attempts=3,
    base_delay=1.0,
    max_delay=30.0,
    timeout=120.0
)
# Create cache
cache = InMemoryCache(max_size=100, default_ttl=3600)
# Set up dataset collector
collector = DatasetCollector(output_dir="./training_data")
# Create production engine
engine = EnforcementEngine(
    adapter=failover,
    validator=PydanticValidator(),
    max_retries=3,
    cache=cache,
    rate_limiter=rate_limiter,
    retry_policy=retry_policy,
    use_circuit_breaker=True,
    circuit_breaker_config=circuit_config,
    collector=collector
)
# Use with full production resilience
result = await engine.enforce(prompt, schema)
Retry Behavior
Automatic Retries
When validation fails, the engine automatically retries with feedback:
# First attempt: Invalid output
# Engine sends: "Your previous response was invalid. Errors: [...]"
# Second attempt: Still invalid
# Engine sends updated feedback with specific error details
# Third attempt: Valid!
# Returns successful result
Retry Policy
Control retry behavior:
from parsec.resilience import RetryPolicy
custom_policy = RetryPolicy(
    max_attempts=5,          # More retries
    base_delay=2.0,          # Start with 2 second delay
    max_delay=60.0,          # Cap at 60 seconds
    timeout=180.0,           # 3 minute timeout per attempt
    retryable_exceptions=(   # Which errors to retry
        TimeoutError,
        ConnectionError
    )
)
engine = EnforcementEngine(
    adapter,
    validator,
    retry_policy=custom_policy
)
Exponential Backoff
Retries use exponential backoff with jitter:
Attempt 1: Immediate
Attempt 2: ~1 second delay
Attempt 3: ~2 second delay
Attempt 4: ~4 second delay
...
Max delay: 30 seconds (configurable)
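In formula form, the delay before the N-th retry is roughly min(base_delay · 2^(N-1), max_delay) plus jitter. The sketch below illustrates the schedule above; the exact jitter strategy used by RetryPolicy is an implementation detail, so treat the random term as an assumption.
import random

def backoff_delay(retry_number: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
    # Double the delay for each successive retry, capped at max_delay
    delay = min(base_delay * (2 ** (retry_number - 1)), max_delay)
    # Add a little random jitter so concurrent clients don't retry in lockstep
    return min(delay + random.uniform(0.0, 0.25 * delay), max_delay)

# backoff_delay(1) -> ~1s, backoff_delay(2) -> ~2s, backoff_delay(3) -> ~4s, ...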
Error Handling
Handling Enforcement Failures
result = await engine.enforce(prompt, schema)
if result.success:
    print(f"Success! Data: {result.data}")
    print(f"Retries needed: {result.retry_count}")
else:
    print(f"Failed after {result.retry_count} retries")
    print(f"Errors: {result.validation.errors}")
    # Handle failure - log, alert, fallback, etc.
Retryable vs Non-Retryable Errors
The engine distinguishes between temporary and permanent errors:
# Retryable errors (temporary issues):
- TimeoutError
- ConnectionError
- OSError
→ Engine will retry
# Non-retryable errors (permanent issues):
- ValueError
- AuthenticationError
- InvalidRequestError
→ Engine fails immediately
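If your provider SDK raises its own transient errors, you can include them in the retryable set via the retryable_exceptions tuple shown above. In this sketch, RateLimitError is a placeholder for whatever transient exception your provider actually raises; substitute the real class.
from parsec.resilience import RetryPolicy

class RateLimitError(Exception):
    """Placeholder for a provider-specific transient error."""

# Keep the default transient errors and add the provider-specific one
policy = RetryPolicy(
    max_attempts=4,
    retryable_exceptions=(TimeoutError, ConnectionError, OSError, RateLimitError)
)

engine = EnforcementEngine(adapter, validator, retry_policy=policy)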
Caching Integration
The engine automatically handles caching:
from parsec.cache import InMemoryCache
cache = InMemoryCache(max_size=100, default_ttl=3600)
engine = EnforcementEngine(adapter, validator, cache=cache)
# First call - cache miss, calls API
result1 = await engine.enforce(prompt, schema)
# Second identical call - cache hit, no API call
result2 = await engine.enforce(prompt, schema)
# Check cache performance
print(cache.get_stats())
# {'hits': 1, 'misses': 1, 'hit_rate': '50.00%'}
Cache keys are automatically generated from:
- Prompt text
- Model identifier
- Schema definition
- Generation parameters (temperature, etc.)
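The exact key format is internal to the cache layer, but conceptually it behaves like a stable hash over those four inputs. A rough sketch of the idea (not the actual implementation):
import hashlib
import json

def cache_key_sketch(prompt: str, model: str, schema: dict, params: dict) -> str:
    # Serialize deterministically so identical inputs always hash to the same key
    payload = json.dumps(
        {"prompt": prompt, "model": model, "schema": schema, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()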
Dataset Collection
Collect successful generations for training:
from parsec.training.collector import DatasetCollector
collector = DatasetCollector(output_dir="./training_data")
engine = EnforcementEngine(adapter, validator, collector=collector)
# Generate outputs
for prompt in prompts:
    result = await engine.enforce(prompt, schema)
    # Successful results are automatically collected
# Export collected data
collector.export_to_jsonl("dataset.jsonl")
Best Practices
1. Use Appropriate Retry Limits
# For critical operations
engine = EnforcementEngine(adapter, validator, max_retries=5)
# For non-critical operations
engine = EnforcementEngine(adapter, validator, max_retries=1)
2. Enable Caching for Repeated Queries
# High-traffic applications with repeated queries
cache = InMemoryCache(max_size=1000, default_ttl=3600)
engine = EnforcementEngine(adapter, validator, cache=cache)
3. Use Circuit Breakers in Production
# Prevent cascade failures
engine = EnforcementEngine(
    adapter,
    validator,
    use_circuit_breaker=True,
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout=60.0
    )
)
4. Apply Rate Limiting
from parsec.resilience import RateLimiter
# Match your API tier limits
rate_limiter = RateLimiter(
    requests_per_minute=60,
    tokens_per_minute=90_000
)
engine = EnforcementEngine(adapter, validator, rate_limiter=rate_limiter)
5. Monitor Retry Counts
result = await engine.enforce(prompt, schema)
if result.retry_count > 2:
    logger.warning(f"High retry count: {result.retry_count} for prompt: {prompt[:50]}")
    # Consider revising prompt or schema
API Reference
EnforcementEngine
class EnforcementEngine:
    def __init__(
        self,
        adapter: Union[BaseLLMAdapter, FailoverChain],
        validator: BaseValidator,
        max_retries: int = 3,
        collector: Optional[DatasetCollector] = None,
        cache: Optional[BaseCache] = None,
        retry_policy: Optional[RetryPolicy] = None,
        use_circuit_breaker: bool = False,
        circuit_breaker_config: Optional[CircuitBreakerConfig] = None,
        rate_limiter: Optional[Union[RateLimiter, PerProviderRateLimiter]] = None
    )

    async def enforce(
        self,
        prompt: str,
        schema: Any,
        **kwargs
    ) -> EnforcedOutput
EnforcedOutput
class EnforcedOutput(BaseModel):
    data: Any                       # Validated output
    generation: GenerationResponse  # LLM response metadata
    validation: ValidationResult    # Validation details
    retry_count: int                # Number of retries
    success: bool                   # Whether enforcement succeeded
GenerationResponse
class GenerationResponse(BaseModel):
    output: str          # Raw LLM output text
    provider: str        # Provider name (openai, anthropic, etc.)
    model: str           # Model identifier
    tokens_used: int     # Tokens consumed
    latency_ms: float    # Generation latency in milliseconds
    timestamp: datetime  # When generation occurred
ValidationResult
class ValidationResult(BaseModel):
    status: ValidationStatus       # VALID, INVALID, etc.
    parsed_output: Optional[Any]   # Parsed data if valid
    errors: List[ValidationError]  # Validation errors if invalid
    raw_output: str                # Original text
    repair_attempted: bool         # Whether repair was tried
    repair_successful: bool        # Whether repair succeeded
Ready to learn about validators? Check out Validators →