From 2bbdbfa5c348e198eb21461731c784e12897f01f Mon Sep 17 00:00:00 2001 From: Mateo Wang <277851410+mateo-berri@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:13:02 -0700 Subject: [PATCH] fix: passthrough endpoints duplicate logs (#29598) * fix duplicate cost callbacks for anthropic streaming pass-through Two bugs caused _PROXY_track_cost_callback to see stream=True + complete_streaming_response=None on every streaming pass-through request, making the dedup guard in dispatch_success_handlers permanently inactive: 1. pass_through_endpoints.py created the Logging object with stream=False for all requests. _is_assembled_stream_success short-circuits on self.stream is not True, so has_dispatched_final_stream_success was never set and any second dispatch went through unchecked. Fix: set logging_obj.stream = True after stream detection. 2. _create_anthropic_response_logging_payload set complete_streaming_response inside the try block after litellm.completion_cost(), so a pricing error caused an early return without setting it on model_call_details. Fix: set complete_streaming_response before the try block. Co-Authored-By: Claude Sonnet 4.6 * fix stream * add stream to logging obj * test(pass_through): give mock logging object a real model_call_details dict The anthropic passthrough logging payload now records the assembled response on model_call_details before cost calculation, which requires model_call_details to support item assignment. In production it is always a dict; the existing unit test stubbed the logging object with a bare Mock whose attribute is not subscriptable, so the new assignment raised TypeError. Use a real dict to match the production logging object. * test(pass_through): cover streaming logging-obj stream flag The streaming branch of pass_through_request that marks the logging object as streaming (logging_obj.stream and model_call_details["stream"]) had no unit coverage, so the patch coverage gate flagged it. Add a regression test that drives a streaming pass-through request through pass_through_request and asserts the logging object is flagged as a stream before dispatch. * test(pass_through): cover SSE-response stream flag fallback branch The auto-detected streaming branch of pass_through_request (when a request that was not flagged as streaming returns a text/event-stream response) sets logging_obj.stream and model_call_details["stream"] but had no unit coverage, so the codecov patch gate failed at 60%. Drive a non-streaming pass-through request whose upstream response is SSE through pass_through_request and assert the logging object is flagged as a stream before dispatch. * fix(pass_through): gate complete_streaming_response on stream flag perform_redaction only scrubs complete_streaming_response when model_call_details["stream"] is True. Setting it unconditionally for non-streaming Anthropic pass-through responses left the assembled response unredacted in model_call_details, which is handed to logging callbacks as kwargs when message logging is disabled. Only record it for actual streaming responses so redaction always applies. --------- Co-authored-by: mubashir1osmani Co-authored-by: Claude Sonnet 4.6 --- .../anthropic_passthrough_logging_handler.py | 7 + .../pass_through_endpoints.py | 6 + .../test_unit_test_anthropic_pass_through.py | 1 + ...t_anthropic_passthrough_logging_handler.py | 332 ++++++++++++++++++ .../test_pass_through_endpoints.py | 125 +++++++ 5 files changed, 471 insertions(+) diff --git a/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py b/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py index 3be26eb572..a94672f948 100644 --- a/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py +++ b/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py @@ -114,6 +114,13 @@ class AnthropicPassthroughLoggingHandler: handles streaming and non-streaming responses """ + # Only record complete_streaming_response for actual streaming responses. + # perform_redaction scrubs this field only when stream is True, so setting + # it on a non-streaming response would bypass message redaction. + if logging_obj.model_call_details.get("stream") is True: + logging_obj.model_call_details["complete_streaming_response"] = ( + litellm_model_response + ) try: # Get custom_llm_provider from logging object if available (e.g., azure_ai for Azure Anthropic) custom_llm_provider = logging_obj.model_call_details.get( diff --git a/litellm/proxy/pass_through_endpoints/pass_through_endpoints.py b/litellm/proxy/pass_through_endpoints/pass_through_endpoints.py index 985785ad77..e49e1302ab 100644 --- a/litellm/proxy/pass_through_endpoints/pass_through_endpoints.py +++ b/litellm/proxy/pass_through_endpoints/pass_through_endpoints.py @@ -1061,6 +1061,9 @@ async def pass_through_request( # noqa: PLR0915 ) if stream: + logging_obj.stream = True + logging_obj.model_call_details["stream"] = True + if is_multipart: response = ( await HttpPassThroughEndpointHelpers.make_multipart_http_request( @@ -1139,6 +1142,9 @@ async def pass_through_request( # noqa: PLR0915 verbose_proxy_logger.debug("response.headers= %s", response.headers) if _is_streaming_response(response) is True: + logging_obj.stream = True + logging_obj.model_call_details["stream"] = True + try: response.raise_for_status() except httpx.HTTPStatusError as e: diff --git a/tests/pass_through_unit_tests/test_unit_test_anthropic_pass_through.py b/tests/pass_through_unit_tests/test_unit_test_anthropic_pass_through.py index 455c72ff63..5ab0319da4 100644 --- a/tests/pass_through_unit_tests/test_unit_test_anthropic_pass_through.py +++ b/tests/pass_through_unit_tests/test_unit_test_anthropic_pass_through.py @@ -318,6 +318,7 @@ def test_handle_logging_anthropic_collected_chunks(all_chunks): from litellm.types.utils import ModelResponse litellm_logging_obj = Mock() + litellm_logging_obj.model_call_details = {} pass_through_logging_obj = Mock() sent_args = { diff --git a/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py b/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py index 0a9e303103..f8b6fbde3d 100644 --- a/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py +++ b/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py @@ -1043,3 +1043,335 @@ class TestPureTextFastPathParity: AnthropicPassthroughLoggingHandler._collapse_pure_text_chunks(all_chunks) is None ) + + +class TestStreamFalseDeduplication: + """ + Regression tests for the duplicate-callback bug where a streaming pass-through + request had stream=False hardcoded on its Logging object. + + Before the fix: + - logging_obj.stream was always False for pass-through requests + - _is_assembled_stream_success() checked `self.stream is not True` and returned + False immediately, so has_dispatched_final_stream_success was never set + - Any second dispatch_success_handlers call went through unchecked + + After the fix: + - pass_through_endpoints.py sets logging_obj.stream = True after detecting stream + - _create_anthropic_response_logging_payload sets complete_streaming_response on + model_call_details so callbacks see the correct assembled response state + - _is_assembled_stream_success returns True, dedup guard fires on first dispatch + """ + + @staticmethod + def _sse(event, data): + return f"event: {event}\ndata: {json.dumps(data)}\n\n".encode() + + @staticmethod + def _make_logging_obj(stream: bool = False) -> LiteLLMLoggingObj: + logging_obj = LiteLLMLoggingObj( + model="claude-3-5-sonnet-20241022", + messages=[{"role": "user", "content": "hello"}], + stream=stream, + call_type="pass_through_endpoint", + start_time=datetime.now(), + litellm_call_id="test-call-id", + function_id="1245", + ) + return logging_obj + + @staticmethod + def _build_chunks(): + frames = [ + TestStreamFalseDeduplication._sse( + "message_start", + { + "type": "message_start", + "message": { + "id": "msg_abc", + "type": "message", + "role": "assistant", + "model": "claude-3-5-sonnet-20241022", + "content": [], + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": 10, "output_tokens": 0}, + }, + }, + ), + TestStreamFalseDeduplication._sse( + "content_block_start", + { + "type": "content_block_start", + "index": 0, + "content_block": {"type": "text", "text": ""}, + }, + ), + TestStreamFalseDeduplication._sse( + "content_block_delta", + { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": "Hello"}, + }, + ), + TestStreamFalseDeduplication._sse( + "content_block_stop", {"type": "content_block_stop", "index": 0} + ), + TestStreamFalseDeduplication._sse( + "message_delta", + { + "type": "message_delta", + "delta": {"stop_reason": "end_turn", "stop_sequence": None}, + "usage": {"output_tokens": 5}, + }, + ), + TestStreamFalseDeduplication._sse("message_stop", {"type": "message_stop"}), + ] + from litellm.proxy.pass_through_endpoints.streaming_handler import ( + PassThroughStreamingHandler, + ) + + return PassThroughStreamingHandler._convert_raw_bytes_to_str_lines(frames) + + def test_complete_streaming_response_set_on_model_call_details(self): + """ + After the fix, _create_anthropic_response_logging_payload must set + complete_streaming_response on logging_obj.model_call_details so that + callbacks like _PROXY_track_cost_callback see the assembled response + instead of None. + + Before the fix: model_call_details had no complete_streaming_response key. + The log showed: "kwargs stream: True + complete streaming response: None" + """ + from litellm.types.passthrough_endpoints.pass_through_endpoints import ( + EndpointType, + ) + + # pass_through_request sets the stream flag before the streaming handler + # reconstructs the response; mirror that here. + logging_obj = self._make_logging_obj(stream=True) + logging_obj.model_call_details["stream"] = True + all_chunks = list(self._build_chunks()) + + result = AnthropicPassthroughLoggingHandler._handle_logging_anthropic_collected_chunks( + litellm_logging_obj=logging_obj, + passthrough_success_handler_obj=MagicMock(), + url_route="/anthropic/v1/messages", + request_body={"model": "claude-3-5-sonnet-20241022", "stream": True}, + endpoint_type=EndpointType.ANTHROPIC, + start_time=datetime.now(), + all_chunks=all_chunks, + end_time=datetime.now(), + ) + + # The assembled response must be stored on model_call_details so callbacks + # can identify this as a completed streaming call, not an in-progress one. + assert ( + logging_obj.model_call_details.get("complete_streaming_response") + is not None + ), "complete_streaming_response must be set on model_call_details after assembly" + + # The returned result must match what was stored + assert result["result"] is logging_obj.model_call_details.get( + "complete_streaming_response" + ) + + def test_dedup_guard_fires_when_stream_true_on_logging_obj(self): + """ + When logging_obj.stream is True (set by pass_through_endpoints.py after + detecting a streaming request), dispatch_success_handlers must set + has_dispatched_final_stream_success=True on the first call so that any + second call is a no-op. + + This is the _is_assembled_stream_success gate: with stream=False it + always returned False and the guard was permanently disabled. + """ + from litellm.types.passthrough_endpoints.pass_through_endpoints import ( + EndpointType, + ) + from litellm.types.utils import ModelResponse + + # Simulate what pass_through_endpoints.py now does after stream detection + logging_obj = self._make_logging_obj(stream=False) + logging_obj.stream = True # fix applied + logging_obj.model_call_details["stream"] = True + + # Simulate what _create_anthropic_response_logging_payload now does + mock_response = ModelResponse(model="claude-3-5-sonnet-20241022") + logging_obj.model_call_details["complete_streaming_response"] = mock_response + + assert logging_obj._is_assembled_stream_success(result=mock_response) is True + + # First dispatch sets the flag + assert not logging_obj.model_call_details.get( + "has_dispatched_final_stream_success" + ) + logging_obj.model_call_details["has_dispatched_final_stream_success"] = True + + # Second dispatch would be blocked — simulate the guard check + would_skip = bool( + logging_obj._is_assembled_stream_success(result=mock_response) + and logging_obj.model_call_details.get( + "has_dispatched_final_stream_success" + ) + ) + assert would_skip is True, ( + "Dedup guard must block a second dispatch_success_handlers call for the " + "same assembled streaming response" + ) + + def test_sse_fallback_path_sets_stream_true_for_dedup(self): + """ + When a nominally non-streaming request receives an SSE response + (_is_streaming_response returns True), the fallback branch in + pass_through_endpoints.py must set logging_obj.stream = True so the + dedup guard activates. + + Before the fix the fallback path never set stream=True, so + _is_assembled_stream_success always returned False and duplicate + callback dispatches were never blocked. + """ + from litellm.types.utils import ModelResponse + + # logging_obj starts with stream=False, as created before the request + logging_obj = self._make_logging_obj(stream=False) + assert logging_obj._is_assembled_stream_success(result=MagicMock()) is False + + # Simulate what the SSE fallback branch in pass_through_endpoints.py now does + logging_obj.stream = True + logging_obj.model_call_details["stream"] = True + + mock_response = ModelResponse(model="claude-3-5-sonnet-20241022") + logging_obj.model_call_details["complete_streaming_response"] = mock_response + + # With stream=True the dedup guard must be active + assert logging_obj._is_assembled_stream_success(result=mock_response) is True + + logging_obj.model_call_details["has_dispatched_final_stream_success"] = True + + would_skip = bool( + logging_obj._is_assembled_stream_success(result=mock_response) + and logging_obj.model_call_details.get( + "has_dispatched_final_stream_success" + ) + ) + assert would_skip is True + + def test_stream_false_logging_obj_bypasses_dedup_guard(self): + """ + Demonstrates the pre-fix state: with stream=False on the logging object, + _is_assembled_stream_success always returns False regardless of whether + complete_streaming_response is set. This means the dedup guard can never + fire, so duplicate dispatches go through unchecked. + + This test documents the old broken behavior so the fix is clearly justified. + """ + from litellm.types.utils import ModelResponse + + logging_obj = self._make_logging_obj(stream=False) + mock_response = ModelResponse(model="claude-3-5-sonnet-20241022") + logging_obj.model_call_details["complete_streaming_response"] = mock_response + + # With stream=False, _is_assembled_stream_success returns False even though + # complete_streaming_response is present — the guard is permanently disabled. + assert logging_obj._is_assembled_stream_success(result=mock_response) is False + + +class TestNonStreamingResponseRedaction: + """ + Regression tests ensuring _create_anthropic_response_logging_payload only sets + complete_streaming_response for streaming responses. perform_redaction scrubs + that field exclusively when model_call_details["stream"] is True, so storing it + on a non-streaming response would deliver the unredacted response to logging + callbacks when message logging is disabled. + """ + + @staticmethod + def _make_logging_obj(stream: bool) -> LiteLLMLoggingObj: + logging_obj = LiteLLMLoggingObj( + model="claude-3-5-sonnet-20241022", + messages=[{"role": "user", "content": "hello"}], + stream=stream, + call_type="pass_through_endpoint", + start_time=datetime.now(), + litellm_call_id="test-call-id", + function_id="1245", + ) + # pass_through_request mirrors the stream flag onto model_call_details, + # which is the key perform_redaction inspects. + logging_obj.model_call_details["stream"] = stream + return logging_obj + + def test_non_streaming_does_not_set_complete_streaming_response(self): + from litellm.types.utils import ModelResponse + + logging_obj = self._make_logging_obj(stream=False) + response = ModelResponse(model="claude-3-5-sonnet-20241022") + + AnthropicPassthroughLoggingHandler._create_anthropic_response_logging_payload( + litellm_model_response=response, + model="claude-3-5-sonnet-20241022", + kwargs={}, + start_time=datetime.now(), + end_time=datetime.now(), + logging_obj=logging_obj, + ) + + assert ( + "complete_streaming_response" not in logging_obj.model_call_details + ), "non-streaming responses must not populate complete_streaming_response" + + def test_streaming_sets_complete_streaming_response(self): + from litellm.types.utils import ModelResponse + + logging_obj = self._make_logging_obj(stream=True) + response = ModelResponse(model="claude-3-5-sonnet-20241022") + + AnthropicPassthroughLoggingHandler._create_anthropic_response_logging_payload( + litellm_model_response=response, + model="claude-3-5-sonnet-20241022", + kwargs={}, + start_time=datetime.now(), + end_time=datetime.now(), + logging_obj=logging_obj, + ) + + assert ( + logging_obj.model_call_details.get("complete_streaming_response") + is response + ) + + def test_non_streaming_response_is_redacted_when_message_logging_off(self): + from litellm.litellm_core_utils.redact_messages import ( + redact_message_input_output_from_logging, + ) + from litellm.types.utils import Choices, Message, ModelResponse + + logging_obj = self._make_logging_obj(stream=False) + response = ModelResponse( + model="claude-3-5-sonnet-20241022", + choices=[Choices(message=Message(role="assistant", content="secret"))], + ) + + AnthropicPassthroughLoggingHandler._create_anthropic_response_logging_payload( + litellm_model_response=response, + model="claude-3-5-sonnet-20241022", + kwargs={}, + start_time=datetime.now(), + end_time=datetime.now(), + logging_obj=logging_obj, + ) + + logging_obj.model_call_details["litellm_params"] = { + "metadata": {"headers": {"x-litellm-enable-message-redaction": True}} + } + + redacted = redact_message_input_output_from_logging( + model_call_details=logging_obj.model_call_details, + result=response, + ) + + leaked = logging_obj.model_call_details.get("complete_streaming_response") + assert leaked is None + assert redacted.choices[0].message.content == "redacted-by-litellm" diff --git a/tests/test_litellm/proxy/pass_through_endpoints/test_pass_through_endpoints.py b/tests/test_litellm/proxy/pass_through_endpoints/test_pass_through_endpoints.py index 61299e2662..89c57bcced 100644 --- a/tests/test_litellm/proxy/pass_through_endpoints/test_pass_through_endpoints.py +++ b/tests/test_litellm/proxy/pass_through_endpoints/test_pass_through_endpoints.py @@ -1050,6 +1050,131 @@ async def test_pass_through_request_contains_proxy_server_request_in_kwargs(): assert metadata["user_api_key_user_id"] == "test-user-id" +@pytest.mark.asyncio +async def test_pass_through_request_streaming_marks_logging_obj_as_stream(): + """ + Regression: a streaming pass-through request must flag its logging object as + streaming (logging_obj.stream and model_call_details["stream"]) before the + response is dispatched, so cost/success callbacks treat it as a stream and the + streaming dedup guard fires instead of double-logging. + """ + with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging: + with patch( + "litellm.proxy.pass_through_endpoints.pass_through_endpoints.get_async_httpx_client" + ) as mock_get_client: + with patch( + "litellm.proxy.pass_through_endpoints.pass_through_endpoints.PassThroughStreamingHandler.chunk_processor" + ) as mock_chunk_processor: + mock_proxy_logging.pre_call_hook = AsyncMock( + return_value={"model": "claude-3", "stream": True} + ) + mock_proxy_logging.post_call_failure_hook = AsyncMock() + + upstream_response = MagicMock() + upstream_response.status_code = 200 + upstream_response.headers = {} + upstream_response.raise_for_status = MagicMock() + + async_client = MagicMock() + async_client.build_request = MagicMock(return_value=MagicMock()) + async_client.send = AsyncMock(return_value=upstream_response) + mock_get_client.return_value = MagicMock(client=async_client) + + async def _empty_chunks(*args, **kwargs): + return + yield # pragma: no cover + + mock_chunk_processor.return_value = _empty_chunks() + + mock_request = MagicMock(spec=Request) + mock_request.method = "POST" + mock_request.url = "http://test-proxy.com/v1/messages" + mock_request.body = AsyncMock( + return_value=b'{"model": "claude-3", "stream": true}' + ) + mock_request.headers = Headers({}) + mock_request.query_params = QueryParams({}) + + await pass_through_request( + request=mock_request, + target="http://target-api.com/v1/messages", + custom_headers={}, + user_api_key_dict=MagicMock(), + stream=True, + ) + + async_client.send.assert_awaited_once() + assert async_client.send.call_args.kwargs["stream"] is True + + mock_chunk_processor.assert_called_once() + logging_obj = mock_chunk_processor.call_args.kwargs[ + "litellm_logging_obj" + ] + assert logging_obj.stream is True + assert logging_obj.model_call_details["stream"] is True + + +@pytest.mark.asyncio +async def test_pass_through_request_sse_response_marks_logging_obj_as_stream(): + """ + Regression: a request that is not flagged as streaming up front but whose + upstream response comes back as an SSE stream (content-type text/event-stream) + must still flag its logging object as streaming before dispatch. Otherwise the + cost/success callbacks treat the assembled stream as a non-stream and the dedup + guard never fires, double-logging the request. + """ + with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging: + with patch( + "litellm.proxy.pass_through_endpoints.pass_through_endpoints.get_async_httpx_client" + ) as mock_get_client: + with patch( + "litellm.proxy.pass_through_endpoints.pass_through_endpoints.PassThroughStreamingHandler.chunk_processor" + ) as mock_chunk_processor: + mock_proxy_logging.pre_call_hook = AsyncMock( + return_value={"model": "claude-3"} + ) + mock_proxy_logging.post_call_failure_hook = AsyncMock() + + upstream_response = MagicMock() + upstream_response.status_code = 200 + upstream_response.headers = {"content-type": "text/event-stream"} + upstream_response.raise_for_status = MagicMock() + + async_client = MagicMock() + async_client.request = AsyncMock(return_value=upstream_response) + mock_get_client.return_value = MagicMock(client=async_client) + + async def _empty_chunks(*args, **kwargs): + return + yield # pragma: no cover + + mock_chunk_processor.return_value = _empty_chunks() + + mock_request = MagicMock(spec=Request) + mock_request.method = "POST" + mock_request.url = "http://test-proxy.com/v1/messages" + mock_request.body = AsyncMock(return_value=b'{"model": "claude-3"}') + mock_request.headers = Headers({}) + mock_request.query_params = QueryParams({}) + + await pass_through_request( + request=mock_request, + target="http://target-api.com/v1/messages", + custom_headers={}, + user_api_key_dict=MagicMock(), + stream=False, + ) + + async_client.request.assert_awaited_once() + + mock_chunk_processor.assert_called_once() + logging_obj = mock_chunk_processor.call_args.kwargs[ + "litellm_logging_obj" + ] + assert logging_obj.stream is True + assert logging_obj.model_call_details["stream"] is True + + @pytest.mark.asyncio async def test_create_pass_through_endpoint(): """