Examples

This section provides examples of common use cases for working with OpenAI Structured Outputs using the openai-structured library.

Basic Examples

Movie Review Analysis

Extract structured movie reviews using OpenAI Structured Outputs with streaming:

import logging
from pydantic import BaseModel, Field
from openai import AsyncOpenAI, APIError, APITimeoutError
from openai_structured import async_openai_structured_stream, StreamConfig
from openai_structured.errors import (
    StreamBufferError,
    StreamInterruptedError,
    StreamParseError,
    ValidationError,
    ModelNotSupportedError
)
from typing import Optional

# Configure application logging
# The root logger is set to INFO, so the logger.debug(...) calls in this
# example are suppressed unless the level is lowered to logging.DEBUG.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Library logging callback - the library does not use Python's logging infrastructure directly
# Instead, it calls this callback for all internal logs, giving you full control over log handling
def log_callback(level: int, message: str, data: Optional[dict] = None) -> None:
    """Route a library log event through the application logger.

    The library will call this function for all internal logs, allowing you to:
    - Filter logs by level
    - Format messages and data as needed
    - Route logs to your logging system
    - Add additional context or processing

    Note: The library itself does not use Python's logging infrastructure.
    This callback is the only way the library outputs logs.

    Args:
        level: Numeric severity, interpreted as a ``logging`` level
            (``logging.DEBUG``, ``logging.INFO``, ...).
        message: Log text supplied by the library.
        data: Optional structured context that accompanies the message.
    """
    # Pass the text and the payload as lazy %-style arguments rather than
    # using them as the format string / argument tuple. A literal "%" in
    # `message`, or an empty `data`, would otherwise make the logging module
    # report a formatting error instead of emitting the record.
    if data:
        logger.log(level, "Library: %s (data=%s)", message, data)
    else:
        logger.log(level, "Library: %s", message)

class MovieReview(BaseModel):
    # Schema for one structured movie review; analyze_movie reads every field.
    title: str
    # `ge`/`le` are Pydantic's numeric-bound keywords, so the 0-10 range is
    # actually validated. `minimum`/`maximum` are not Field() constraints and
    # would leave the rating unchecked.
    rating: float = Field(ge=0, le=10)
    summary: str
    pros: list[str]
    cons: list[str]

async def analyze_movie(title: str) -> None:
    """Stream a structured review of one movie and print each parsed result.

    The failure modes handled below are logged rather than raised, and the
    client is closed whether or not the stream succeeds.

    Args:
        title: Name of the movie to review; interpolated into the user prompt.
    """
    client = AsyncOpenAI()  # Initialize client

    try:
        # Application log
        logger.info("Starting analysis of movie: %s", title)

        # Use OpenAI Structured Outputs with streaming
        async for chunk in async_openai_structured_stream(
            client=client,
            model="gpt-4o-2024-08-06",
            output_schema=MovieReview,
            system_prompt="You are a movie critic.",
            user_prompt=f"Review the movie '{title}'",
            stream_config=StreamConfig(
                max_buffer_size=1024 * 1024,  # 1MB
                cleanup_threshold=512 * 1024   # 512KB
            ),
            timeout=30.0,
            on_log=log_callback  # Library will use this for all logging
        ):
            # Application logs
            logger.info("Received review for: %s", chunk.title)
            print(f"Title: {chunk.title}")
            print(f"Rating: {chunk.rating}/10")
            print(f"Summary: {chunk.summary}")
            print("\nPros:")
            for pro in chunk.pros:
                print(f"- {pro}")
            print("\nCons:")
            for con in chunk.cons:
                print(f"- {con}")

    except StreamBufferError as e:
        # Application error logging
        logger.error("Failed to process stream: %s", e)
        logger.info("Hint: Try increasing buffer size or adjusting cleanup threshold")

    except StreamInterruptedError as e:
        logger.error("Stream interrupted: %s", e)
        logger.info("Check network connection and API status")

    except StreamParseError as e:
        logger.error(
            "Parse error after %d attempts: %s",
            e.attempts, e.last_error
        )
        logger.debug("Buffer cleanup completed")

    except ValidationError as e:
        logger.error("Invalid analysis format: %s", e)
        logger.debug("Error context: %s", e.errors())

    except APITimeoutError as e:
        logger.error("API timeout: %s", e)
        logger.info("Consider increasing timeout for large files")

    except APIError as e:
        logger.error("API error: %s", e)
        # Only HTTP-status errors carry `status_code`; connection-level
        # APIError subclasses do not, so read it defensively instead of
        # raising AttributeError from inside the handler.
        status_code = getattr(e, "status_code", None)
        if status_code == 429:
            logger.info("Rate limit exceeded, implement backoff")
        elif status_code is not None and status_code >= 500:
            logger.info("Server error, retry with exponential backoff")

    except ModelNotSupportedError as e:
        logger.error("Model not supported: %s", e)
        logger.info("Supported versions: %s", e.supported_versions)
    finally:
        await client.close()  # Cleanup resources

