Examples
This section provides examples of common use cases for working with OpenAI Structured Outputs using the openai-structured library.
Basic Examples
Movie Review Analysis
Extract structured movie reviews using OpenAI Structured Outputs with streaming:
import logging
from pydantic import BaseModel, Field
from openai import AsyncOpenAI, APIError, APITimeoutError
from openai_structured import async_openai_structured_stream, StreamConfig
from openai_structured.errors import (
StreamBufferError,
StreamInterruptedError,
StreamParseError,
ValidationError,
ModelNotSupportedError
)
from typing import Optional
# Configure application logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Library logging callback - the library does not use Python's logging infrastructure directly
# Instead, it calls this callback for all internal logs, giving you full control over log handling
def log_callback(level: int, message: str, data: Optional[dict] = None) -> None:
    """Custom logging callback to handle library logs.

    The library will call this function for all internal logs, allowing you to:
    - Filter logs by level
    - Format messages and data as needed
    - Route logs to your logging system
    - Add additional context or processing

    Note: The library itself does not use Python's logging infrastructure.
    This callback is the only way the library outputs logs.

    Args:
        level: Numeric ``logging`` level (e.g. ``logging.INFO``). Every level
            is forwarded; filtering is left to the logger configuration.
        message: Log text. It is treated as plain data, so a literal ``%``
            in it is safe.
        data: Optional structured context. It is appended to the message
            when non-empty and omitted otherwise.
    """
    # Example: Route library logs through application logger.
    # The message and data are passed as lazy %-arguments instead of being
    # spliced into the format string: a lone dict argument is interpreted by
    # ``logging`` as a formatting mapping, which either fails or silently
    # drops the data.
    if data:
        logger.log(level, "Library: %s | %s", message, data)
    else:
        logger.log(level, "Library: %s", message)
class MovieReview(BaseModel):
    # Structured review returned for a single movie.
    # (Kept as comments: a class docstring would be emitted as the schema
    # description.)
    title: str
    # ge/le are Pydantic's inclusive-bound keywords, so the 0-10 range is
    # actually validated; minimum/maximum are not Field constraints.
    rating: float = Field(ge=0, le=10)
    summary: str
    pros: list[str]
    cons: list[str]
async def analyze_movie(title: str):
    """Stream a structured review of a movie and print each parsed result.

    Each error type handled below is logged instead of being raised to the
    caller, and the client is closed whether or not the stream succeeds.

    Args:
        title: Movie title interpolated into the user prompt.
    """
    client = AsyncOpenAI()  # Initialize client
    try:
        # Application log
        logger.info("Starting analysis of movie: %s", title)

        # Use OpenAI Structured Outputs with streaming
        async for chunk in async_openai_structured_stream(
            client=client,
            model="gpt-4o-2024-08-06",
            output_schema=MovieReview,
            system_prompt="You are a movie critic.",
            user_prompt=f"Review the movie '{title}'",
            stream_config=StreamConfig(
                max_buffer_size=1024 * 1024,  # 1MB
                cleanup_threshold=512 * 1024,  # 512KB
            ),
            timeout=30.0,
            on_log=log_callback,  # Library will use this for all logging
        ):
            # Application logs
            logger.info("Received review for: %s", chunk.title)
            print(f"Title: {chunk.title}")
            print(f"Rating: {chunk.rating}/10")
            print(f"Summary: {chunk.summary}")
            print("\nPros:")
            for pro in chunk.pros:
                print(f"- {pro}")
            print("\nCons:")
            for con in chunk.cons:
                print(f"- {con}")
    except StreamBufferError as e:
        # Application error logging
        logger.error("Failed to process stream: %s", e)
        logger.info("Hint: Try increasing buffer size or adjusting cleanup threshold")
    except StreamInterruptedError as e:
        logger.error("Stream interrupted: %s", e)
        logger.info("Check network connection and API status")
    except StreamParseError as e:
        logger.error(
            "Parse error after %d attempts: %s",
            e.attempts, e.last_error
        )
        logger.debug("Buffer cleanup completed")
    except ValidationError as e:
        logger.error("Invalid analysis format: %s", e)
        logger.debug("Error context: %s", e.errors())
    except APITimeoutError as e:
        # Must precede APIError: the timeout error is a more specific case
        logger.error("API timeout: %s", e)
        logger.info("Consider increasing timeout for large files")
    except APIError as e:
        logger.error("API error: %s", e)
        # Not every APIError carries an HTTP status (e.g. connection
        # failures), so read it defensively instead of risking an
        # AttributeError inside the handler.
        status_code = getattr(e, "status_code", None)
        if status_code == 429:
            logger.info("Rate limit exceeded, implement backoff")
        elif status_code is not None and status_code >= 500:
            logger.info("Server error, retry with exponential backoff")
    except ModelNotSupportedError as e:
        logger.error("Model not supported: %s", e)
        logger.info("Supported versions: %s", e.supported_versions)
    finally:
        await client.close()  # Cleanup resources
