diff --git a/litellm/llms/anthropic/experimental_pass_through/adapters/streaming_iterator.py b/litellm/llms/anthropic/experimental_pass_through/adapters/streaming_iterator.py
index bacb9f8ddf..8c20f4c430 100644
--- a/litellm/llms/anthropic/experimental_pass_through/adapters/streaming_iterator.py
+++ b/litellm/llms/anthropic/experimental_pass_through/adapters/streaming_iterator.py
@@ -1,5 +1,6 @@
 # What is this?
 ## Translates OpenAI call to Anthropic `/v1/messages` format
+import copy
 import json
 import traceback
 from collections import deque
@@ -29,6 +30,123 @@ if TYPE_CHECKING:
     from litellm.types.utils import ModelResponseStream
 
 
+class _CombinedChunkSplitter:
+    """
+    Splits a streaming chunk that carries BOTH response content and a
+    ``finish_reason`` into two chunks: a content-only chunk followed by a
+    finish-only chunk.
+
+    ``AnthropicStreamWrapper`` (via ``translate_streaming_openai_response_to_anthropic``)
+    assumes content and ``finish_reason`` never arrive in the same chunk — true for
+    real provider streams, but false for fake-streamed providers (e.g. Vertex AI
+    Gemma ``:predict``) where ``MockResponseIterator`` collapses the entire response
+    into a single chunk. Without this split the assumption causes all content to be
+    silently dropped (only the ``message_delta`` stop event is emitted).
+
+    Supports both sync and async iteration, since ``AnthropicStreamWrapper`` exposes
+    both ``__next__`` and ``__anext__``. An instance is single-mode: callers must
+    iterate it either synchronously or asynchronously, never both — the two modes
+    hold independent iterator references on the upstream stream and mixing them
+    would advance them out of sync.
+
+    Attributes the splitter does not define are looked up on the wrapped stream,
+    so code that reaches through ``completion_stream`` for a stream attribute or
+    method still finds it after the stream has been wrapped.
+    """
+
+    # Delta attributes that count as response payload. ``_is_combined`` and
+    # ``_split`` both read this tuple so detection and clearing cannot drift.
+    _PAYLOAD_FIELDS = (
+        "content",
+        "tool_calls",
+        "reasoning_content",
+        "thinking_blocks",
+    )
+
+    def __init__(self, completion_stream: Any):
+        self._stream = completion_stream
+        self._sync_iter: Optional[Iterator[Any]] = None
+        self._async_iter: Optional[AsyncIterator[Any]] = None
+        # Chunks produced by a split that have not been handed out yet.
+        self._buffer: deque = deque()
+
+    def __getattr__(self, name: str) -> Any:
+        """Delegate attributes the splitter lacks to the wrapped stream.
+
+        Raises ``AttributeError`` for ``_stream`` itself and for dunder names.
+        """
+        # Python calls this only after normal lookup fails. Refusing ``_stream``
+        # and dunder names keeps copy/pickle probes on a half-built instance
+        # from recursing through this hook.
+        if name == "_stream" or (name.startswith("__") and name.endswith("__")):
+            raise AttributeError(name)
+        return getattr(self._stream, name)
+
+    @staticmethod
+    def _is_combined(chunk: Any) -> bool:
+        """True if ``chunk`` carries response content AND a finish_reason."""
+        choices = getattr(chunk, "choices", None)
+        if not choices:
+            return False
+        # Only the first choice is inspected.
+        choice = choices[0]
+        if getattr(choice, "finish_reason", None) is None:
+            return False
+        delta = getattr(choice, "delta", None)
+        if delta is None:
+            return False
+        return any(
+            getattr(delta, field, None)
+            for field in _CombinedChunkSplitter._PAYLOAD_FIELDS
+        )
+
+    @staticmethod
+    def _split(chunk: Any) -> List[Any]:
+        """Return ``[chunk]``, or ``[content_chunk, finish_chunk]`` if combined."""
+        if not _CombinedChunkSplitter._is_combined(chunk):
+            return [chunk]
+
+        # Content chunk: keep the delta payload, clear the finish_reason.
+        content_chunk = copy.deepcopy(chunk)
+        content_chunk.choices[0].finish_reason = None
+
+        # Finish chunk: keep finish_reason (and usage), clear the delta payload.
+        finish_chunk = copy.deepcopy(chunk)
+        finish_delta = finish_chunk.choices[0].delta
+        for field in _CombinedChunkSplitter._PAYLOAD_FIELDS:
+            # ``content`` is always reset; the optional fields only when the
+            # delta already has them, so no new attributes are invented.
+            if field == "content" or hasattr(finish_delta, field):
+                setattr(finish_delta, field, None)
+        return [content_chunk, finish_chunk]
+
+    def __iter__(self) -> "Iterator[Any]":
+        return self
+
+    def __next__(self) -> Any:
+        # Hand out the pending half of an earlier split before pulling more.
+        if self._buffer:
+            return self._buffer.popleft()
+        if self._sync_iter is None:
+            self._sync_iter = iter(self._stream)
+        chunk = next(self._sync_iter)  # propagates StopIteration when exhausted
+        self._buffer.extend(self._split(chunk))
+        return self._buffer.popleft()
+
+    def __aiter__(self) -> "AsyncIterator[Any]":
+        return self
+
+    async def __anext__(self) -> Any:
+        # Hand out the pending half of an earlier split before pulling more.
+        if self._buffer:
+            return self._buffer.popleft()
+        if self._async_iter is None:
+            self._async_iter = self._stream.__aiter__()
+        chunk = await self._async_iter.__anext__()  # propagates StopAsyncIteration
+        self._buffer.extend(self._split(chunk))
+        return self._buffer.popleft()
+
+
 class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
     """
     - first chunk return 'message_start'
@@ -62,7 +180,10 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
         compaction_block: Optional[CompactionBlock] = None,
         iterations_usage: Optional[List[UsageIteration]] = None,
     ):
-        super().__init__(completion_stream)
+        # Wrap the upstream stream so chunks that carry both content and a
+        # finish_reason (fake-streamed providers) are split into two — see
+        # _CombinedChunkSplitter.
+        super().__init__(_CombinedChunkSplitter(completion_stream))
         self.model = model
         # Mapping of truncated tool names to original names (for OpenAI's 64-char limit)
         self.tool_name_mapping = tool_name_mapping or {}
diff --git a/litellm/llms/base_llm/base_model_iterator.py b/litellm/llms/base_llm/base_model_iterator.py
index cf1fd6f786..bf1bfd0653 100644
--- a/litellm/llms/base_llm/base_model_iterator.py
+++ b/litellm/llms/base_llm/base_model_iterator.py
@@ -50,6 +50,11 @@ def convert_model_response_to_streaming(
             model=model_response.model,
             choices=streaming_choices,
         )
+        # Carry usage onto the streaming chunk so fake-streamed responses
+        # (e.g. Vertex AI Gemma :predict) still report token counts.
+        usage = getattr(model_response, "usage", None)
+        if usage is not None:
+            setattr(processed_chunk, "usage", usage)
         return processed_chunk
     except Exception as e:
         raise ValueError(
diff --git a/litellm/llms/databricks/streaming_utils.py b/litellm/llms/databricks/streaming_utils.py
index eebe318288..7a7330227d 100644
--- a/litellm/llms/databricks/streaming_utils.py
+++ b/litellm/llms/databricks/streaming_utils.py
@@ -25,6 +25,28 @@ class ModelResponseIterator:
             finish_reason = ""
             usage: Optional[ChatCompletionUsageBlock] = None
 
+            # Usage-only final chunk (OpenAI ``stream_options.include_usage``)
+            # arrives with an empty ``choices`` list — return usage without
+            # indexing ``choices[0]``.
+            if len(processed_chunk.choices) == 0:
+                final_usage = getattr(processed_chunk, "usage", None)
+                return GenericStreamingChunk(
+                    text="",
+                    tool_use=None,
+                    is_finished=False,
+                    finish_reason="",
+                    usage=(
+                        ChatCompletionUsageBlock(
+                            prompt_tokens=final_usage.prompt_tokens or 0,
+                            completion_tokens=final_usage.completion_tokens or 0,
+                            total_tokens=final_usage.total_tokens or 0,
+                        )
+                        if final_usage is not None
+                        else None
+                    ),
+                    index=0,
+                )
+
             if processed_chunk.choices[0].delta.content is not None:  # type: ignore
                 text = processed_chunk.choices[0].delta.content  # type: ignore
 
diff --git a/tests/test_litellm/llms/anthropic/experimental_pass_through/adapters/test_streaming_iterator_combined_chunk.py b/tests/test_litellm/llms/anthropic/experimental_pass_through/adapters/test_streaming_iterator_combined_chunk.py
new file mode 100644
index 0000000000..f74c5b6130
--- /dev/null
+++ b/tests/test_litellm/llms/anthropic/experimental_pass_through/adapters/test_streaming_iterator_combined_chunk.py
@@ -0,0 +1,170 @@
+"""
+Regression tests for fake-streamed providers served through `/v1/messages`.
+
+A fake-streaming provider (e.g. Vertex AI Gemma `:predict`) hands back its whole
+response as one `MockResponseIterator` chunk holding both the content text and a
+`finish_reason`. `AnthropicStreamWrapper` used to lose the content in that case:
+`translate_streaming_openai_response_to_anthropic` sees the finish_reason and emits
+only a `message_delta`. `_CombinedChunkSplitter` splits such chunks so the content
+is emitted as well.
+"""
+
+import asyncio
+import json
+from types import SimpleNamespace
+
+from litellm.llms.anthropic.experimental_pass_through.adapters.streaming_iterator import (
+    AnthropicStreamWrapper,
+    _CombinedChunkSplitter,
+)
+from litellm.llms.base_llm.base_model_iterator import MockResponseIterator
+from litellm.types.utils import (
+    Choices,
+    Delta,
+    Message,
+    ModelResponse,
+    ModelResponseStream,
+    StreamingChoices,
+    Usage,
+)
+
+
+def _build_fake_stream(
+    content: str, finish_reason: str = "stop"
+) -> MockResponseIterator:
+    """Build a one-chunk fake stream shaped like a Vertex Gemma `:predict` reply."""
+    only_choice = Choices(
+        index=0,
+        message=Message(role="assistant", content=content),
+        finish_reason=finish_reason,
+    )
+    response = ModelResponse()
+    response.choices = [only_choice]
+    response.usage = Usage(prompt_tokens=10, completion_tokens=5, total_tokens=15)
+    response.model = "gemma4"
+    return MockResponseIterator(model_response=response)
+
+
+def _collect_async(wrapper: AnthropicStreamWrapper) -> str:
+    """Drain the wrapper's async SSE generator and return the joined text."""
+
+    async def _drain() -> str:
+        pieces = [
+            event.decode() if isinstance(event, bytes) else event
+            async for event in wrapper.async_anthropic_sse_wrapper()
+        ]
+        return "".join(pieces)
+
+    return asyncio.run(_drain())
+
+
+def test_fake_stream_content_reaches_anthropic_sse():
+    """A collapsed fake-stream chunk must still produce a content delta."""
+    text = "Hello, the answer is 2."
+    sse = _collect_async(
+        AnthropicStreamWrapper(
+            completion_stream=_build_fake_stream(text),
+            model="gemma4",
+        )
+    )
+
+    for expected in ("content_block_delta", text, "message_delta", "message_stop"):
+        assert expected in sse
+
+
+def test_fake_stream_usage_preserved():
+    """The finish chunk carries usage, so the token counts are reported."""
+    sse = _collect_async(
+        AnthropicStreamWrapper(
+            completion_stream=_build_fake_stream("Two."),
+            model="gemma4",
+        )
+    )
+
+    prefix = "data: "
+    data_lines = (
+        line
+        for event in sse.split("\n\n")
+        for line in event.splitlines()
+        if line.startswith(prefix)
+    )
+    message_delta = next(
+        json.loads(line[len(prefix) :])
+        for line in data_lines
+        if '"message_delta"' in line
+    )
+    usage = message_delta["usage"]
+    assert usage["output_tokens"] == 5
+    assert usage["input_tokens"] == 10
+
+
+def test_splitter_passes_through_non_combined_chunks():
+    """Content without a finish_reason comes out as a single chunk."""
+    partial = ModelResponseStream(
+        choices=[
+            StreamingChoices(
+                index=0, delta=Delta(content="partial"), finish_reason=None
+            )
+        ]
+    )
+    emitted = list(_CombinedChunkSplitter(iter([partial])))
+    assert len(emitted) == 1
+    assert emitted[0].choices[0].delta.content == "partial"
+
+
+def test_splitter_splits_combined_chunk_into_content_then_finish():
+    """Content plus finish_reason in one chunk comes out as two chunks."""
+    combined = ModelResponseStream(
+        choices=[
+            StreamingChoices(index=0, delta=Delta(content="done"), finish_reason="stop")
+        ]
+    )
+    first, second = list(_CombinedChunkSplitter(iter([combined])))
+    content_choice = first.choices[0]
+    finish_choice = second.choices[0]
+
+    assert content_choice.delta.content == "done"
+    assert content_choice.finish_reason is None
+
+    assert finish_choice.finish_reason == "stop"
+    assert finish_choice.delta.content is None
+
+
+def test_is_combined_false_when_choices_empty():
+    """A chunk with an empty choices list is never treated as combined."""
+    no_choices = SimpleNamespace(choices=[])
+    assert _CombinedChunkSplitter._is_combined(no_choices) is False
+
+
+def test_is_combined_false_when_delta_missing():
+    """A finish chunk whose choice has ``delta=None`` is not combined."""
+    choice = SimpleNamespace(finish_reason="stop", delta=None)
+    chunk = SimpleNamespace(choices=[choice])
+    assert _CombinedChunkSplitter._is_combined(chunk) is False
+
+
+def test_split_clears_reasoning_and_thinking_on_finish_chunk():
+    """Reasoning/thinking on a combined delta stay on the content chunk only;
+    the finish chunk has them cleared."""
+    combined = SimpleNamespace(
+        choices=[
+            SimpleNamespace(
+                finish_reason="stop",
+                delta=SimpleNamespace(
+                    content="hi",
+                    tool_calls=None,
+                    reasoning_content="some reasoning",
+                    thinking_blocks=[{"type": "thinking"}],
+                ),
+            )
+        ]
+    )
+
+    content_chunk, finish_chunk = _CombinedChunkSplitter._split(combined)
+    content_delta = content_chunk.choices[0].delta
+    finish_delta = finish_chunk.choices[0].delta
+
+    assert content_delta.reasoning_content == "some reasoning"
+    assert content_delta.thinking_blocks == [{"type": "thinking"}]
+    assert finish_delta.reasoning_content is None
+    assert finish_delta.thinking_blocks is None
diff --git a/tests/test_litellm/llms/databricks/test_databricks_streaming_utils.py b/tests/test_litellm/llms/databricks/test_databricks_streaming_utils.py
new file mode 100644
index 0000000000..5612864a84
--- /dev/null
+++ b/tests/test_litellm/llms/databricks/test_databricks_streaming_utils.py
@@ -0,0 +1,55 @@
+"""
+Regression tests for the databricks streaming chunk parser.
+
+OpenAI-compatible servers (e.g. Vertex AI Model Garden vLLM endpoints) send a final
+usage-only chunk with an empty `choices` list when `stream_options.include_usage` is
+set. `chunk_parser` used to index `choices[0]` unconditionally, which raised
+`IndexError` -> `MidStreamFallbackError` and crashed the stream.
+"""
+
+from litellm.llms.databricks.streaming_utils import ModelResponseIterator
+
+
+def _parse(**overrides):
+    """Run ``chunk_parser`` on a minimal chunk dict updated with ``overrides``."""
+    chunk = {
+        "id": "chatcmpl-x",
+        "object": "chat.completion.chunk",
+        "created": 1,
+        "model": "m",
+    }
+    chunk.update(overrides)
+    iterator = ModelResponseIterator(streaming_response=None, sync_stream=True)
+    return iterator.chunk_parser(chunk=chunk)
+
+
+def test_chunk_parser_handles_empty_choices_usage_chunk():
+    """A usage-only final chunk (empty choices) must not raise IndexError."""
+    parsed = _parse(
+        choices=[],
+        usage={"prompt_tokens": 20, "completion_tokens": 8, "total_tokens": 28},
+    )
+
+    assert parsed["text"] == ""
+    assert parsed["is_finished"] is False
+    reported = parsed["usage"]
+    assert reported is not None
+    assert reported["prompt_tokens"] == 20
+    assert reported["completion_tokens"] == 8
+
+
+def test_chunk_parser_empty_choices_without_usage():
+    """An empty-choices chunk with no usage block yields usage=None, no error."""
+    parsed = _parse(choices=[])
+
+    assert parsed["text"] == ""
+    assert parsed["usage"] is None
+
+
+def test_chunk_parser_normal_content_chunk_still_works():
+    """A regular content chunk is unaffected by the empty-choices guard."""
+    parsed = _parse(
+        choices=[{"index": 0, "delta": {"content": "hi"}, "finish_reason": None}]
+    )
+
+    assert parsed["text"] == "hi"