Code Analysis

Analyze code using OpenAI Structured Outputs with custom rules and streaming:

import logging
import aiofiles
from typing import Literal
from pydantic import BaseModel, Field
from openai import AsyncOpenAI, APIError, APITimeoutError
from openai_structured import async_openai_structured_stream, StreamConfig
from openai_structured.errors import (
    StreamBufferError,
    StreamInterruptedError,
    StreamParseError,
    ValidationError,
    ModelNotSupportedError
)

# Configure logging
# The root logger is set to INFO, so the logger.debug(...) calls in
# analyze_code are suppressed unless the level is lowered to logging.DEBUG.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CodeIssue(BaseModel):
    # One finding within a CodeAnalysis result.
    # Restricted to exactly these three lowercase labels.
    severity: Literal["high", "medium", "low"]
    # ge=1 rejects zero and negative line numbers.
    line_number: int = Field(ge=1)
    description: str
    suggestion: str

class CodeAnalysis(BaseModel):
    # Top-level schema passed as `output_schema` in analyze_code.
    file_name: str
    language: str
    # Individual findings; each entry is validated as a CodeIssue.
    issues: list[CodeIssue]
    best_practices: list[str]
    improvement_summary: str

async def analyze_code(file_path: str) -> None:
    """Stream a structured review of one source file and print the results.

    Reads ``file_path``, sends its contents to the model with the
    ``CodeAnalysis`` schema, and prints every parsed result. The failure
    modes handled below are logged rather than raised, and the client is
    closed in all cases.

    Args:
        file_path: Path of the file whose contents are analyzed.
    """
    client = AsyncOpenAI()

    try:
        # Read file with proper error handling
        try:
            async with aiofiles.open(file_path, 'r') as f:
                code = await f.read()
        except IOError as e:
            logger.error("Failed to read file: %s", e)
            return  # the outer `finally` still closes the client

        # Configure stream with larger buffer for code analysis
        config = StreamConfig(
            max_buffer_size=2 * 1024 * 1024,  # 2MB for large files
            cleanup_threshold=1024 * 1024,     # 1MB (50% of max)
            chunk_size=16 * 1024              # 16KB chunks
        )

        # Use OpenAI Structured Outputs with streaming
        async for chunk in async_openai_structured_stream(
            client=client,
            model="gpt-4o-2024-08-06",  # Model with OpenAI Structured Outputs support
            output_schema=CodeAnalysis,
            system_prompt="You are a code review expert.",
            user_prompt=f"Analyze this code:\n\n{code}",
            temperature=0.2,  # Lower temperature for analysis
            stream_config=config,
            timeout=60.0  # Longer timeout for large files
        ):
            # Log buffer size changes
            # NOTE(review): assumes StreamConfig exposes should_log_size()
            # and total_bytes - confirm against the library's API.
            if config.should_log_size():
                logger.info(
                    "Buffer size: %d bytes",
                    config.total_bytes
                )

            logger.info("Analyzing %s", chunk.file_name)
            print(f"\nAnalysis for {chunk.file_name}:")
            print(f"Language: {chunk.language}")

            print("\nIssues:")
            for issue in chunk.issues:
                print(f"[{issue.severity.upper()}] Line {issue.line_number}")
                print(f"  Problem: {issue.description}")
                print(f"  Suggestion: {issue.suggestion}")

            print("\nBest Practices:")
            for practice in chunk.best_practices:
                print(f"- {practice}")

            print(f"\nSummary: {chunk.improvement_summary}")

    except StreamBufferError as e:
        logger.error("Buffer overflow: %s", e)
        logger.info("Consider increasing buffer size or processing chunks faster")
    except StreamInterruptedError as e:
        logger.error("Stream interrupted: %s", e)
        logger.info("Check network connection and API status")
    except StreamParseError as e:
        # Log only what the exception itself carries: StreamBuffer is not
        # imported in this example, so referencing it here would raise
        # NameError from inside the handler.
        logger.error(
            "Parse error after %d attempts: %s",
            e.attempts, e.last_error
        )
        logger.debug("Buffer cleanup completed")
    except ValidationError as e:
        logger.error("Invalid analysis format: %s", e)
        logger.debug("Error context: %s", e.errors())
    except APITimeoutError as e:
        logger.error("API timeout: %s", e)
        logger.info("Consider increasing timeout for large files")
    except APIError as e:
        logger.error("API error: %s", e)
        # Only HTTP-status errors carry `status_code`; connection-level
        # APIError subclasses do not, so read it defensively.
        status_code = getattr(e, "status_code", None)
        if status_code == 429:
            logger.info("Rate limit exceeded, implement backoff")
        elif status_code is not None and status_code >= 500:
            logger.info("Server error, retry with exponential backoff")
    except ModelNotSupportedError as e:
        logger.error("Model not supported: %s", e)
        logger.info("Supported versions: %s", e.supported_versions)
    finally:
        await client.close()  # Cleanup resources

