fix: passthrough endpoints duplicate logs (#29598)

* fix duplicate cost callbacks for anthropic streaming pass-through

Two bugs caused _PROXY_track_cost_callback to see stream=True +
complete_streaming_response=None on every streaming pass-through request,
making the dedup guard in dispatch_success_handlers permanently inactive:

1. pass_through_endpoints.py created the Logging object with stream=False
   for all requests. _is_assembled_stream_success short-circuits on
   self.stream is not True, so has_dispatched_final_stream_success was
   never set and any second dispatch went through unchecked.
   Fix: set logging_obj.stream = True after stream detection.

2. _create_anthropic_response_logging_payload set complete_streaming_response
   inside the try block after litellm.completion_cost(), so a pricing error
   caused an early return without setting it on model_call_details.
   Fix: set complete_streaming_response before the try block.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix stream

* add stream to logging obj

* test(pass_through): give mock logging object a real model_call_details dict

The anthropic passthrough logging payload now records the assembled
response on model_call_details before cost calculation, which requires
model_call_details to support item assignment. In production it is always
a dict; the existing unit test stubbed the logging object with a bare Mock
whose attribute is not subscriptable, so the new assignment raised
TypeError. Use a real dict to match the production logging object.

* test(pass_through): cover streaming logging-obj stream flag

The streaming branch of pass_through_request that marks the logging object
as streaming (logging_obj.stream and model_call_details["stream"]) had no
unit coverage, so the patch coverage gate flagged it. Add a regression test
that drives a streaming pass-through request through pass_through_request and
asserts the logging object is flagged as a stream before dispatch.

* test(pass_through): cover SSE-response stream flag fallback branch

The auto-detected streaming branch of pass_through_request (when a request
that was not flagged as streaming returns a text/event-stream response) sets
logging_obj.stream and model_call_details["stream"] but had no unit coverage,
so the codecov patch gate failed at 60%. Drive a non-streaming pass-through
request whose upstream response is SSE through pass_through_request and assert
the logging object is flagged as a stream before dispatch.

* fix(pass_through): gate complete_streaming_response on stream flag

perform_redaction only scrubs complete_streaming_response when
model_call_details["stream"] is True. Setting it unconditionally for
non-streaming Anthropic pass-through responses left the assembled
response unredacted in model_call_details, which is handed to logging
callbacks as kwargs when message logging is disabled. Only record it for
actual streaming responses so redaction always applies.

---------

Co-authored-by: mubashir1osmani <mubashir.osmani777@gmail.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mateo Wang 2026-06-03 12:13:02 -07:00 committed by GitHub
parent 5119b9462f
commit 2bbdbfa5c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 471 additions and 0 deletions

View File

@ -114,6 +114,13 @@ class AnthropicPassthroughLoggingHandler:
handles streaming and non-streaming responses
"""
# Only record complete_streaming_response for actual streaming responses.
# perform_redaction scrubs this field only when stream is True, so setting
# it on a non-streaming response would bypass message redaction.
if logging_obj.model_call_details.get("stream") is True:
logging_obj.model_call_details["complete_streaming_response"] = (
litellm_model_response
)
try:
# Get custom_llm_provider from logging object if available (e.g., azure_ai for Azure Anthropic)
custom_llm_provider = logging_obj.model_call_details.get(

View File

@ -1061,6 +1061,9 @@ async def pass_through_request( # noqa: PLR0915
)
if stream:
logging_obj.stream = True
logging_obj.model_call_details["stream"] = True
if is_multipart:
response = (
await HttpPassThroughEndpointHelpers.make_multipart_http_request(
@ -1139,6 +1142,9 @@ async def pass_through_request( # noqa: PLR0915
verbose_proxy_logger.debug("response.headers= %s", response.headers)
if _is_streaming_response(response) is True:
logging_obj.stream = True
logging_obj.model_call_details["stream"] = True
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:

View File

@ -318,6 +318,7 @@ def test_handle_logging_anthropic_collected_chunks(all_chunks):
from litellm.types.utils import ModelResponse
litellm_logging_obj = Mock()
litellm_logging_obj.model_call_details = {}
pass_through_logging_obj = Mock()
sent_args = {

View File

@ -1043,3 +1043,335 @@ class TestPureTextFastPathParity:
AnthropicPassthroughLoggingHandler._collapse_pure_text_chunks(all_chunks)
is None
)
class TestStreamFalseDeduplication:
"""
Regression tests for the duplicate-callback bug where a streaming pass-through
request had stream=False hardcoded on its Logging object.
Before the fix:
- logging_obj.stream was always False for pass-through requests
- _is_assembled_stream_success() checked `self.stream is not True` and returned
False immediately, so has_dispatched_final_stream_success was never set
- Any second dispatch_success_handlers call went through unchecked
After the fix:
- pass_through_endpoints.py sets logging_obj.stream = True after detecting stream
- _create_anthropic_response_logging_payload sets complete_streaming_response on
model_call_details so callbacks see the correct assembled response state
- _is_assembled_stream_success returns True, dedup guard fires on first dispatch
"""
@staticmethod
def _sse(event, data):
return f"event: {event}\ndata: {json.dumps(data)}\n\n".encode()
@staticmethod
def _make_logging_obj(stream: bool = False) -> LiteLLMLoggingObj:
logging_obj = LiteLLMLoggingObj(
model="claude-3-5-sonnet-20241022",
messages=[{"role": "user", "content": "hello"}],
stream=stream,
call_type="pass_through_endpoint",
start_time=datetime.now(),
litellm_call_id="test-call-id",
function_id="1245",
)
return logging_obj
@staticmethod
def _build_chunks():
frames = [
TestStreamFalseDeduplication._sse(
"message_start",
{
"type": "message_start",
"message": {
"id": "msg_abc",
"type": "message",
"role": "assistant",
"model": "claude-3-5-sonnet-20241022",
"content": [],
"stop_reason": None,
"stop_sequence": None,
"usage": {"input_tokens": 10, "output_tokens": 0},
},
},
),
TestStreamFalseDeduplication._sse(
"content_block_start",
{
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""},
},
),
TestStreamFalseDeduplication._sse(
"content_block_delta",
{
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": "Hello"},
},
),
TestStreamFalseDeduplication._sse(
"content_block_stop", {"type": "content_block_stop", "index": 0}
),
TestStreamFalseDeduplication._sse(
"message_delta",
{
"type": "message_delta",
"delta": {"stop_reason": "end_turn", "stop_sequence": None},
"usage": {"output_tokens": 5},
},
),
TestStreamFalseDeduplication._sse("message_stop", {"type": "message_stop"}),
]
from litellm.proxy.pass_through_endpoints.streaming_handler import (
PassThroughStreamingHandler,
)
return PassThroughStreamingHandler._convert_raw_bytes_to_str_lines(frames)
def test_complete_streaming_response_set_on_model_call_details(self):
"""
After the fix, _create_anthropic_response_logging_payload must set
complete_streaming_response on logging_obj.model_call_details so that
callbacks like _PROXY_track_cost_callback see the assembled response
instead of None.
Before the fix: model_call_details had no complete_streaming_response key.
The log showed: "kwargs stream: True + complete streaming response: None"
"""
from litellm.types.passthrough_endpoints.pass_through_endpoints import (
EndpointType,
)
# pass_through_request sets the stream flag before the streaming handler
# reconstructs the response; mirror that here.
logging_obj = self._make_logging_obj(stream=True)
logging_obj.model_call_details["stream"] = True
all_chunks = list(self._build_chunks())
result = AnthropicPassthroughLoggingHandler._handle_logging_anthropic_collected_chunks(
litellm_logging_obj=logging_obj,
passthrough_success_handler_obj=MagicMock(),
url_route="/anthropic/v1/messages",
request_body={"model": "claude-3-5-sonnet-20241022", "stream": True},
endpoint_type=EndpointType.ANTHROPIC,
start_time=datetime.now(),
all_chunks=all_chunks,
end_time=datetime.now(),
)
# The assembled response must be stored on model_call_details so callbacks
# can identify this as a completed streaming call, not an in-progress one.
assert (
logging_obj.model_call_details.get("complete_streaming_response")
is not None
), "complete_streaming_response must be set on model_call_details after assembly"
# The returned result must match what was stored
assert result["result"] is logging_obj.model_call_details.get(
"complete_streaming_response"
)
def test_dedup_guard_fires_when_stream_true_on_logging_obj(self):
"""
When logging_obj.stream is True (set by pass_through_endpoints.py after
detecting a streaming request), dispatch_success_handlers must set
has_dispatched_final_stream_success=True on the first call so that any
second call is a no-op.
This is the _is_assembled_stream_success gate: with stream=False it
always returned False and the guard was permanently disabled.
"""
from litellm.types.passthrough_endpoints.pass_through_endpoints import (
EndpointType,
)
from litellm.types.utils import ModelResponse
# Simulate what pass_through_endpoints.py now does after stream detection
logging_obj = self._make_logging_obj(stream=False)
logging_obj.stream = True # fix applied
logging_obj.model_call_details["stream"] = True
# Simulate what _create_anthropic_response_logging_payload now does
mock_response = ModelResponse(model="claude-3-5-sonnet-20241022")
logging_obj.model_call_details["complete_streaming_response"] = mock_response
assert logging_obj._is_assembled_stream_success(result=mock_response) is True
# First dispatch sets the flag
assert not logging_obj.model_call_details.get(
"has_dispatched_final_stream_success"
)
logging_obj.model_call_details["has_dispatched_final_stream_success"] = True
# Second dispatch would be blocked — simulate the guard check
would_skip = bool(
logging_obj._is_assembled_stream_success(result=mock_response)
and logging_obj.model_call_details.get(
"has_dispatched_final_stream_success"
)
)
assert would_skip is True, (
"Dedup guard must block a second dispatch_success_handlers call for the "
"same assembled streaming response"
)
def test_sse_fallback_path_sets_stream_true_for_dedup(self):
"""
When a nominally non-streaming request receives an SSE response
(_is_streaming_response returns True), the fallback branch in
pass_through_endpoints.py must set logging_obj.stream = True so the
dedup guard activates.
Before the fix the fallback path never set stream=True, so
_is_assembled_stream_success always returned False and duplicate
callback dispatches were never blocked.
"""
from litellm.types.utils import ModelResponse
# logging_obj starts with stream=False, as created before the request
logging_obj = self._make_logging_obj(stream=False)
assert logging_obj._is_assembled_stream_success(result=MagicMock()) is False
# Simulate what the SSE fallback branch in pass_through_endpoints.py now does
logging_obj.stream = True
logging_obj.model_call_details["stream"] = True
mock_response = ModelResponse(model="claude-3-5-sonnet-20241022")
logging_obj.model_call_details["complete_streaming_response"] = mock_response
# With stream=True the dedup guard must be active
assert logging_obj._is_assembled_stream_success(result=mock_response) is True
logging_obj.model_call_details["has_dispatched_final_stream_success"] = True
would_skip = bool(
logging_obj._is_assembled_stream_success(result=mock_response)
and logging_obj.model_call_details.get(
"has_dispatched_final_stream_success"
)
)
assert would_skip is True
def test_stream_false_logging_obj_bypasses_dedup_guard(self):
"""
Demonstrates the pre-fix state: with stream=False on the logging object,
_is_assembled_stream_success always returns False regardless of whether
complete_streaming_response is set. This means the dedup guard can never
fire, so duplicate dispatches go through unchecked.
This test documents the old broken behavior so the fix is clearly justified.
"""
from litellm.types.utils import ModelResponse
logging_obj = self._make_logging_obj(stream=False)
mock_response = ModelResponse(model="claude-3-5-sonnet-20241022")
logging_obj.model_call_details["complete_streaming_response"] = mock_response
# With stream=False, _is_assembled_stream_success returns False even though
# complete_streaming_response is present — the guard is permanently disabled.
assert logging_obj._is_assembled_stream_success(result=mock_response) is False
class TestNonStreamingResponseRedaction:
"""
Regression tests ensuring _create_anthropic_response_logging_payload only sets
complete_streaming_response for streaming responses. perform_redaction scrubs
that field exclusively when model_call_details["stream"] is True, so storing it
on a non-streaming response would deliver the unredacted response to logging
callbacks when message logging is disabled.
"""
@staticmethod
def _make_logging_obj(stream: bool) -> LiteLLMLoggingObj:
logging_obj = LiteLLMLoggingObj(
model="claude-3-5-sonnet-20241022",
messages=[{"role": "user", "content": "hello"}],
stream=stream,
call_type="pass_through_endpoint",
start_time=datetime.now(),
litellm_call_id="test-call-id",
function_id="1245",
)
# pass_through_request mirrors the stream flag onto model_call_details,
# which is the key perform_redaction inspects.
logging_obj.model_call_details["stream"] = stream
return logging_obj
def test_non_streaming_does_not_set_complete_streaming_response(self):
from litellm.types.utils import ModelResponse
logging_obj = self._make_logging_obj(stream=False)
response = ModelResponse(model="claude-3-5-sonnet-20241022")
AnthropicPassthroughLoggingHandler._create_anthropic_response_logging_payload(
litellm_model_response=response,
model="claude-3-5-sonnet-20241022",
kwargs={},
start_time=datetime.now(),
end_time=datetime.now(),
logging_obj=logging_obj,
)
assert (
"complete_streaming_response" not in logging_obj.model_call_details
), "non-streaming responses must not populate complete_streaming_response"
def test_streaming_sets_complete_streaming_response(self):
from litellm.types.utils import ModelResponse
logging_obj = self._make_logging_obj(stream=True)
response = ModelResponse(model="claude-3-5-sonnet-20241022")
AnthropicPassthroughLoggingHandler._create_anthropic_response_logging_payload(
litellm_model_response=response,
model="claude-3-5-sonnet-20241022",
kwargs={},
start_time=datetime.now(),
end_time=datetime.now(),
logging_obj=logging_obj,
)
assert (
logging_obj.model_call_details.get("complete_streaming_response")
is response
)
def test_non_streaming_response_is_redacted_when_message_logging_off(self):
from litellm.litellm_core_utils.redact_messages import (
redact_message_input_output_from_logging,
)
from litellm.types.utils import Choices, Message, ModelResponse
logging_obj = self._make_logging_obj(stream=False)
response = ModelResponse(
model="claude-3-5-sonnet-20241022",
choices=[Choices(message=Message(role="assistant", content="secret"))],
)
AnthropicPassthroughLoggingHandler._create_anthropic_response_logging_payload(
litellm_model_response=response,
model="claude-3-5-sonnet-20241022",
kwargs={},
start_time=datetime.now(),
end_time=datetime.now(),
logging_obj=logging_obj,
)
logging_obj.model_call_details["litellm_params"] = {
"metadata": {"headers": {"x-litellm-enable-message-redaction": True}}
}
redacted = redact_message_input_output_from_logging(
model_call_details=logging_obj.model_call_details,
result=response,
)
leaked = logging_obj.model_call_details.get("complete_streaming_response")
assert leaked is None
assert redacted.choices[0].message.content == "redacted-by-litellm"

View File

@ -1050,6 +1050,131 @@ async def test_pass_through_request_contains_proxy_server_request_in_kwargs():
assert metadata["user_api_key_user_id"] == "test-user-id"
@pytest.mark.asyncio
async def test_pass_through_request_streaming_marks_logging_obj_as_stream():
"""
Regression: a streaming pass-through request must flag its logging object as
streaming (logging_obj.stream and model_call_details["stream"]) before the
response is dispatched, so cost/success callbacks treat it as a stream and the
streaming dedup guard fires instead of double-logging.
"""
with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging:
with patch(
"litellm.proxy.pass_through_endpoints.pass_through_endpoints.get_async_httpx_client"
) as mock_get_client:
with patch(
"litellm.proxy.pass_through_endpoints.pass_through_endpoints.PassThroughStreamingHandler.chunk_processor"
) as mock_chunk_processor:
mock_proxy_logging.pre_call_hook = AsyncMock(
return_value={"model": "claude-3", "stream": True}
)
mock_proxy_logging.post_call_failure_hook = AsyncMock()
upstream_response = MagicMock()
upstream_response.status_code = 200
upstream_response.headers = {}
upstream_response.raise_for_status = MagicMock()
async_client = MagicMock()
async_client.build_request = MagicMock(return_value=MagicMock())
async_client.send = AsyncMock(return_value=upstream_response)
mock_get_client.return_value = MagicMock(client=async_client)
async def _empty_chunks(*args, **kwargs):
return
yield # pragma: no cover
mock_chunk_processor.return_value = _empty_chunks()
mock_request = MagicMock(spec=Request)
mock_request.method = "POST"
mock_request.url = "http://test-proxy.com/v1/messages"
mock_request.body = AsyncMock(
return_value=b'{"model": "claude-3", "stream": true}'
)
mock_request.headers = Headers({})
mock_request.query_params = QueryParams({})
await pass_through_request(
request=mock_request,
target="http://target-api.com/v1/messages",
custom_headers={},
user_api_key_dict=MagicMock(),
stream=True,
)
async_client.send.assert_awaited_once()
assert async_client.send.call_args.kwargs["stream"] is True
mock_chunk_processor.assert_called_once()
logging_obj = mock_chunk_processor.call_args.kwargs[
"litellm_logging_obj"
]
assert logging_obj.stream is True
assert logging_obj.model_call_details["stream"] is True
@pytest.mark.asyncio
async def test_pass_through_request_sse_response_marks_logging_obj_as_stream():
"""
Regression: a request that is not flagged as streaming up front but whose
upstream response comes back as an SSE stream (content-type text/event-stream)
must still flag its logging object as streaming before dispatch. Otherwise the
cost/success callbacks treat the assembled stream as a non-stream and the dedup
guard never fires, double-logging the request.
"""
with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging:
with patch(
"litellm.proxy.pass_through_endpoints.pass_through_endpoints.get_async_httpx_client"
) as mock_get_client:
with patch(
"litellm.proxy.pass_through_endpoints.pass_through_endpoints.PassThroughStreamingHandler.chunk_processor"
) as mock_chunk_processor:
mock_proxy_logging.pre_call_hook = AsyncMock(
return_value={"model": "claude-3"}
)
mock_proxy_logging.post_call_failure_hook = AsyncMock()
upstream_response = MagicMock()
upstream_response.status_code = 200
upstream_response.headers = {"content-type": "text/event-stream"}
upstream_response.raise_for_status = MagicMock()
async_client = MagicMock()
async_client.request = AsyncMock(return_value=upstream_response)
mock_get_client.return_value = MagicMock(client=async_client)
async def _empty_chunks(*args, **kwargs):
return
yield # pragma: no cover
mock_chunk_processor.return_value = _empty_chunks()
mock_request = MagicMock(spec=Request)
mock_request.method = "POST"
mock_request.url = "http://test-proxy.com/v1/messages"
mock_request.body = AsyncMock(return_value=b'{"model": "claude-3"}')
mock_request.headers = Headers({})
mock_request.query_params = QueryParams({})
await pass_through_request(
request=mock_request,
target="http://target-api.com/v1/messages",
custom_headers={},
user_api_key_dict=MagicMock(),
stream=False,
)
async_client.request.assert_awaited_once()
mock_chunk_processor.assert_called_once()
logging_obj = mock_chunk_processor.call_args.kwargs[
"litellm_logging_obj"
]
assert logging_obj.stream is True
assert logging_obj.model_call_details["stream"] is True
@pytest.mark.asyncio
async def test_create_pass_through_endpoint():
"""