[Bug Fix] Responses api session management for streaming responses (#13396)
* fix proxy config * fix(responses api): fix streaming ID consistency and tool format handling (#12640) * fix(responses): ensure streaming chunk IDs use consistent encoding format Fixes streaming ID inconsistency where streaming responses used raw provider IDs while non-streaming responses used properly encoded IDs with provider context. Changes: - Updated LiteLLMCompletionStreamingIterator to accept provider context - Added _encode_chunk_id() method using same logic as non-streaming responses - Modified chunk transformation to encode all streaming item_ids with resp_ prefix - Updated handlers to pass custom_llm_provider and litellm_metadata to streaming iterator Impact: - Streaming chunk IDs now format: resp_<base64_encoded_provider_context> - Enables session continuity when using streaming response IDs as previous_response_id - Allows provider detection and load balancing with streaming responses - Maintains backward compatibility with existing streaming functionality 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * fix(types): add explicit Optional[str] type annotation for model_id This resolves MyPy type checking error where model_id could be None but wasn't explicitly typed as Optional[str]. * fix(types): handle None case for litellm_metadata access Prevents 'Item None has no attribute get' error by checking for None before accessing litellm_metadata dictionary. * test: add comprehensive tests for streaming ID consistency Adds unit and E2E tests to verify streaming chunk IDs are properly encoded with consistent format across streaming responses. ## Tests Added ### Unit Test (test_reasoning_content_transformation.py) - `test_streaming_chunk_id_encoding()`: Validates the `_encode_chunk_id()` method correctly encodes chunk IDs with `resp_` prefix and provider context ### E2E Tests (test_e2e_openai_responses_api.py) - `test_streaming_id_consistency_across_chunks()`: Tests that all streaming chunk IDs are properly encoded across multiple chunks in a real streaming response - `test_streaming_response_id_as_previous_response_id()`: Tests the core use case - using streaming response IDs for session continuity with `previous_response_id` ## Key Testing Approach - Uses **Gemini** (non-OpenAI model) to test the transformation logic rather than OpenAI passthrough, since the streaming ID consistency issue occurs when LiteLLM transforms responses rather than just passing through to native OpenAI responses API - Tests validate that streaming chunk IDs now use same encoding as non-streaming responses - Verifies session continuity works with streaming responses Addresses @ishaan-jaff's request for unit tests covering the streaming ID consistency fix. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * fix(lint): remove unused imports in transformation.py Removes unused imports to fix CI linting errors: - GenericResponseOutputItem - OutputFunctionToolCall * test: remove E2E tests from openai_endpoints_tests Remove streaming ID consistency E2E tests as requested by @ishaan-jaff. Keep only the mock/unit test in test_reasoning_content_transformation.py * revert: remove streaming chunk ID encoding to original behavior This reverts the streaming chunk ID encoding changes to understand the original issue better. Original behavior was: - Streaming chunks: raw provider IDs - Streaming final response: raw IDs (PROBLEM!) - Non-streaming final response: encoded IDs (correct) The real issue: streaming final response IDs were not encoded, breaking session continuity. * fix(responses): encode streaming final response IDs to match OpenAI behavior Fixes streaming ID inconsistency to match OpenAI's Responses API behavior: - Streaming chunks: raw message IDs (like OpenAI's msg_xxx) - Final response: encoded IDs (like OpenAI's resp_xxx) This enables session continuity by ensuring streaming final response IDs have the same encoded format as non-streaming responses, allowing them to be used as previous_response_id in follow-up requests. Changes: - Add custom_llm_provider and litellm_metadata to LiteLLMCompletionStreamingIterator - Update handlers to pass provider context to streaming iterator - Apply _update_responses_api_response_id_with_model_id to final streaming response - Keep streaming chunks as raw IDs to match OpenAI format Impact: - Session continuity works with streaming responses - Load balancing can detect provider from streaming final response IDs - Format matches OpenAI's Responses API exactly 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * test: update unit test to match correct OpenAI-compatible behavior Updates the unit test to verify streaming chunk IDs are raw (not encoded) to match OpenAI's responses API format: - Streaming chunks: raw message IDs (like msg_xxx) - Final response: encoded IDs (like resp_xxx) This reflects the correct behavior implemented in the fix. --------- Co-authored-by: Claude <noreply@anthropic.com> * cleanup * TestBaseResponsesAPIStreamingIterator --------- Co-authored-by: Javier de la Torre <jatorre@carto.com> Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
parent
698655cd24
commit
9761ba7c7a
@ -1,15 +1,5 @@
|
||||
model_list:
|
||||
- model_name: bedrock/*
|
||||
- model_name: gemini/*
|
||||
litellm_params:
|
||||
model: bedrock/*
|
||||
model: gemini/*
|
||||
|
||||
|
||||
litellm_settings:
|
||||
callbacks: ["s3_v2"]
|
||||
s3_callback_params:
|
||||
s3_bucket_name: litellm-logs # AWS Bucket Name for S3
|
||||
s3_region_name: us-west-2
|
||||
|
||||
general_settings:
|
||||
cold_storage_custom_logger: s3_v2
|
||||
store_prompts_in_cold_storage: true
|
||||
@ -84,6 +84,8 @@ class LiteLLMCompletionTransformationHandler:
|
||||
litellm_custom_stream_wrapper=litellm_completion_response,
|
||||
request_input=input,
|
||||
responses_api_request=responses_api_request,
|
||||
custom_llm_provider=custom_llm_provider,
|
||||
litellm_metadata=kwargs.get("litellm_metadata", {}),
|
||||
)
|
||||
|
||||
async def async_response_api_handler(
|
||||
@ -129,4 +131,6 @@ class LiteLLMCompletionTransformationHandler:
|
||||
litellm_custom_stream_wrapper=litellm_completion_response,
|
||||
request_input=request_input,
|
||||
responses_api_request=responses_api_request,
|
||||
custom_llm_provider=litellm_completion_request.get("custom_llm_provider"),
|
||||
litellm_metadata=kwargs.get("litellm_metadata", {}),
|
||||
)
|
||||
|
||||
@ -6,6 +6,7 @@ from litellm.responses.litellm_completion_transformation.transformation import (
|
||||
LiteLLMCompletionResponsesConfig,
|
||||
)
|
||||
from litellm.responses.streaming_iterator import ResponsesAPIStreamingIterator
|
||||
from litellm.responses.utils import ResponsesAPIRequestUtils
|
||||
from litellm.types.llms.openai import (
|
||||
OutputTextDeltaEvent,
|
||||
ReasoningSummaryTextDeltaEvent,
|
||||
@ -34,6 +35,8 @@ class LiteLLMCompletionStreamingIterator(ResponsesAPIStreamingIterator):
|
||||
litellm_custom_stream_wrapper: litellm.CustomStreamWrapper,
|
||||
request_input: Union[str, ResponseInputParam],
|
||||
responses_api_request: ResponsesAPIOptionalRequestParams,
|
||||
custom_llm_provider: Optional[str] = None,
|
||||
litellm_metadata: Optional[dict] = None,
|
||||
):
|
||||
self.litellm_custom_stream_wrapper: litellm.CustomStreamWrapper = (
|
||||
litellm_custom_stream_wrapper
|
||||
@ -42,6 +45,8 @@ class LiteLLMCompletionStreamingIterator(ResponsesAPIStreamingIterator):
|
||||
self.responses_api_request: ResponsesAPIOptionalRequestParams = (
|
||||
responses_api_request
|
||||
)
|
||||
self.custom_llm_provider: Optional[str] = custom_llm_provider
|
||||
self.litellm_metadata: Optional[dict] = litellm_metadata or {}
|
||||
self.collected_chat_completion_chunks: List[ModelResponseStream] = []
|
||||
self.finished: bool = False
|
||||
|
||||
@ -164,14 +169,23 @@ class LiteLLMCompletionStreamingIterator(ResponsesAPIStreamingIterator):
|
||||
Union[ModelResponse, TextCompletionResponse]
|
||||
] = stream_chunk_builder(chunks=self.collected_chat_completion_chunks)
|
||||
if litellm_model_response and isinstance(litellm_model_response, ModelResponse):
|
||||
# Transform the response
|
||||
responses_api_response = LiteLLMCompletionResponsesConfig.transform_chat_completion_response_to_responses_api_response(
|
||||
request_input=self.request_input,
|
||||
chat_completion_response=litellm_model_response,
|
||||
responses_api_request=self.responses_api_request,
|
||||
)
|
||||
|
||||
# Encode the response ID to match non-streaming behavior
|
||||
encoded_response = ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
|
||||
responses_api_response=responses_api_response,
|
||||
custom_llm_provider=self.custom_llm_provider,
|
||||
litellm_metadata=self.litellm_metadata,
|
||||
)
|
||||
|
||||
return ResponseCompletedEvent(
|
||||
type=ResponsesAPIStreamEvents.RESPONSE_COMPLETED,
|
||||
response=LiteLLMCompletionResponsesConfig.transform_chat_completion_response_to_responses_api_response(
|
||||
request_input=self.request_input,
|
||||
chat_completion_response=litellm_model_response,
|
||||
responses_api_request=self.responses_api_request,
|
||||
),
|
||||
response=encoded_response,
|
||||
)
|
||||
else:
|
||||
return None
|
||||
|
||||
@ -137,6 +137,14 @@ model_list:
|
||||
model: openai/my-fake-model
|
||||
api_key: my-fake-key
|
||||
api_base: https://exampleopenaiendpoint-production.up.railway.appxxxx/
|
||||
- model_name: gemini-1.5-flash
|
||||
litellm_params:
|
||||
model: gemini/gemini-1.5-flash
|
||||
api_key: os.environ/GOOGLE_API_KEY
|
||||
- model_name: gpt-4o
|
||||
litellm_params:
|
||||
model: gpt-4o
|
||||
api_key: os.environ/OPENAI_API_KEY
|
||||
|
||||
|
||||
litellm_settings:
|
||||
|
||||
@ -0,0 +1,239 @@
|
||||
"""
|
||||
Unit tests for BaseResponsesAPIStreamingIterator
|
||||
|
||||
Tests core functionality including:
|
||||
1. Processing chunks and handling ResponseCompletedEvent
|
||||
2. Ensuring _update_responses_api_response_id_with_model_id is called for final chunk
|
||||
3. Verifying ID update is NOT called for non-final chunks (delta events)
|
||||
4. Edge case handling for invalid JSON, empty chunks, and [DONE] markers
|
||||
|
||||
These tests ensure the streaming iterator correctly processes response chunks
|
||||
and applies model ID updates only to completed responses, as required for proper
|
||||
response tracking and logging.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Optional
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.abspath("../.."))
|
||||
|
||||
from litellm.constants import STREAM_SSE_DONE_STRING
|
||||
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
|
||||
from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig
|
||||
from litellm.responses.streaming_iterator import BaseResponsesAPIStreamingIterator
|
||||
from litellm.responses.utils import ResponsesAPIRequestUtils
|
||||
from litellm.types.llms.openai import (
|
||||
ResponseCompletedEvent,
|
||||
ResponsesAPIResponse,
|
||||
ResponsesAPIStreamEvents,
|
||||
OutputTextDeltaEvent
|
||||
)
|
||||
|
||||
|
||||
class TestBaseResponsesAPIStreamingIterator:
|
||||
"""Test cases for BaseResponsesAPIStreamingIterator"""
|
||||
|
||||
def test_process_chunk_with_response_completed_event(self):
|
||||
"""
|
||||
Test that _process_chunk correctly processes a ResponseCompletedEvent
|
||||
and calls _update_responses_api_response_id_with_model_id for the final chunk.
|
||||
"""
|
||||
# Mock dependencies
|
||||
mock_response = Mock()
|
||||
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
|
||||
mock_config = Mock(spec=BaseResponsesAPIConfig)
|
||||
|
||||
# Create a mock ResponsesAPIResponse for the completed event
|
||||
mock_responses_api_response = Mock(spec=ResponsesAPIResponse)
|
||||
mock_responses_api_response.id = "original_response_id"
|
||||
|
||||
# Create a mock ResponseCompletedEvent
|
||||
mock_completed_event = Mock(spec=ResponseCompletedEvent)
|
||||
mock_completed_event.type = ResponsesAPIStreamEvents.RESPONSE_COMPLETED
|
||||
mock_completed_event.response = mock_responses_api_response
|
||||
|
||||
# Set up the mock transform method to return our completed event
|
||||
mock_config.transform_streaming_response.return_value = mock_completed_event
|
||||
|
||||
# Mock the _update_responses_api_response_id_with_model_id method
|
||||
updated_response = Mock(spec=ResponsesAPIResponse)
|
||||
updated_response.id = "updated_response_id"
|
||||
|
||||
# Create the iterator instance
|
||||
iterator = BaseResponsesAPIStreamingIterator(
|
||||
response=mock_response,
|
||||
model="gpt-4",
|
||||
responses_api_provider_config=mock_config,
|
||||
logging_obj=mock_logging_obj,
|
||||
litellm_metadata={"model_info": {"id": "model_123"}},
|
||||
custom_llm_provider="openai"
|
||||
)
|
||||
|
||||
# Prepare test chunk data
|
||||
test_chunk_data = {
|
||||
"type": "response.completed",
|
||||
"response": {
|
||||
"id": "original_response_id",
|
||||
"output": [{"type": "message", "content": [{"text": "Hello World"}]}]
|
||||
}
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
ResponsesAPIRequestUtils,
|
||||
'_update_responses_api_response_id_with_model_id',
|
||||
return_value=updated_response
|
||||
) as mock_update_id:
|
||||
# Process the chunk
|
||||
result = iterator._process_chunk(json.dumps(test_chunk_data))
|
||||
|
||||
# Assertions
|
||||
assert result is not None
|
||||
assert result.type == ResponsesAPIStreamEvents.RESPONSE_COMPLETED
|
||||
|
||||
# Verify that _update_responses_api_response_id_with_model_id was called
|
||||
mock_update_id.assert_called_once_with(
|
||||
responses_api_response=mock_responses_api_response,
|
||||
litellm_metadata={"model_info": {"id": "model_123"}},
|
||||
custom_llm_provider="openai"
|
||||
)
|
||||
|
||||
# Verify the completed response was stored
|
||||
assert iterator.completed_response == result
|
||||
|
||||
# Verify the response was updated on the event
|
||||
assert result.response == updated_response
|
||||
|
||||
def test_process_chunk_with_delta_event_no_id_update(self):
|
||||
"""
|
||||
Test that _process_chunk correctly processes a delta event
|
||||
and does NOT call _update_responses_api_response_id_with_model_id.
|
||||
"""
|
||||
# Mock dependencies
|
||||
mock_response = Mock()
|
||||
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
|
||||
mock_config = Mock(spec=BaseResponsesAPIConfig)
|
||||
|
||||
# Create a mock OutputTextDeltaEvent (not a completed event)
|
||||
mock_delta_event = Mock(spec=OutputTextDeltaEvent)
|
||||
mock_delta_event.type = ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA
|
||||
mock_delta_event.delta = "Hello"
|
||||
# Delta events don't have a response attribute
|
||||
delattr(mock_delta_event, 'response') if hasattr(mock_delta_event, 'response') else None
|
||||
|
||||
# Set up the mock transform method to return our delta event
|
||||
mock_config.transform_streaming_response.return_value = mock_delta_event
|
||||
|
||||
# Create the iterator instance
|
||||
iterator = BaseResponsesAPIStreamingIterator(
|
||||
response=mock_response,
|
||||
model="gpt-4",
|
||||
responses_api_provider_config=mock_config,
|
||||
logging_obj=mock_logging_obj,
|
||||
litellm_metadata={"model_info": {"id": "model_123"}},
|
||||
custom_llm_provider="openai"
|
||||
)
|
||||
|
||||
# Prepare test chunk data for a delta event
|
||||
test_chunk_data = {
|
||||
"type": "response.output_text.delta",
|
||||
"delta": "Hello",
|
||||
"item_id": "item_123",
|
||||
"output_index": 0,
|
||||
"content_index": 0
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
ResponsesAPIRequestUtils,
|
||||
'_update_responses_api_response_id_with_model_id'
|
||||
) as mock_update_id:
|
||||
# Process the chunk
|
||||
result = iterator._process_chunk(json.dumps(test_chunk_data))
|
||||
|
||||
# Assertions
|
||||
assert result is not None
|
||||
assert result.type == ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA
|
||||
|
||||
# Verify that _update_responses_api_response_id_with_model_id was NOT called
|
||||
mock_update_id.assert_not_called()
|
||||
|
||||
# Verify no completed response was stored (since this is not a completed event)
|
||||
assert iterator.completed_response is None
|
||||
|
||||
def test_process_chunk_handles_invalid_json(self):
|
||||
"""
|
||||
Test that _process_chunk gracefully handles invalid JSON.
|
||||
"""
|
||||
# Mock dependencies
|
||||
mock_response = Mock()
|
||||
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
|
||||
mock_config = Mock(spec=BaseResponsesAPIConfig)
|
||||
|
||||
# Create the iterator instance
|
||||
iterator = BaseResponsesAPIStreamingIterator(
|
||||
response=mock_response,
|
||||
model="gpt-4",
|
||||
responses_api_provider_config=mock_config,
|
||||
logging_obj=mock_logging_obj
|
||||
)
|
||||
|
||||
# Test with invalid JSON
|
||||
result = iterator._process_chunk("invalid json {")
|
||||
|
||||
# Should return None for invalid JSON
|
||||
assert result is None
|
||||
assert iterator.completed_response is None
|
||||
|
||||
def test_process_chunk_handles_done_marker(self):
|
||||
"""
|
||||
Test that _process_chunk correctly handles the [DONE] marker.
|
||||
"""
|
||||
# Mock dependencies
|
||||
mock_response = Mock()
|
||||
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
|
||||
mock_config = Mock(spec=BaseResponsesAPIConfig)
|
||||
|
||||
# Create the iterator instance
|
||||
iterator = BaseResponsesAPIStreamingIterator(
|
||||
response=mock_response,
|
||||
model="gpt-4",
|
||||
responses_api_provider_config=mock_config,
|
||||
logging_obj=mock_logging_obj
|
||||
)
|
||||
|
||||
# Test with [DONE] marker
|
||||
result = iterator._process_chunk(STREAM_SSE_DONE_STRING)
|
||||
|
||||
# Should return None and set finished flag
|
||||
assert result is None
|
||||
assert iterator.finished is True
|
||||
|
||||
def test_process_chunk_handles_empty_chunk(self):
|
||||
"""
|
||||
Test that _process_chunk correctly handles empty or None chunks.
|
||||
"""
|
||||
# Mock dependencies
|
||||
mock_response = Mock()
|
||||
mock_logging_obj = Mock(spec=LiteLLMLoggingObj)
|
||||
mock_config = Mock(spec=BaseResponsesAPIConfig)
|
||||
|
||||
# Create the iterator instance
|
||||
iterator = BaseResponsesAPIStreamingIterator(
|
||||
response=mock_response,
|
||||
model="gpt-4",
|
||||
responses_api_provider_config=mock_config,
|
||||
logging_obj=mock_logging_obj
|
||||
)
|
||||
|
||||
# Test with empty chunk
|
||||
result = iterator._process_chunk("")
|
||||
assert result is None
|
||||
|
||||
# Test with None chunk
|
||||
result = iterator._process_chunk(None)
|
||||
assert result is None
|
||||
@ -253,3 +253,34 @@ class TestReasoningContentFinalResponse:
|
||||
]
|
||||
assert len(reasoning_items) == 1, "Should have exactly one reasoning item"
|
||||
assert reasoning_items[0].content[0].text == "Reasoning for first answer"
|
||||
|
||||
|
||||
def test_streaming_chunk_id_raw():
|
||||
"""Test that streaming chunk IDs are raw (not encoded) to match OpenAI format"""
|
||||
chunk = ModelResponseStream(
|
||||
id="chunk-123",
|
||||
created=1234567890,
|
||||
model="test-model",
|
||||
object="chat.completion.chunk",
|
||||
choices=[
|
||||
StreamingChoices(
|
||||
finish_reason=None,
|
||||
index=0,
|
||||
delta=Delta(content="Hello", role="assistant"),
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
iterator = LiteLLMCompletionStreamingIterator(
|
||||
litellm_custom_stream_wrapper=AsyncMock(),
|
||||
request_input="Test input",
|
||||
responses_api_request={},
|
||||
custom_llm_provider="openai",
|
||||
litellm_metadata={"model_info": {"id": "gpt-4"}},
|
||||
)
|
||||
|
||||
result = iterator._transform_chat_completion_chunk_to_response_api_chunk(chunk)
|
||||
|
||||
# Streaming chunk IDs should be raw (like OpenAI's msg_xxx format)
|
||||
assert result.item_id == "chunk-123" # Should be raw, not encoded
|
||||
assert not result.item_id.startswith("resp_") # Should NOT have resp_ prefix
|
||||
|
||||
Loading…
Reference in New Issue
Block a user