Buffer Management

Configure buffer settings for different OpenAI Structured Outputs use cases:

import logging
from openai import AsyncOpenAI
from openai_structured import StreamConfig, async_openai_structured_stream
from openai_structured.errors import StreamBufferError, StreamParseError

# Root logger at INFO; `logger` is the module-level logger used by
# process_with_config below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Default configuration (1MB buffer)
config = StreamConfig()  # Uses ijson for efficient parsing

# Large responses (2MB buffer)
# cleanup_threshold is set to half of max_buffer_size.
large_config = StreamConfig(
    max_buffer_size=2 * 1024 * 1024,    # 2MB
    cleanup_threshold=1024 * 1024,       # 1MB (50% of max)
    chunk_size=16 * 1024                # 16KB
)

# Memory-constrained (256KB buffer)
# Same 50% ratio between cleanup_threshold and max_buffer_size as above.
small_config = StreamConfig(
    max_buffer_size=256 * 1024,    # 256KB
    cleanup_threshold=128 * 1024,   # 128KB (50% of max)
    chunk_size=4 * 1024            # 4KB
)

async def process_with_config(config: StreamConfig) -> None:
    """Stream one structured response using the given buffer configuration.

    ``OutputSchema`` and ``process_chunk`` are not defined in this snippet;
    supply your own Pydantic model and chunk handler. Buffer and parse
    failures are logged rather than raised, and the client is always closed.

    Args:
        config: Buffer settings passed to the stream as ``stream_config``.
    """
    client = AsyncOpenAI()

    try:
        async for chunk in async_openai_structured_stream(
            client=client,
            model="gpt-4o-2024-08-06",
            output_schema=OutputSchema,
            system_prompt="Process this data.",
            user_prompt="Sample input",
            stream_config=config
        ):
            # Monitor buffer size changes
            # NOTE(review): assumes StreamConfig exposes should_log_size()
            # and total_bytes - confirm against the library's API.
            if config.should_log_size():
                logger.info(
                    "Buffer size: %d bytes (max: %d, cleanup at: %d)",
                    config.total_bytes,
                    config.max_buffer_size,
                    config.cleanup_threshold
                )
            process_chunk(chunk)

    except StreamBufferError as e:
        # Buffer exceeded its maximum size. Only values available in this
        # scope are logged: StreamBuffer is not imported in this example, so
        # referencing its constants here would raise NameError.
        logger.error(
            "Buffer overflow with %d bytes limit: %s",
            config.max_buffer_size,
            e
        )
        # `_cleanup_stats` is a private attribute that may be absent.
        if hasattr(e, '_cleanup_stats'):
            logger.debug("Cleanup stats: %s", e._cleanup_stats)

    except StreamParseError as e:
        # Parsing failed; the exception carries the attempt count and the
        # last underlying error.
        logger.error(
            "Parse error after %d attempts: %s",
            e.attempts,
            e.last_error
        )
        logger.debug("Buffer cleanup completed")

    finally:
        await client.close()

