[internal copy of #28007] Fix/gcp model garden streaming (#28363)

* fix(vertex): stream Model Garden Gemma/Qwen responses correctly through /v1/messages

* test(vertex): cover _CombinedChunkSplitter defensive branches

* test(databricks): rename test file to avoid duplicate basename collision

* fix(databricks,anthropic): defensive token defaults; document single-mode splitter

Address greptile P2 concerns:
- databricks: default usage token fields to 0 when constructing
  ChatCompletionUsageBlock from a partially populated usage block — matches
  the defensive pattern used in ollama/vertex_ai/cohere/bedrock.
- _CombinedChunkSplitter: clarify in the docstring that an instance is
  single-mode (sync or async, not both), since the two iteration paths hold
  independent upstream iterator references.

Co-authored-by: Claude <claude@anthropic.com>

---------

Co-authored-by: Steven Kessler <9701252+stvnksslr@users.noreply.github.com>
Co-authored-by: Claude <claude@anthropic.com>
This commit is contained in:
Mateo Wang 2026-06-10 12:31:00 -07:00 committed by GitHub
parent 410b892f77
commit a4a3348801
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 338 additions and 1 deletions

View File

@ -1,5 +1,6 @@
# What is this?
## Translates OpenAI call to Anthropic `/v1/messages` format
import copy
import json
import traceback
from collections import deque
@ -29,6 +30,98 @@ if TYPE_CHECKING:
from litellm.types.utils import ModelResponseStream
class _CombinedChunkSplitter:
"""
Splits a streaming chunk that carries BOTH response content and a
``finish_reason`` into two chunks: a content-only chunk followed by a
finish-only chunk.
``AnthropicStreamWrapper`` (via ``translate_streaming_openai_response_to_anthropic``)
assumes content and ``finish_reason`` never arrive in the same chunk true for
real provider streams, but false for fake-streamed providers (e.g. Vertex AI
Gemma ``:predict``) where ``MockResponseIterator`` collapses the entire response
into a single chunk. Without this split the assumption causes all content to be
silently dropped (only the ``message_delta`` stop event is emitted).
Supports both sync and async iteration, since ``AnthropicStreamWrapper`` exposes
both ``__next__`` and ``__anext__``. An instance is single-mode: callers must
iterate it either synchronously or asynchronously, never both the two modes
hold independent iterator references on the upstream stream and mixing them
would advance them out of sync.
"""
def __init__(self, completion_stream: Any):
self._stream = completion_stream
self._sync_iter: Optional[Iterator[Any]] = None
self._async_iter: Optional[AsyncIterator[Any]] = None
self._buffer: deque = deque()
@staticmethod
def _is_combined(chunk: Any) -> bool:
"""True if ``chunk`` carries response content AND a finish_reason."""
choices = getattr(chunk, "choices", None)
if not choices:
return False
choice = choices[0]
if getattr(choice, "finish_reason", None) is None:
return False
delta = getattr(choice, "delta", None)
if delta is None:
return False
return bool(
getattr(delta, "content", None)
or getattr(delta, "tool_calls", None)
or getattr(delta, "reasoning_content", None)
or getattr(delta, "thinking_blocks", None)
)
@staticmethod
def _split(chunk: Any) -> List[Any]:
"""Return ``[chunk]``, or ``[content_chunk, finish_chunk]`` if combined."""
if not _CombinedChunkSplitter._is_combined(chunk):
return [chunk]
# Content chunk: keep the delta payload, clear the finish_reason.
content_chunk = copy.deepcopy(chunk)
content_chunk.choices[0].finish_reason = None
# Finish chunk: keep finish_reason (and usage), clear the delta payload.
finish_chunk = copy.deepcopy(chunk)
finish_delta = finish_chunk.choices[0].delta
finish_delta.content = None
if hasattr(finish_delta, "tool_calls"):
finish_delta.tool_calls = None
if hasattr(finish_delta, "reasoning_content"):
finish_delta.reasoning_content = None
if hasattr(finish_delta, "thinking_blocks"):
finish_delta.thinking_blocks = None
return [content_chunk, finish_chunk]
def __iter__(self) -> "Iterator[Any]":
return self
def __next__(self) -> Any:
if self._buffer:
return self._buffer.popleft()
if self._sync_iter is None:
self._sync_iter = iter(self._stream)
chunk = next(self._sync_iter) # propagates StopIteration when exhausted
self._buffer.extend(self._split(chunk))
return self._buffer.popleft()
def __aiter__(self) -> "AsyncIterator[Any]":
return self
async def __anext__(self) -> Any:
if self._buffer:
return self._buffer.popleft()
if self._async_iter is None:
self._async_iter = self._stream.__aiter__()
chunk = await self._async_iter.__anext__() # propagates StopAsyncIteration
self._buffer.extend(self._split(chunk))
return self._buffer.popleft()
class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
"""
- first chunk return 'message_start'
@ -62,7 +155,10 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
compaction_block: Optional[CompactionBlock] = None,
iterations_usage: Optional[List[UsageIteration]] = None,
):
super().__init__(completion_stream)
# Wrap the upstream stream so chunks that carry both content and a
# finish_reason (fake-streamed providers) are split into two — see
# _CombinedChunkSplitter.
super().__init__(_CombinedChunkSplitter(completion_stream))
self.model = model
# Mapping of truncated tool names to original names (for OpenAI's 64-char limit)
self.tool_name_mapping = tool_name_mapping or {}

View File

@ -50,6 +50,11 @@ def convert_model_response_to_streaming(
model=model_response.model,
choices=streaming_choices,
)
# Carry usage onto the streaming chunk so fake-streamed responses
# (e.g. Vertex AI Gemma :predict) still report token counts.
usage = getattr(model_response, "usage", None)
if usage is not None:
setattr(processed_chunk, "usage", usage)
return processed_chunk
except Exception as e:
raise ValueError(

View File

@ -25,6 +25,28 @@ class ModelResponseIterator:
finish_reason = ""
usage: Optional[ChatCompletionUsageBlock] = None
# Usage-only final chunk (OpenAI ``stream_options.include_usage``)
# arrives with an empty ``choices`` list — return usage without
# indexing ``choices[0]``.
if len(processed_chunk.choices) == 0:
final_usage = getattr(processed_chunk, "usage", None)
return GenericStreamingChunk(
text="",
tool_use=None,
is_finished=False,
finish_reason="",
usage=(
ChatCompletionUsageBlock(
prompt_tokens=final_usage.prompt_tokens or 0,
completion_tokens=final_usage.completion_tokens or 0,
total_tokens=final_usage.total_tokens or 0,
)
if final_usage is not None
else None
),
index=0,
)
if processed_chunk.choices[0].delta.content is not None: # type: ignore
text = processed_chunk.choices[0].delta.content # type: ignore

View File

@ -0,0 +1,150 @@
"""
Regression tests for fake-streamed providers routed through `/v1/messages`.
A fake-streaming provider (e.g. Vertex AI Gemma `:predict`) collapses its whole
response into a single `MockResponseIterator` chunk that carries content text AND a
`finish_reason` together. `AnthropicStreamWrapper` previously dropped all content in
this case `translate_streaming_openai_response_to_anthropic` sees the finish_reason
and emits only a `message_delta`. `_CombinedChunkSplitter` splits such chunks so the
content survives.
"""
import asyncio
import json
from types import SimpleNamespace
from litellm.llms.anthropic.experimental_pass_through.adapters.streaming_iterator import (
AnthropicStreamWrapper,
_CombinedChunkSplitter,
)
from litellm.llms.base_llm.base_model_iterator import MockResponseIterator
from litellm.types.utils import (
Choices,
Delta,
Message,
ModelResponse,
ModelResponseStream,
StreamingChoices,
Usage,
)
def _build_fake_stream(
content: str, finish_reason: str = "stop"
) -> MockResponseIterator:
"""Mimic a Vertex Gemma `:predict` fake stream: one collapsed chunk."""
model_response = ModelResponse()
model_response.choices = [
Choices(
index=0,
message=Message(role="assistant", content=content),
finish_reason=finish_reason,
)
]
model_response.usage = Usage(prompt_tokens=10, completion_tokens=5, total_tokens=15)
model_response.model = "gemma4"
return MockResponseIterator(model_response=model_response)
def _collect_async(wrapper: AnthropicStreamWrapper) -> str:
async def _run() -> str:
out = []
async for raw in wrapper.async_anthropic_sse_wrapper():
out.append(raw.decode() if isinstance(raw, bytes) else raw)
return "".join(out)
return asyncio.run(_run())
def test_fake_stream_content_reaches_anthropic_sse():
"""Content from a collapsed fake-stream chunk must be emitted as a delta."""
wrapper = AnthropicStreamWrapper(
completion_stream=_build_fake_stream("Hello, the answer is 2."),
model="gemma4",
)
sse = _collect_async(wrapper)
assert "content_block_delta" in sse
assert "Hello, the answer is 2." in sse
assert "message_delta" in sse
assert "message_stop" in sse
def test_fake_stream_usage_preserved():
"""The finish chunk keeps usage so output_tokens is non-zero."""
wrapper = AnthropicStreamWrapper(
completion_stream=_build_fake_stream("Two."),
model="gemma4",
)
sse = _collect_async(wrapper)
message_delta = next(
json.loads(line[len("data: ") :])
for block in sse.split("\n\n")
for line in block.splitlines()
if line.startswith("data: ") and '"message_delta"' in line
)
assert message_delta["usage"]["output_tokens"] == 5
assert message_delta["usage"]["input_tokens"] == 10
def test_splitter_passes_through_non_combined_chunks():
"""A chunk with content but no finish_reason is not split."""
chunk = ModelResponseStream(
choices=[
StreamingChoices(
index=0, delta=Delta(content="partial"), finish_reason=None
)
]
)
chunks = list(_CombinedChunkSplitter(iter([chunk])))
assert len(chunks) == 1
assert chunks[0].choices[0].delta.content == "partial"
def test_splitter_splits_combined_chunk_into_content_then_finish():
"""A chunk with both content and finish_reason becomes two chunks."""
chunk = ModelResponseStream(
choices=[
StreamingChoices(index=0, delta=Delta(content="done"), finish_reason="stop")
]
)
content_chunk, finish_chunk = list(_CombinedChunkSplitter(iter([chunk])))
assert content_chunk.choices[0].delta.content == "done"
assert content_chunk.choices[0].finish_reason is None
assert finish_chunk.choices[0].finish_reason == "stop"
assert finish_chunk.choices[0].delta.content is None
def test_is_combined_false_when_choices_empty():
"""A metadata-only chunk with no choices is never treated as combined."""
assert _CombinedChunkSplitter._is_combined(SimpleNamespace(choices=[])) is False
def test_is_combined_false_when_delta_missing():
"""A finish chunk whose choice has no delta is not combined."""
chunk = SimpleNamespace(choices=[SimpleNamespace(finish_reason="stop", delta=None)])
assert _CombinedChunkSplitter._is_combined(chunk) is False
def test_split_clears_reasoning_and_thinking_on_finish_chunk():
"""When the combined delta carries reasoning/thinking, only the content
chunk keeps them the finish chunk is cleared."""
delta = SimpleNamespace(
content="hi",
tool_calls=None,
reasoning_content="some reasoning",
thinking_blocks=[{"type": "thinking"}],
)
chunk = SimpleNamespace(
choices=[SimpleNamespace(finish_reason="stop", delta=delta)]
)
content_chunk, finish_chunk = _CombinedChunkSplitter._split(chunk)
assert content_chunk.choices[0].delta.reasoning_content == "some reasoning"
assert content_chunk.choices[0].delta.thinking_blocks == [{"type": "thinking"}]
assert finish_chunk.choices[0].delta.reasoning_content is None
assert finish_chunk.choices[0].delta.thinking_blocks is None

View File

@ -0,0 +1,64 @@
"""
Regression test for the databricks streaming chunk parser.
OpenAI-compatible servers (e.g. Vertex AI Model Garden vLLM endpoints) send a final
usage-only chunk with an empty `choices` list when `stream_options.include_usage` is
set. `chunk_parser` previously did `choices[0]` unconditionally, raising
`IndexError` -> `MidStreamFallbackError` and crashing the stream.
"""
from litellm.llms.databricks.streaming_utils import ModelResponseIterator
def test_chunk_parser_handles_empty_choices_usage_chunk():
"""A usage-only final chunk (empty choices) must not raise IndexError."""
iterator = ModelResponseIterator(streaming_response=None, sync_stream=True)
usage_only_chunk = {
"id": "chatcmpl-x",
"object": "chat.completion.chunk",
"created": 1,
"model": "m",
"choices": [],
"usage": {"prompt_tokens": 20, "completion_tokens": 8, "total_tokens": 28},
}
result = iterator.chunk_parser(chunk=usage_only_chunk)
assert result["text"] == ""
assert result["is_finished"] is False
assert result["usage"] is not None
assert result["usage"]["prompt_tokens"] == 20
assert result["usage"]["completion_tokens"] == 8
def test_chunk_parser_empty_choices_without_usage():
"""An empty-choices chunk with no usage block returns usage=None, no error."""
iterator = ModelResponseIterator(streaming_response=None, sync_stream=True)
chunk = {
"id": "chatcmpl-x",
"object": "chat.completion.chunk",
"created": 1,
"model": "m",
"choices": [],
}
result = iterator.chunk_parser(chunk=chunk)
assert result["text"] == ""
assert result["usage"] is None
def test_chunk_parser_normal_content_chunk_still_works():
"""A regular content chunk is unaffected by the empty-choices guard."""
iterator = ModelResponseIterator(streaming_response=None, sync_stream=True)
chunk = {
"id": "chatcmpl-x",
"object": "chat.completion.chunk",
"created": 1,
"model": "m",
"choices": [{"index": 0, "delta": {"content": "hi"}, "finish_reason": None}],
}
result = iterator.chunk_parser(chunk=chunk)
assert result["text"] == "hi"