Code Analysis
Analyze code using OpenAI Structured Outputs with custom rules and streaming:
import logging
import aiofiles
from typing import Literal
from pydantic import BaseModel, Field
from openai import AsyncOpenAI, APIError, APITimeoutError
from openai_structured import async_openai_structured_stream, StreamConfig
from openai_structured.errors import (
StreamBufferError,
StreamInterruptedError,
StreamParseError,
ValidationError,
ModelNotSupportedError
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CodeIssue(BaseModel):
    # One finding reported against a specific line of the analyzed file.
    severity: Literal["high", "medium", "low"]  # only these three values validate
    line_number: int = Field(ge=1)  # ge=1: values below 1 are rejected
    description: str
    suggestion: str
class CodeAnalysis(BaseModel):
    # Top-level result of a code review: file metadata, the individual
    # findings, and free-text recommendations.
    file_name: str
    language: str
    issues: list[CodeIssue]  # one entry per reported problem
    best_practices: list[str]
    improvement_summary: str
async def analyze_code(file_path: str):
    """Read a source file and stream a structured code review of it.

    The file is read asynchronously; if it cannot be read the error is
    logged and the function returns without calling the API. Each error
    type handled below is logged rather than raised, and the client is
    closed in every case (including the early return).

    Args:
        file_path: Path of the file whose contents are sent for analysis.
    """
    client = AsyncOpenAI()
    try:
        # Read file with proper error handling
        try:
            async with aiofiles.open(file_path, 'r') as f:
                code = await f.read()
        except IOError as e:
            logger.error("Failed to read file: %s", e)
            return

        # Configure stream with larger buffer for code analysis
        config = StreamConfig(
            max_buffer_size=2 * 1024 * 1024,  # 2MB for large files
            cleanup_threshold=1024 * 1024,  # 1MB (50% of max)
            chunk_size=16 * 1024,  # 16KB chunks
        )

        # Use OpenAI Structured Outputs with streaming
        async for chunk in async_openai_structured_stream(
            client=client,
            model="gpt-4o-2024-08-06",  # Model with OpenAI Structured Outputs support
            output_schema=CodeAnalysis,
            system_prompt="You are a code review expert.",
            user_prompt=f"Analyze this code:\n\n{code}",
            temperature=0.2,  # Lower temperature for analysis
            stream_config=config,
            timeout=60.0,  # Longer timeout for large files
        ):
            # Log buffer size changes
            if config.should_log_size():
                logger.info(
                    "Buffer size: %d bytes",
                    config.total_bytes
                )

            logger.info("Analyzing %s", chunk.file_name)
            print(f"\nAnalysis for {chunk.file_name}:")
            print(f"Language: {chunk.language}")
            print("\nIssues:")
            for issue in chunk.issues:
                print(f"[{issue.severity.upper()}] Line {issue.line_number}")
                print(f" Problem: {issue.description}")
                print(f" Suggestion: {issue.suggestion}")
            print("\nBest Practices:")
            for practice in chunk.best_practices:
                print(f"- {practice}")
            print(f"\nSummary: {chunk.improvement_summary}")
    except StreamBufferError as e:
        logger.error("Buffer overflow: %s", e)
        logger.info("Consider increasing buffer size or processing chunks faster")
    except StreamInterruptedError as e:
        logger.error("Stream interrupted: %s", e)
        logger.info("Check network connection and API status")
    except StreamParseError as e:
        # Report only what the exception itself exposes; the handler must
        # not reference names that this example never imports.
        logger.error(
            "Parse error after %d attempts: %s",
            e.attempts, e.last_error
        )
        logger.debug("Buffer cleanup completed")
    except ValidationError as e:
        logger.error("Invalid analysis format: %s", e)
        logger.debug("Error context: %s", e.errors())
    except APITimeoutError as e:
        # Must precede APIError: the timeout error is a more specific case
        logger.error("API timeout: %s", e)
        logger.info("Consider increasing timeout for large files")
    except APIError as e:
        logger.error("API error: %s", e)
        # Not every APIError carries an HTTP status (e.g. connection
        # failures), so read it defensively instead of risking an
        # AttributeError inside the handler.
        status_code = getattr(e, "status_code", None)
        if status_code == 429:
            logger.info("Rate limit exceeded, implement backoff")
        elif status_code is not None and status_code >= 500:
            logger.info("Server error, retry with exponential backoff")
    except ModelNotSupportedError as e:
        logger.error("Model not supported: %s", e)
        logger.info("Supported versions: %s", e.supported_versions)
    finally:
        await client.close()  # Cleanup resources
Buffer Management
Configure buffer settings for different OpenAI Structured Outputs use cases:
import logging
from openai import AsyncOpenAI
from openai_structured import StreamConfig, async_openai_structured_stream
from openai_structured.errors import StreamBufferError, StreamParseError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Three ready-made StreamConfig presets; pick one according to the expected
# response size and the memory available.

# Default configuration (1MB buffer)
config = StreamConfig()  # Uses ijson for efficient parsing

# Large responses (2MB buffer)
large_config = StreamConfig(
    max_buffer_size=2 * 1024 * 1024,  # 2MB
    cleanup_threshold=1024 * 1024,  # 1MB (50% of max)
    chunk_size=16 * 1024  # 16KB
)

# Memory-constrained (256KB buffer)
small_config = StreamConfig(
    max_buffer_size=256 * 1024,  # 256KB
    cleanup_threshold=128 * 1024,  # 128KB (50% of max)
    chunk_size=4 * 1024  # 4KB
)
async def process_with_config(config: StreamConfig):
    """Run one streaming request with the given buffer configuration.

    ``OutputSchema`` and ``process_chunk`` are not defined in this example;
    they stand for the application's own schema and chunk handler.
    Buffer and parse failures are logged rather than raised, and the
    client is always closed.

    Args:
        config: Buffer settings passed to the stream and reported in the
            log messages.
    """
    client = AsyncOpenAI()
    try:
        async for chunk in async_openai_structured_stream(
            client=client,
            model="gpt-4o-2024-08-06",
            output_schema=OutputSchema,
            system_prompt="Process this data.",
            user_prompt="Sample input",
            stream_config=config,
        ):
            # Monitor buffer size changes
            if config.should_log_size():
                logger.info(
                    "Buffer size: %d bytes (max: %d, cleanup at: %d)",
                    config.total_bytes,
                    config.max_buffer_size,
                    config.cleanup_threshold
                )
            process_chunk(chunk)
    except StreamBufferError as e:
        # Buffer exceeded its maximum size. Only values available in this
        # scope are logged; the handler must not reference names that this
        # example never imports.
        logger.error(
            "Buffer overflow with %d bytes limit: %s",
            config.max_buffer_size,
            e
        )
        # The stats attribute is optional, so read it defensively
        cleanup_stats = getattr(e, '_cleanup_stats', None)
        if cleanup_stats is not None:
            logger.debug("Cleanup stats: %s", cleanup_stats)
    except StreamParseError as e:
        # Parsing failed repeatedly; the exception reports how many attempts
        logger.error(
            "Parse error after %d attempts: %s",
            e.attempts,
            e.last_error
        )
        logger.debug("Buffer cleanup completed")
    finally:
        await client.close()
Model Support
Use different models with version validation:
from openai import AsyncOpenAI
from openai_structured import async_openai_structured_stream
from openai_structured.errors import ModelNotSupportedError
async def use_models():
    """Issue the same request against three models, one after another.

    ``OutputSchema`` and ``process_chunk`` are not defined in this example;
    they stand for the application's own schema and chunk handler. A
    ModelNotSupportedError from any request stops the sequence and prints
    the supported versions. The client is always closed.
    """
    # (model name, extra keyword arguments), in the order they are requested
    requests = (
        # Production model with specific version; model-specific limit
        ("gpt-4o-2024-08-06", {"max_tokens": 8000}),
        # Development alias (latest compatible version)
        ("gpt-4o", {}),
        # Optimized model for large responses (large context window);
        # up to 100K tokens
        ("o1-2024-12-17", {"max_tokens": 50000}),
    )

    client = AsyncOpenAI()
    try:
        for model_name, extra_kwargs in requests:
            stream = async_openai_structured_stream(
                client=client,
                model=model_name,
                output_schema=OutputSchema,
                system_prompt="Process this.",
                user_prompt="Sample input",
                **extra_kwargs,
            )
            async for chunk in stream:
                process_chunk(chunk)
    except ModelNotSupportedError as e:
        print(f"Model version error: {e}")
        print("Supported versions:")
        for model, version in e.supported_versions.items():
            print(f"- {model}: {version}")
    finally:
        await client.close()
Example Schemas
The library provides example schemas and patterns to help you get started.
Basic Usage
The simplest way to use the library is with the SimpleMessage schema:
from openai import OpenAI
from openai_structured import openai_structured
from openai_structured.examples.schemas import SimpleMessage
client = OpenAI()
# Single non-streaming call; the return value is read below as a parsed
# SimpleMessage (its .message attribute).
result = openai_structured(
    client=client,
    model="gpt-4o",
    output_schema=SimpleMessage,
    user_prompt="What is the capital of France?"
)
print(result.message)  # "The capital of France is Paris."
Available Schemas
1. SimpleMessage
A basic schema for text responses:
from openai_structured.examples.schemas import SimpleMessage
class SimpleMessage(BaseModel):
    """Simple schema with a single message field."""

    # The model's reply as plain text.
    message: str
Use this when you just need the model’s response as text.
2. SentimentMessage
A more complex schema that includes sentiment analysis:
from openai_structured.examples.schemas import SentimentMessage
class SentimentMessage(BaseModel):
    """Schema for sentiment analysis responses."""

    message: str = Field(..., description="The analyzed message")
    # The pattern is anchored (^...$) and case-insensitive ((?i)): the whole
    # value must be one of positive / negative / neutral / mixed.
    sentiment: str = Field(
        ...,
        pattern="(?i)^(positive|negative|neutral|mixed)$",
        description="Sentiment of the message"
    )
Use this when you need both content and sentiment analysis:
# Reuses a previously created `client`; the result exposes both fields
# declared on SentimentMessage.
result = openai_structured(
    client=client,
    model="gpt-4o",
    output_schema=SentimentMessage,
    user_prompt="How do you feel about AI?"
)
print(f"Message: {result.message}")
print(f"Sentiment: {result.sentiment}")
Creating Your Own Schemas
You can use these examples as templates for your own schemas:
1. Basic Pattern
from pydantic import BaseModel
class YourSchema(BaseModel):
    # Minimal template: plain type-annotated fields, no extra constraints.
    field1: str
    field2: int
2. With Validation
from pydantic import BaseModel, Field
class YourValidatedSchema(BaseModel):
    # Template with Field() metadata: `...` marks a field as required and
    # `gt=0` rejects zero and negative values.
    field1: str = Field(..., description="Field description")
    field2: int = Field(..., gt=0, description="Must be positive")
3. With Complex Types
from typing import List, Optional
from pydantic import BaseModel
class YourComplexSchema(BaseModel):
    # Template with container types.
    items: List[str]
    details: Optional[dict]  # may be None; no default value is declared here
Best Practices
Clear Field Names: use descriptive names, follow Python naming conventions, and add field descriptions.
Appropriate Validation: add type hints, use Field() for constraints, and include pattern validation where needed.
Documentation: add class docstrings, document field meanings, and include usage examples.
Type Safety: use appropriate types, consider Optional fields, and add proper type hints.
Testing Examples
This section provides examples of testing code that uses the openai-structured library.
Basic Parameter Validation
Example of basic parameter validation testing:
"""Example: Basic parameter validation testing."""

import pytest

from openai_structured import OpenAIClientError


def test_basic_parameter_validation(test_registry):
    """Accepted values pass silently; a bad value or unknown name raises."""
    # Look up the capabilities of the test model
    caps = test_registry.get_capabilities("test-model")

    # Valid parameters: these calls must simply not raise
    for param_name, value in (("temperature", 0.7), ("top_p", 0.9)):
        caps.validate_parameter(param_name, value)

    # Invalid parameters: each must raise with the expected message fragment
    with pytest.raises(OpenAIClientError, match="must be between"):
        caps.validate_parameter("temperature", 2.5)

    with pytest.raises(OpenAIClientError, match="not supported by model"):
        caps.validate_parameter("invalid_param", 1.0)
Custom Model Testing
Example of testing with custom model configurations:
"""Example: Testing with custom model configurations."""

import pytest

from openai_structured import OpenAIClientError, VersionTooOldError
from openai_structured.model_registry import ModelRegistry
from openai_structured.testing import (
    create_enum_constraint,
    create_numeric_constraint,
    create_test_registry,
)


@pytest.fixture
def custom_test_registry():
    """Yield a registry holding one custom dated model plus its alias.

    ``ModelRegistry.cleanup()`` is called once the test using the fixture
    has finished.
    """

    def _unit_interval(description):
        # Float-only numeric constraint over 0.0-1.0; only the description
        # differs between the two parameters below.
        return {
            "type": "numeric",
            "min_value": 0.0,
            "max_value": 1.0,
            "description": description,
            "allow_float": True,
            "allow_int": False,
        }

    dated_name = "test-custom-model-2024-01-01"

    # Custom model config: a single dated model and an alias pointing at it
    models = {
        "dated_models": {
            dated_name: {
                "context_window": 8192,
                "max_output_tokens": 4096,
                "supports_structured": True,
                "supports_streaming": True,
                "supported_parameters": [
                    {"ref": "numeric_constraints.temperature"},
                    {"ref": "numeric_constraints.top_p"},
                ],
                "min_version": {"year": 2024, "month": 1, "day": 1},
            }
        },
        "aliases": {"test-custom-model": dated_name},
    }

    # Custom constraints config referenced by the model above
    constraints = {
        "numeric_constraints": {
            "temperature": _unit_interval("Custom temperature range"),
            "top_p": _unit_interval("Custom top_p range"),
        }
    }

    # Build the registry, hand it to the test, then clean up
    yield create_test_registry(
        model_config=models,
        constraints_config=constraints,
    )
    ModelRegistry.cleanup()
def test_custom_model_capabilities(custom_test_registry):
    """Verify limits, parameter validation, and version checks of the custom model."""
    registry = custom_test_registry

    # Basic capabilities, resolved through the alias
    caps = registry.get_capabilities("test-custom-model")
    assert caps.context_window == 8192
    assert caps.max_output_tokens == 4096
    assert caps.supports_structured
    assert caps.supports_streaming

    # In-range values are accepted without raising
    for param_name, value in (("temperature", 0.5), ("top_p", 0.9)):
        caps.validate_parameter(param_name, value)

    # Out-of-range value and unknown parameter are rejected
    with pytest.raises(OpenAIClientError, match="must be between"):
        caps.validate_parameter("temperature", 1.5)

    with pytest.raises(OpenAIClientError, match="not supported by model"):
        caps.validate_parameter("invalid_param", 1.0)

    # Version validation: the minimum version resolves, an earlier date fails
    registry.get_capabilities("test-custom-model-2024-01-01")
    with pytest.raises(VersionTooOldError, match="is too old"):
        registry.get_capabilities("test-custom-model-2023-12-31")
def test_custom_parameter_validation(test_registry):
    """Validate parameters against constraints built with the testing helpers."""
    # Numeric constraint: floats only, between 0.0 and 1.0
    temperature_rule = create_numeric_constraint(
        min_value=0.0,
        max_value=1.0,
        description="Custom temperature range",
        allow_float=True,
        allow_int=False,
    )

    # Enum constraint: a fixed set of allowed strings
    effort_rule = create_enum_constraint(
        allowed_values=["low", "medium", "high"],
        description="Custom reasoning effort levels",
    )

    # Registry built from the dumped constraint objects
    registry = create_test_registry(
        constraints_config={
            "numeric_constraints": {
                "temperature": temperature_rule.model_dump(),
            },
            "enum_constraints": {
                "reasoning_effort": effort_rule.model_dump(),
            },
        }
    )
    caps = registry.get_capabilities("test-model")

    # Numeric parameter: a float is fine, an integer is not allowed
    caps.validate_parameter("temperature", 0.7)
    with pytest.raises(OpenAIClientError, match="must be a float"):
        caps.validate_parameter("temperature", 1)

    # Enum parameter: a listed value is fine, anything else is rejected
    o1_caps = registry.get_capabilities("o1")
    o1_caps.validate_parameter("reasoning_effort", "medium")
    with pytest.raises(OpenAIClientError, match="Invalid value"):
        o1_caps.validate_parameter("reasoning_effort", "invalid")
Token Limit Testing
Example of testing token limits and validation:
"""Example: Testing token limit validation."""

import pytest

from openai_structured import OpenAIClientError, TokenParameterError
from openai_structured.model_registry import ModelRegistry
from openai_structured.testing import (
    create_test_registry,
    get_test_capabilities,
)


@pytest.fixture
def token_test_registry():
    """Yield a registry whose only model has an integer token-limit parameter.

    ``ModelRegistry.cleanup()`` is called once the test using the fixture
    has finished.
    """
    dated_name = "test-token-model-2024-01-01"

    # Model config with token limits
    models = {
        "dated_models": {
            dated_name: {
                "context_window": 4096,
                "max_output_tokens": 2048,
                "supports_structured": True,
                "supports_streaming": True,
                "supported_parameters": [
                    {"ref": "numeric_constraints.max_completion_tokens"},
                ],
                "min_version": {"year": 2024, "month": 1, "day": 1},
            }
        },
        "aliases": {"test-token-model": dated_name},
    }

    # Constraints config: integers only, from 1 to 2048
    constraints = {
        "numeric_constraints": {
            "max_completion_tokens": {
                "type": "numeric",
                "min_value": 1,
                "max_value": 2048,
                "description": "Maximum completion tokens",
                "allow_float": False,
                "allow_int": True,
            }
        }
    }

    # Build the registry, hand it to the test, then clean up
    yield create_test_registry(
        model_config=models,
        constraints_config=constraints,
    )
    ModelRegistry.cleanup()
def test_token_limit_validation(token_test_registry):
    """A value within the limit passes; too-large or non-integer values raise."""
    caps = token_test_registry.get_capabilities("test-token-model")

    # Valid token limit: must not raise
    caps.validate_parameter("max_completion_tokens", 1000)

    # Above the configured maximum
    with pytest.raises(OpenAIClientError, match="must not exceed"):
        caps.validate_parameter("max_completion_tokens", 3000)

    # Wrong types, checked in the same order as before: a string, then a float
    for bad_value in ("not a number", 1.5):
        with pytest.raises(OpenAIClientError, match="must be an integer"):
            caps.validate_parameter("max_completion_tokens", bad_value)
def test_token_parameter_conflicts(test_registry):
    """Using both token-limit parameters in one request must be rejected."""
    # Capabilities that declare both token parameters
    caps = get_test_capabilities(
        openai_model_name="test-model",
        max_output_tokens=4096,
        supported_parameters=[
            {"ref": "numeric_constraints.max_output_tokens"},
            {"ref": "numeric_constraints.max_completion_tokens"},
        ],
    )
    # The same set is passed to both calls so the second can see the first
    seen_params = set()

    # First token parameter is accepted
    caps.validate_parameter("max_output_tokens", 1000, used_params=seen_params)

    # Second token parameter conflicts with the first
    conflict_message = (
        "Cannot specify both 'max_output_tokens' and 'max_completion_tokens' parameters"
    )
    with pytest.raises(TokenParameterError, match=conflict_message):
        caps.validate_parameter(
            "max_completion_tokens", 1000, used_params=seen_params
        )
def test_token_parameter_types(test_registry):
    """Only an integer is accepted for the token-limit parameter."""
    # Capabilities that declare the token parameter
    caps = get_test_capabilities(
        openai_model_name="test-model",
        max_output_tokens=4096,
        supported_parameters=[
            {"ref": "numeric_constraints.max_output_tokens"},
        ],
    )

    # Valid integer: must not raise
    caps.validate_parameter("max_output_tokens", 1000)

    # Invalid float, then invalid string: both rejected with the same message
    for bad_value in (1000.5, "1000"):
        with pytest.raises(OpenAIClientError, match="must be an integer"):
            caps.validate_parameter("max_output_tokens", bad_value)