Model Support

Use different models with version validation:

from openai import AsyncOpenAI
from openai_structured import async_openai_structured_stream
from openai_structured.errors import ModelNotSupportedError

async def use_models():
    """Issue the same request against three model identifiers in turn.

    ``OutputSchema`` and ``process_chunk`` are not defined in this snippet;
    supply your own Pydantic model and chunk handler. An unsupported model
    version is reported on stdout, and the client is always closed.
    """
    client = AsyncOpenAI()

    # (model identifier, extra keyword arguments), in the order they are sent.
    request_plan = (
        # Production model with specific version; model-specific token limit
        ("gpt-4o-2024-08-06", {"max_tokens": 8000}),
        # Development alias (latest compatible version)
        ("gpt-4o", {}),
        # Optimized model for large responses (large context window,
        # up to 100K tokens)
        ("o1-2024-12-17", {"max_tokens": 50000}),
    )

    try:
        for model_name, extra_kwargs in request_plan:
            stream = async_openai_structured_stream(
                client=client,
                model=model_name,
                output_schema=OutputSchema,
                system_prompt="Process this.",
                user_prompt="Sample input",
                **extra_kwargs,
            )
            async for parsed in stream:
                process_chunk(parsed)

    except ModelNotSupportedError as err:
        print(f"Model version error: {err}")
        print("Supported versions:")
        for supported_model, supported_version in err.supported_versions.items():
            print(f"- {supported_model}: {supported_version}")

    finally:
        await client.close()

Example Schemas

The library provides example schemas and patterns to help you get started.

Basic Usage

The simplest way to use the library is with the SimpleMessage schema:

from openai import OpenAI
from openai_structured import openai_structured
from openai_structured.examples.schemas import SimpleMessage

# Synchronous client (the streaming examples above use AsyncOpenAI instead).
client = OpenAI()
# No system_prompt is passed in this call, only the user prompt.
result = openai_structured(
    client=client,
    model="gpt-4o",
    output_schema=SimpleMessage,
    user_prompt="What is the capital of France?"
)
# `message` is the single field declared by SimpleMessage.
print(result.message)  # "The capital of France is Paris."

Available Schemas

1. SimpleMessage

A basic schema for text responses:

from openai_structured.examples.schemas import SimpleMessage

class SimpleMessage(BaseModel):
    """Simple schema with a single message field."""
    # The model's response as plain text.
    message: str

Use this when you just need the model’s response as text.

2. SentimentMessage

A more complex schema that includes sentiment analysis:

from openai_structured.examples.schemas import SentimentMessage

class SentimentMessage(BaseModel):
    """Schema for sentiment analysis responses."""
    message: str = Field(..., description="The analyzed message")
    # The pattern is anchored at both ends and case-insensitive ((?i)), so
    # the value must be exactly one of: positive, negative, neutral, mixed.
    sentiment: str = Field(
        ...,
        pattern="(?i)^(positive|negative|neutral|mixed)$",
        description="Sentiment of the message"
    )

Use this when you need both content and sentiment analysis:

# `client` and `openai_structured` are not defined in this snippet; it reuses
# the names set up in the Basic Usage example.
result = openai_structured(
    client=client,
    model="gpt-4o",
    output_schema=SentimentMessage,
    user_prompt="How do you feel about AI?"
)
# Both fields declared by SentimentMessage are available on the result.
print(f"Message: {result.message}")
print(f"Sentiment: {result.sentiment}")

Creating Your Own Schemas

You can use these examples as templates for your own schemas:

1. Basic Pattern

from pydantic import BaseModel

class YourSchema(BaseModel):
    # Plain annotations only: each field is typed, with no Field() constraints.
    field1: str
    field2: int

2. With Validation

from pydantic import BaseModel, Field

class YourValidatedSchema(BaseModel):
    # `...` as the first Field() argument marks the field as required.
    field1: str = Field(..., description="Field description")
    # gt=0 rejects zero and negative values.
    field2: int = Field(..., gt=0, description="Must be positive")

3. With Complex Types

from typing import List, Optional
from pydantic import BaseModel

class YourComplexSchema(BaseModel):
    items: List[str]
    # NOTE(review): no default is given, so under Pydantic v2 this field must
    # still be supplied (None is accepted). Add `= None` if it should be
    # omittable - confirm against the Pydantic version in use.
    details: Optional[dict]

Best Practices

  1. Clear Field Names
     - Use descriptive names
     - Follow Python naming conventions
     - Add field descriptions

  2. Appropriate Validation
     - Add type hints
     - Use Field() for constraints
     - Include pattern validation where needed

  3. Documentation
     - Add class docstrings
     - Document field meanings
     - Include usage examples

  4. Type Safety
     - Use appropriate types
     - Consider Optional fields
     - Add proper type hints

Testing Examples

This section provides examples of testing code that uses the openai-structured library.

Basic Parameter Validation

Example of basic parameter validation testing:

"""Example: Basic parameter validation testing."""

import pytest

from openai_structured import OpenAIClientError


 8def test_basic_parameter_validation(test_registry):
 9    """Test basic parameter validation."""
10    # Get capabilities for a test model
11    capabilities = test_registry.get_capabilities("test-model")
12
13    # Test valid parameters
14    capabilities.validate_parameter("temperature", 0.7)
15    capabilities.validate_parameter("top_p", 0.9)
16
17    # Test invalid parameters
18    with pytest.raises(OpenAIClientError) as exc_info:
19        capabilities.validate_parameter("temperature", 2.5)
20    assert "must be between" in str(exc_info.value)
21
22    with pytest.raises(OpenAIClientError) as exc_info:
23        capabilities.validate_parameter("invalid_param", 1.0)
24    assert "not supported by model" in str(exc_info.value)

Custom Model Testing

Example of testing with custom model configurations:

"""Example: Testing with custom model configurations."""

import pytest

from openai_structured import OpenAIClientError, VersionTooOldError
from openai_structured.model_registry import ModelRegistry
from openai_structured.testing import (
    create_enum_constraint,
    create_numeric_constraint,
    create_test_registry,
)


 14@pytest.fixture
 15def custom_test_registry():
 16    """Custom test registry with specific model configuration."""
 17    # Create custom model config
 18    model_config = {
 19        "dated_models": {
 20            "test-custom-model-2024-01-01": {
 21                "context_window": 8192,
 22                "max_output_tokens": 4096,
 23                "supports_structured": True,
 24                "supports_streaming": True,
 25                "supported_parameters": [
 26                    {"ref": "numeric_constraints.temperature"},
 27                    {"ref": "numeric_constraints.top_p"},
 28                ],
 29                "min_version": {
 30                    "year": 2024,
 31                    "month": 1,
 32                    "day": 1,
 33                },
 34            }
 35        },
 36        "aliases": {"test-custom-model": "test-custom-model-2024-01-01"},
 37    }
 38
 39    # Create custom constraints config
 40    constraints_config = {
 41        "numeric_constraints": {
 42            "temperature": {
 43                "type": "numeric",
 44                "min_value": 0.0,
 45                "max_value": 1.0,
 46                "description": "Custom temperature range",
 47                "allow_float": True,
 48                "allow_int": False,
 49            },
 50            "top_p": {
 51                "type": "numeric",
 52                "min_value": 0.0,
 53                "max_value": 1.0,
 54                "description": "Custom top_p range",
 55                "allow_float": True,
 56                "allow_int": False,
 57            },
 58        }
 59    }
 60
 61    # Create and return registry with custom configuration
 62    registry = create_test_registry(
 63        model_config=model_config,
 64        constraints_config=constraints_config,
 65    )
 66    yield registry
 67    ModelRegistry.cleanup()
 68
 69
 70def test_custom_model_capabilities(custom_test_registry):
 71    """Test custom model capabilities."""
 72    # Test basic capabilities
 73    capabilities = custom_test_registry.get_capabilities("test-custom-model")
 74    assert capabilities.context_window == 8192
 75    assert capabilities.max_output_tokens == 4096
 76    assert capabilities.supports_structured
 77    assert capabilities.supports_streaming
 78
 79    # Test parameter validation
 80    capabilities.validate_parameter("temperature", 0.5)
 81    capabilities.validate_parameter("top_p", 0.9)
 82
 83    # Test invalid parameters
 84    with pytest.raises(OpenAIClientError) as exc_info:
 85        capabilities.validate_parameter("temperature", 1.5)
 86    assert "must be between" in str(exc_info.value)
 87
 88    with pytest.raises(OpenAIClientError) as exc_info:
 89        capabilities.validate_parameter("invalid_param", 1.0)
 90    assert "not supported by model" in str(exc_info.value)
 91
 92    # Test version validation
 93    custom_test_registry.get_capabilities("test-custom-model-2024-01-01")
 94    with pytest.raises(VersionTooOldError) as exc_info:
 95        custom_test_registry.get_capabilities("test-custom-model-2023-12-31")
 96    assert "is too old" in str(exc_info.value)
 97
 98
 99def test_custom_parameter_validation(test_registry):
100    """Test parameter validation with custom constraints."""
101    # Create custom constraints
102    temp_constraint = create_numeric_constraint(
103        min_value=0.0,
104        max_value=1.0,
105        description="Custom temperature range",
106        allow_float=True,
107        allow_int=False,
108    )
109
110    effort_constraint = create_enum_constraint(
111        allowed_values=["low", "medium", "high"],
112        description="Custom reasoning effort levels",
113    )
114
115    # Create registry with custom constraints
116    constraints_config = {
117        "numeric_constraints": {
118            "temperature": temp_constraint.model_dump(),
119        },
120        "enum_constraints": {
121            "reasoning_effort": effort_constraint.model_dump(),
122        },
123    }
124
125    registry = create_test_registry(constraints_config=constraints_config)
126    capabilities = registry.get_capabilities("test-model")
127
128    # Test numeric parameters
129    capabilities.validate_parameter("temperature", 0.7)  # Float OK
130    with pytest.raises(OpenAIClientError, match="must be a float"):
131        capabilities.validate_parameter(
132            "temperature", 1
133        )  # Integer not allowed
134
135    # Test enum parameters
136    o1_caps = registry.get_capabilities("o1")
137    o1_caps.validate_parameter("reasoning_effort", "medium")  # Valid value
138    with pytest.raises(OpenAIClientError, match="Invalid value"):
139        o1_caps.validate_parameter("reasoning_effort", "invalid")

Token Limit Testing

Example of testing token limits and validation:

"""Example: Testing token limit validation."""

import pytest

from openai_structured import OpenAIClientError, TokenParameterError
from openai_structured.model_registry import ModelRegistry
from openai_structured.testing import (
    create_test_registry,
    get_test_capabilities,
)


 13@pytest.fixture
 14def token_test_registry():
 15    """Create a test registry with token-specific configuration."""
 16    # Create model config with token limits
 17    model_config = {
 18        "dated_models": {
 19            "test-token-model-2024-01-01": {
 20                "context_window": 4096,
 21                "max_output_tokens": 2048,
 22                "supports_structured": True,
 23                "supports_streaming": True,
 24                "supported_parameters": [
 25                    {"ref": "numeric_constraints.max_completion_tokens"},
 26                ],
 27                "min_version": {
 28                    "year": 2024,
 29                    "month": 1,
 30                    "day": 1,
 31                },
 32            }
 33        },
 34        "aliases": {"test-token-model": "test-token-model-2024-01-01"},
 35    }
 36
 37    # Create constraints config with token parameters
 38    constraints_config = {
 39        "numeric_constraints": {
 40            "max_completion_tokens": {
 41                "type": "numeric",
 42                "min_value": 1,
 43                "max_value": 2048,
 44                "description": "Maximum completion tokens",
 45                "allow_float": False,
 46                "allow_int": True,
 47            }
 48        }
 49    }
 50
 51    # Create and return registry
 52    registry = create_test_registry(
 53        model_config=model_config,
 54        constraints_config=constraints_config,
 55    )
 56    yield registry
 57    ModelRegistry.cleanup()
 58
 59
 60def test_token_limit_validation(token_test_registry):
 61    """Test token limit validation."""
 62    capabilities = token_test_registry.get_capabilities("test-token-model")
 63
 64    # Test valid token limits
 65    capabilities.validate_parameter("max_completion_tokens", 1000)
 66
 67    # Test invalid token limits
 68    with pytest.raises(OpenAIClientError) as exc_info:
 69        capabilities.validate_parameter("max_completion_tokens", 3000)
 70    assert "must not exceed" in str(exc_info.value)
 71
 72    with pytest.raises(OpenAIClientError) as exc_info:
 73        capabilities.validate_parameter(
 74            "max_completion_tokens", "not a number"
 75        )
 76    assert "must be an integer" in str(exc_info.value)
 77
 78    with pytest.raises(OpenAIClientError) as exc_info:
 79        capabilities.validate_parameter("max_completion_tokens", 1.5)
 80    assert "must be an integer" in str(exc_info.value)
 81
 82
 83def test_token_parameter_conflicts(test_registry):
 84    """Test token parameter conflict handling."""
 85    # Get test capabilities with both token parameters
 86    capabilities = get_test_capabilities(
 87        openai_model_name="test-model",
 88        max_output_tokens=4096,
 89        supported_parameters=[
 90            {"ref": "numeric_constraints.max_output_tokens"},
 91            {"ref": "numeric_constraints.max_completion_tokens"},
 92        ],
 93    )
 94    used_params = set()
 95
 96    # Test max_output_tokens
 97    capabilities.validate_parameter(
 98        "max_output_tokens", 1000, used_params=used_params
 99    )
100
101    # Test conflict with max_completion_tokens
102    with pytest.raises(TokenParameterError) as exc_info:
103        capabilities.validate_parameter(
104            "max_completion_tokens", 1000, used_params=used_params
105        )
106    assert (
107        "Cannot specify both 'max_output_tokens' and 'max_completion_tokens' parameters"
108        in str(exc_info.value)
109    )
110
111
def test_token_parameter_types(test_registry):
    """Test token parameter type validation.

    An integer must pass; a float and a numeric-looking string must each
    raise ``OpenAIClientError`` matching "must be an integer".
    """
    # NOTE(review): `test_registry` is requested but not referenced in the
    # body - presumably needed for its setup side effects; confirm.

    # Get test capabilities with token parameter
    capabilities = get_test_capabilities(
        openai_model_name="test-model",
        max_output_tokens=4096,
        supported_parameters=[
            {"ref": "numeric_constraints.max_output_tokens"},
        ],
    )

    # Test valid integer
    capabilities.validate_parameter("max_output_tokens", 1000)

    # Test invalid float
    with pytest.raises(OpenAIClientError, match="must be an integer"):
        capabilities.validate_parameter("max_output_tokens", 1000.5)

    # Test invalid string
    with pytest.raises(OpenAIClientError, match="must be an integer"):
        capabilities.validate_parameter("max_output_tokens", "1000")

```