[Feat] Add tool calling support for gemini and vertex ai live api (#26590)

* Add tool calling support for gemini and vertex ai live api

* Fix greptile reviews

* Add new functionality behind flag

* fix greptile issues

* Fix greptile review

* Fix greptile review

* Fix greptile review

* Fix greptile review

* Fix greptile review

* fix lint

* fix(realtime): address P1 issues - guardrail timing and inputAudioTranscription default

- Remove early guardrail turn-detection update that consumed first setup slot
- Add inputAudioTranscription default in Gemini deferred-mode setup
- Add tests for both fixes

Made-with: Cursor

* fix(realtime): inject turn_detection into first session.update for deferred mode

- Instead of sending turn_detection as separate message (which gets dropped), inject it into the first client session.update
- This ensures guardrails work correctly in deferred mode
- Add test for turn_detection injection in deferred mode

Made-with: Cursor

* fix(realtime): emit response.created preamble before tool-call events

- Emit response.created, output_item.added, and conversation.item.created for function calls
- Ensures OpenAI Realtime API spec compliance
- Add test for preamble emission

Made-with: Cursor

* fix(realtime): add response.output_item.done to complete tool-call sequence

- Emit response.output_item.done between function_call_arguments.done and conversation.item.created
- Required by OpenAI Realtime spec to finalize function-call items
- Update test to verify complete event sequence

Made-with: Cursor

* fix(realtime): emit response.done after tool-call sequence (P0 CRITICAL)

- Add response.done event after tool-call loop to signal response completion
- Required by OpenAI SDK clients to submit tool results
- Without this, clients stall indefinitely waiting for response completion
- Update test to verify complete 6-event sequence including response.done

Made-with: Cursor

* fix(realtime): include function name in toolResponse (P1)

- Store call_id → name mapping when receiving toolCall from Gemini
- Look up and include name in functionResponses when sending tool results
- Required by Gemini Live API spec for proper tool call routing
- Add test to verify name field is included in round-trip

Made-with: Cursor

* fix: resolve merge conflict markers in UI build chunk

Take litellm_internal_staging version of e1a670efcb966aaa.js after
incomplete merge left conflict markers in the committed artifact.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(vertex_ai/realtime): call super().__init__() to initialize tool call state

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): correct guardrail flag and event-mapping fallback

- realtime_streaming: only mark _guardrail_turn_detection_update_sent
  when the message was actually delivered to the backend. The provider
  transformation (e.g. Gemini after initial setup) may silently drop
  session.update; previously we set the flag anyway, falsely claiming
  the disable was sent and preventing any retry on subsequent
  session.created events. _send_to_backend now returns whether at
  least one transformed message was sent.

- gemini realtime transformation: avoid shadowing the outer
  openai_event variable in map_openai_event's fallback loop. With
  the new toolCall entry now last in MAP_GEMINI_FIELD_TO_OPENAI_EVENT,
  an unmatched key would otherwise leak FUNCTION_CALL_ARGUMENTS_DONE
  and skip the ValueError raise. Use a distinct loop variable so the
  is-None check correctly raises for unknown Gemini messages.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini/realtime): reset response IDs after tool-call response.done

After closing a tool-call response, clear current_output_item_id and
current_response_id so post-tool model turns emit a fresh response.created
preamble. Add regression tests and align guardrail turn_detection test with
GA session shape; apply Black formatting.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix lint

* fix(realtime): log injected message and forward guardrail VAD-disable on Gemini

- Move store_input() after the guardrail turn_detection injection in
  client_ack_messages so audit logs reflect what is actually forwarded
  to the backend (previously the unmodified pre-injection message was
  logged).
- In Gemini's _handle_session_update, allow a session.update that only
  carries a turn_detection change to be forwarded as a follow-up Gemini
  setup with realtimeInputConfig.automaticActivityDetection set, even
  after the initial setup. This restores the guardrail layer's ability
  to disable VAD auto-response in non-deferred mode (the default Gemini
  flow), which was a regression after _handle_session_update started
  silently dropping subsequent session.update messages. Both flat
  beta-style and nested GA-style turn_detection payloads are accepted.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini/realtime): resolve mypy TypedDict errors in transformation

Align realtime event payloads and setup types with OpenAI/Gemini TypedDicts so mypy passes and tool-call events type-check correctly.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(realtime): forward turn_detection updates for Vertex; respect partial VAD config; cache setup after send

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): consolidate send-and-cache, guard session.update lookup, preserve client turn_detection in GA remap

- Replace duplicated transform/send/cache logic in client_ack_messages with a call to _send_to_backend so future changes stay in one place.
- VertexAIRealtimeConfig.transform_realtime_request now uses .get('session') or {} for the first session.update so a malformed client payload no longer crashes the connection.
- Move the audio-transcription guardrail turn_detection injection to run BEFORE the beta->GA session remap. This lets the injected create_response ride along with any client-provided turn_detection fields (e.g. silence_duration_ms) into the nested audio.input.turn_detection path produced by the remap instead of being stranded as a separate root-level dict.
- Update the deferred-mode injection test to assert the GA-shaped location.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): pop tool_call_id mapping after use to bound memory

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): correct deferred-setup session.created modalities and reset IDs after response.done

- Convert provider's real session.created to session.updated when a synthetic
  one was already forwarded so clients receive the authoritative modalities
  derived from their session.update instead of the synthetic placeholder.
- Reset current_response_id / current_output_item_id after Gemini RESPONSE_DONE
  so a toolCall arriving in a later frame starts a fresh response instead of
  reusing the completed response's ID and emitting a duplicate response.done.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini-realtime): preserve nested turn_detection through map_openai_params

After the GA remap moves session.turn_detection into session.audio.input.turn_detection,
Gemini's map_openai_params only looks at top-level keys and silently drops it. Normalize
the extracted turn_detection back to the top level on first session.update so the guardrail
create_response:False (and any client-provided VAD settings) reach the Gemini setup.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): normalize Vertex AI nested turn_detection and unify session.created guardrail ordering

- Vertex AI _build_vertex_ai_setup_config now lifts nested
  audio.input.turn_detection to the top level before calling
  map_openai_params, mirroring the parent GeminiRealtimeConfig
  behavior. Without this, guardrail-injected create_response: False
  was silently dropped for GA-protocol Vertex AI clients.
- realtime_streaming session.created handling now sends the
  (possibly re-typed) event first and then triggers the guardrail
  turn-detection update for both first and duplicate cases, removing
  the inconsistent guardrail-then-event ordering for duplicates.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): tolerate non-dict turn_detection in guardrail injection

When a client sends a session.update whose turn_detection field is None or
a non-dict value (e.g. "auto"), the guardrail injection used setdefault
followed by item assignment on the returned value, raising TypeError. The
inner except only caught JSONDecodeError/AttributeError, so the TypeError
escaped to the outer Exception handler that wraps the entire client_ack
loop, killing the connection. Replace non-dict turn_detection with a
fresh dict carrying create_response=False so the guardrail still applies
without crashing the loop.

* fix(gemini realtime): default synthetic session.created modalities to AUDIO

The synthetic session.created event emitted in deferred setup mode used
TEXT as the default for responseModalities, while _handle_session_update
defaults to AUDIO. Align the default so clients reading modalities from
the initial session.created see the correct value for live sessions.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(vertex_ai/realtime): drop follow-up session.update to avoid 1007 close

Vertex AI Live treats setup as a first-and-only client message; emitting a
second setup with realtimeInputConfig only closes the websocket with a 1007
policy error. Reverting the follow-up-setup branch restores the pre-existing
no-op behavior for subsequent session.update messages.

* fix(gemini realtime): default responseModalities to AUDIO in delta events

Align return_new_content_delta_events with the AUDIO defaults used in
_handle_session_update and transform_session_created_event so deferred
session config does not produce TEXT-typed delta events for audio data.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): default response.done modalities to AUDIO and correct audio-done test

* fix(realtime): set guardrail turn_detection flag only after successful send

Previously the _guardrail_turn_detection_update_sent flag was set inline
during message rewriting in client_ack_messages, before the modified
session.update was forwarded to the backend. If _send_to_backend raised
(e.g. backend WebSocket disconnect), the exception was caught and the
loop continued, but the flag remained True — permanently disabling the
guardrail create_response=False injection for the rest of the session.
Neither the client_ack_messages path nor the
_maybe_send_guardrail_turn_detection_update backup path would retry.

Track the injection locally and only set the flag after _send_to_backend
returns a truthy sent result, matching the pattern used by
_maybe_send_guardrail_turn_detection_update.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(vertex_ai realtime): keep VAD enabled when guardrails inject create_response: False

map_automatic_turn_detection sets disabled=True whenever create_response is
absent OR False. Transcription guardrails inject create_response: False to
suppress auto-responses while expecting VAD to stay active, but the previous
override in _build_vertex_ai_setup_config only fired when create_response was
absent, leaving disabled=True and silently breaking speech detection and
transcription events. Vertex Live has no 'VAD on, no auto-response' mode, so
always keep VAD active in the setup config.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): normalize GA-remapped session fields before mapping

map_openai_params only recognises the flat OpenAI-beta keys (modalities,
input_audio_transcription, turn_detection). For GA clients the upstream
shim renames these into the nested GA schema (output_modalities,
audio.input.transcription, audio.input.turn_detection), causing them to
be silently dropped in _handle_session_update. Add a normalization helper
that surfaces the GA-remapped values back at the top level so the
existing mapping logic picks them up. Without this, a GA client
explicitly requesting modalities=['text'] would still default to audio
output.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(vertex_ai/realtime): normalize all GA-remapped session fields before mapping

Previously _build_vertex_ai_setup_config only lifted nested turn_detection
back to the top level. GA clients' output_modalities and
audio.input.transcription were silently dropped because map_openai_params
only recognises the flat OpenAI-beta keys. Use the parent's
_normalize_session_payload_for_mapping so modalities, transcription, and
turn_detection are all surfaced before mapping.

* fix(realtime): force create_response=False in all client session.update turn_detection when audio guardrails active

Prevents a client from re-enabling Gemini/GA VAD auto-response (and thereby
bypassing the audio transcription guardrail) by sending a later
session.update with turn_detection.create_response: true.

* fix(lint): silence PLR0915 on client_ack_messages

The function exceeded the 50-statement limit (64 > 50) after recent
realtime guardrail additions. Matches the existing project pattern for
inherently complex event/message-mapping methods (see _process_event,
translate_messages_to_responses_input, transform_realtime_response,
_arealtime, etc.).

* fix(gemini realtime): preserve original setup config on follow-up session.update

Gemini Live treats a second BidiGenerateContentSetup as a full session
replacement, not a partial merge. The guardrail-driven turn_detection-only
session.update was emitting a setup containing only model + realtimeInputConfig,
which would silently drop tools, generationConfig, inputAudioTranscription, and
systemInstruction from the original setup. Carry forward the cached original
setup and only override realtimeInputConfig.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): avoid double-serialization and normalize non-dict turn_detection in guardrail override

- Skip the force-override block when the injection block already ran for
  the same session.update to avoid redundant JSON re-serialization.
- Normalize non-dict client-provided turn_detection values (flat and
  nested audio.input.turn_detection) to a dict before enforcing
  create_response=False, matching the injection block's behavior and
  preventing potential bypass on backends that accept non-dict values.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* test(gemini realtime): exercise toolCall → function_call_output name round-trip

Update test_gemini_realtime_function_call_output_transformation to pre-load
the call_id → name mapping by transforming a Gemini toolCall first, then
assert that the resulting Gemini toolResponse functionResponses entry
carries the function name. This pins the production round-trip rather
than the degenerate 'name missing' branch.

* fix(realtime): correct conversation_id, VAD disable, modality state, empty toolCall

- Gemini tool-call response.done now includes conversation_id so clients
  can match it against the preceding response.created.
- Vertex AI setup no longer overrides an explicit guardrail-injected
  create_response: False back to disabled: False; the guardrail's intent
  to disable VAD auto-response is now respected.
- Modality handler is now passed the locally-updated response/item IDs
  rather than the original input snapshot, preventing stale IDs after a
  prior tool-call/response.done in the same JSON message resets them.
- Skip emitting orphaned response.created/response.done events when
  Gemini sends an empty functionCalls array.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): preserve client session.update fields on follow-up Gemini setup

In non-deferred mode the auto-setup pre-populates session_configuration_request,
so a later client session.update carrying tools or instructions used to fall
into the subsequent path and only forward turn_detection. Rebuild a merged
follow-up setup that overlays the new client fields on top of the original
setup so tools/instructions/etc. are no longer silently dropped.

* fix(gemini realtime): include usage on tool-call response.done; coerce non-dict tool output to struct

- Tool-call response.done now includes an empty usage object, matching the
  non-tool-call path so OpenAI-compatible clients always see usage.
- _handle_function_call_output wraps non-dict JSON parses under a 'result'
  key so Gemini's functionResponses[].response (a Struct) always receives a
  mapping.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): deep-merge nested config in follow-up session update

Previously, the follow-up setup performed a shallow merge between the
original setup and new overrides. If a session.update touched any field
inside generationConfig (e.g. modalities), the entire generationConfig
would be replaced, silently dropping unrelated sub-keys like temperature
or maxOutputTokens. Apply the same deep-merge to realtimeInputConfig so
partial automatic-activity-detection updates don't drop other realtime
input config fields either.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): default conversation_id before tool-call response.done

mypy flagged that response.done's conversation_id (str on the TypedDict)
could be None when current_response_id was already set on entry. Ensure
the fallback runs unconditionally before the response is constructed.

* fix(realtime): deep-merge generationConfig and refresh cache on follow-up setup

A subsequent Gemini session.update that touches any generationConfig sub-field
(e.g. just temperature) was clobbering the original generationConfig — silently
dropping responseModalities and switching the session to text-only. Deep-merge
generationConfig so existing keys (responseModalities, maxOutputTokens, ...) are
preserved when the client updates only a subset.

Also drop the early-return in _cache_session_configuration_request so the
cached payload tracks the latest setup sent to the backend. Without this,
downstream readers (transform_session_created_event, modality lookup in
return_new_content_delta_events) keep reading stale modalities/system
instruction after a follow-up setup.

* fix(gemini realtime): mirror modalities/temperature/max_output_tokens on tool-call response.created

The audio/text response.created preamble includes modalities, temperature,
and max_output_tokens on the response object so spec-compliant clients can
initialise per-response state. The tool-call response.created was missing
these fields, leaving clients without consistent response metadata when a
response starts with a tool call instead of content. Read them from the
cached session_configuration_request the same way the audio/text path
does.

* fix(gemini realtime): keep call_id→name mapping across function_call_output retries

A client SDK that retries function_call_output (or sends the same result
twice) would previously hit a missing-name lookup on the second send
because _handle_function_call_output popped the call_id → name entry.
Without name, Gemini may silently reject the response. Use dict.get so
the mapping persists for the lifetime of the session.

* fix(gemini realtime): empty toolCall must not terminate the WebSocket

If Gemini sends a toolCall whose functionCalls list is empty (or absent),
the previous `continue` left returned_message empty and the
"Unknown message type" guard fired, killing the WebSocket session.
Return a normal (empty) result instead so the session keeps going.

* fix(vertex realtime): warn when dropping guardrail turn-detection update

In non-deferred mode the auto-setup is sent on connect, so the audio-transcription
guardrail's subsequent session.update carrying turn_detection.create_response=False
cannot be forwarded as a second setup (Vertex Live closes the WebSocket with 1007).
Surface a warning when this specific drop happens so operators know the model
will auto-respond before the guardrail can gate it, instead of failing silently
at debug level.

* fix(gemini realtime): deep-merge automaticActivityDetection on follow-up session.update

The follow-up setup merge already deep-merged generationConfig and
realtimeInputConfig, but realtimeInputConfig.automaticActivityDetection
itself is a nested dict. A partial VAD update (e.g. the
guardrail-injected disabled=True from create_response=False) silently
dropped unrelated knobs such as silenceDurationMs and prefixPaddingMs
from the original setup. Deep-merge that block too so partial overrides
only touch the fields they specify.

* fix(realtime): record synthetic session.created in deferred-setup mode

The deferred-setup path emits a synthetic session.created directly to
the client websocket but did not run it through RealTimeStreaming's
store_message, so the event was missing from the session log used by
success_handler / async_success_handler. Call store_message before
forwarding so the synthetic event lands in the same log stream as
provider-driven events.

* fix(gemini realtime): bound _tool_call_id_to_name with an LRU; exercise modality forwarding test

Two minor follow-ups from review:

* Switch _tool_call_id_to_name to a 256-entry LRU OrderedDict so a long
  session with many tool calls doesn't grow the dict without bound,
  while retried function_call_output lookups still hit for recently-seen
  call_ids.
* Fix test_gemini_realtime_transformation_session_created to wrap the
  cached session config in {"setup": ...} so the modality lookup in
  transform_session_created_event actually exercises responseModalities
  forwarding (the prior payload was silently treated as empty).

* test(gemini realtime): wrap remaining cached session configs in setup envelope

The session_configuration_request the proxy caches is always serialized
as {"setup": ...}; three modality-related tests dumped a bare config
dict instead, so transform_session_created_event's
`.get('setup', {})` quietly returned an empty dict and the
responseModalities lookup ran against the default rather than the
fixture. Wrap the remaining tests in the same shape the production
cache uses so any regression in modality forwarding actually trips.

* fix(gemini realtime): cast merged realtimeInputConfig for typeddict assignment

mypy flagged the assignment of the merged dict into
BidiGenerateContentSetup.realtimeInputConfig with [typeddict-item]: the
intermediate variable widens to dict[Any, Any], losing the TypedDict
narrowing the previous dict-literal form had.

* test(gemini realtime): wrap test_gemini_tool_call_resets_ids fixture in setup envelope

The cached session_configuration_request the proxy stores is always
serialized as {"setup": ...}; this test passed a bare config dict, so
transform_session_created_event's .get('setup', {}) returned an empty
dict and the responseModalities lookup ran against the default rather
than the fixture. Wrap the fixture in the same shape the production
cache uses.

* fix(gemini realtime): skip unknown sibling keys in transform loop

Gemini realtime messages can include sibling metadata keys like
usageMetadata alongside primary payload keys (toolCall, serverContent).
Previously, the transform loop called map_openai_event for every
top-level key, raising ValueError for unknown ones and terminating
the WebSocket session.

Skip top-level keys not present in MAP_GEMINI_FIELD_TO_OPENAI_EVENT
to keep the session alive when Gemini emits usage metadata with a
toolCall response.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): scope dotted-key event lookup and propagate session metadata to tool-call response.done

- map_openai_event: only check the current key/value pair when resolving
  dotted map entries (e.g. serverContent.turnComplete) so a sibling key in
  the same frame can't misclassify the event being processed
  (e.g. toolCall returning RESPONSE_DONE).
- tool-call path: extract generationConfig once and include modalities,
  temperature, and max_output_tokens on response.done so its shape matches
  response.created and the non-tool-call response.done.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): cast maxOutputTokens to int for typeddict assignment

* fix(gemini realtime): use camelCase maxOutputTokens in response.done

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): cast maxOutputTokens to int for typeddict assignment

* fix(realtime): inject guardrail turn_detection on subsequent session.update without one

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): tolerate sibling-only frames (e.g. standalone usageMetadata)

A Gemini Live frame that contains only metadata keys outside
_KNOWN_GEMINI_TOP_LEVEL_KEYS (e.g. a bare {"usageMetadata": {...}}
emitted between turns) leaves returned_message empty after the
transform loop and was tripping the 'Unknown message type' guard,
which raised ValueError and terminated the WebSocket session.

Treat such frames as no-ops and return the unchanged state instead.

* fix(gemini realtime): preserve sibling toolCall when serverContent has only transcription

Previously, when a Gemini frame contained both a transcription-only
serverContent and a sibling toolCall, the transcription handler would
early-return and silently drop the toolCall. Instead, mark serverContent
as handled and fall through so the main loop still processes siblings
like toolCall, while preserving the prior no-op behavior for empty/
transcription-only frames.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* refactor(gemini realtime): drop unused json_message arg from map_openai_event

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): promote nested turn_detection when flat value is not a dict

When the session payload had `turn_detection: None` (or any non-dict value), the
normalizer skipped promoting the GA nested `audio.input.turn_detection` because
it only checked key presence. The stale None then flowed into
`map_automatic_turn_detection` and raised TypeError on `'create_response' in value`.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(realtime): run guardrails on function_call_output content

Tool result outputs are client-controlled and fed to the model, so
they must pass the same content checks as user text messages.
Otherwise an attacker can smuggle blocked content into a
function_call_output and have the model process it.

* fix(gemini realtime): emit function_call_arguments.delta before .done

Gemini delivers the full function-call arguments in a single toolCall
frame. The OpenAI Realtime spec orders the streaming events as
output_item.added -> function_call_arguments.delta(+) ->
function_call_arguments.done -> output_item.done. Emit a single delta
carrying the complete arguments string before the matching .done so
spec-compliant SDK clients that accumulate deltas and gate finalisation
on at least one delta arriving do not stall on Gemini tool calls.

* fix(realtime): avoid stale session.created flag triggering guardrail re-injection

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(ci): restore guardrail injection on duplicate session.created and cast realtime delta event

- Re-enable the one-time guardrail turn_detection update on duplicate
  session.created. `_maybe_send_guardrail_turn_detection_update` is
  already idempotent via `_guardrail_turn_detection_update_sent`, so
  the previous guard was unnecessary and broke the deferred-setup path
  where the synthetic session.created is emitted by llm_http_handler
  outside this loop (no prior chance to inject).

- Cast the response.function_call_arguments.delta dict appended to
  `returned_message: List[OpenAIRealtimeEvents]` so mypy is satisfied.

* fix(realtime): forward sanitized function_call_output on guardrail block

Providers that pair every toolCall with a toolResponse (e.g. Gemini and
Vertex Live) stay in the awaiting-tool-call state until a toolResponse
arrives. Dropping a blocked function_call_output outright left those
providers stalled — the subsequent guardrail clientContent and
response.create were ignored because the prior toolCall had no matching
toolResponse.

When the client-supplied tool output fails the realtime guardrail check,
forward a sanitized placeholder function_call_output (same call_id,
generic policy marker as output) instead of dropping the message
entirely. The placeholder carries no blocked content, so the model never
sees it, while still completing the provider's tool-call cycle so the
session can recover and the violation message reaches the user.

* fix(gemini realtime): preserve sibling keys on empty toolCall no-op

Replace the early return on `functionCalls` empty/absent with a
`continue` plus a `tool_call_handled` flag that mirrors the existing
`server_content_handled` pattern. The post-loop guard already
distinguishes intentionally-consumed known keys from genuinely-unknown
messages, so adding `toolCall` to that exclusion list lets the loop
continue iterating over any sibling top-level keys in the same Gemini
frame instead of short-circuiting on the first empty toolCall.

In practice Gemini's protobuf places `toolCall`/`serverContent`/
`setupComplete` in a `oneof` so the only realistic sibling is
`usageMetadata` (already filtered as unknown-top-level), but the
uniform handling avoids silently discarding any future sibling key
should the wire format grow.

* fix(gemini realtime): redact realtime payloads from debug logs

The transform_realtime_response debug logs were dumping the raw inbound
Gemini frame and each outbound OpenAI event payload (up to 500 chars).
Realtime frames carry transcripts, model output, and tool-call arguments,
so those strings ended up in application logs whenever DEBUG was enabled.
Replace the inbound dump with just the top-level frame keys and the
outbound dump with just the event type.

* fix(realtime): check function_call_output before user role to prevent guardrail bypass

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): propagate usageMetadata on tool-call response.done

Gemini Live emits usageMetadata as a sibling top-level key alongside the
toolCall frame; the tool-call branch was unconditionally building
response.done from get_empty_usage(), so tokens consumed by tool-call
turns were recorded as zero spend and bypassed LiteLLM budget
accounting. Mirror the non-tool-call RESPONSE_DONE path: when the same
frame carries usageMetadata, run VertexGeminiConfig._calculate_usage and
forward the real token counts.

* fix(realtime): send sanitized toolResponse before guardrail clientContent

Two related fixes for the function_call_output blocked-by-guardrail path:

1. Ordering: Gemini Live requires a matching toolResponse immediately
   after a toolCall before any other client message. Previously we ran
   the guardrail first (which sends clientContent/cancel) and only then
   forwarded the sanitized function_call_output. Add an optional
   pre_block_backend_message arg to run_realtime_guardrails so the
   sanitized toolResponse is emitted before the guardrail's own backend
   messages.

2. Stale pending flag: stop setting _pending_guardrail_message in the
   tool-output block. That flag exists to swallow the reflexive
   response.create an OpenAI client sends right after a user text
   message. In tool-calling flows the client may never send a
   response.create (e.g. Gemini SDKs auto-respond), so leaving the flag
   set would consume an unrelated response.create from a later turn.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* test(model_prices): allow audio_transcription_config in schema

* fix(gemini realtime): event_id, item copy, and dict guard for tool-call events

- Emit event_id on response.output_item.added for tool calls so spec-compliant
  OpenAI Realtime SDK clients can index/deduplicate the event like every other
  server-sent event in the sequence.
- Pass a shallow copy of function_call_item to response.output_item.done and
  conversation.item.created so downstream handlers (e.g. the beta-protocol
  translator) that mutate the item dict don't corrupt sibling events sharing
  the same reference.
- Guard map_openai_event against non-dict values (e.g. Gemini's
  'setupComplete: true' boolean payload) so the WebSocket session doesn't die
  with an AttributeError on the unguarded .get() call.

Add NotRequired event_id field on OpenAIRealtimeStreamResponseOutputItemAdded
to keep existing call-sites that don't set event_id compatible.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(gemini realtime): buffer standalone usageMetadata for next response.done

Gemini Live can emit usageMetadata as a standalone WebSocket frame between
turns. The previous transformer treated those frames as no-ops, so token
counts arriving outside the closing turnComplete/toolCall frame were
dropped from spend and budget accounting. An authenticated client could
drive turns whose usage was recorded as zero, bypassing budgets.

Buffer any standalone usageMetadata on the config instance and attribute
the deferred counts to the next emitted response.done (tool-call or
normal). In-frame usageMetadata remains authoritative and clears the
buffer.

* merge main (#28839)

* fix(helm): drop main- prefix from default image tag (#28710)

* fix(helm): drop main- prefix from default image tag

The default image tag in the deployment + migrations-job templates was
`main-{{ .Chart.AppVersion }}`. The current release pipeline publishes
content tags without the `main-` prefix (e.g. `v1.85.1` / `1.85.1`,
`v1.86.0-rc.1` / `1.86.0-rc.1`), so the rendered ref points at a tag
that does not exist on GHCR or DockerHub and installs fail with
ImagePullBackOff.

- templates/deployment.yaml, templates/migrations-job.yaml: render
  `.Chart.AppVersion` directly instead of `main-<AppVersion>`.
- Chart.yaml: bump stale `appVersion: v1.80.12` (not on either
  registry) to `v1.85.1` so local-checkout installs also resolve.
- values.yaml: update the commented tag-override hint to match.

* fix(helm): use :latest in tag override example, not pinned version

Per review: ghcr.io/berriai/litellm-database:latest is a floating
alias for the most recent stable (same digest as :main-stable),
maintained by the release pipeline's UPDATE_LATEST advance step.
Better example than a pinned version that goes stale.

* test(model_prices): allow audio_transcription_config in schema (#28708)

The schema in test_aaamodel_prices_and_context_window_json_is_valid uses
additionalProperties: false. The azure/speech/azure-stt entry added in
#27482 introduced an audio_transcription_config field that the schema
did not whitelist, so the test fails on every branch built on top of
staging.

Add the field as a string property.

* fix(team): refresh team cache on team_model_add/delete (LIT-3244) (#28683)

* fix(team): refresh team cache on team_model_add/delete (LIT-3244)

team_model_add and team_model_delete wrote to the DB but did not
invalidate the in-memory LiteLLM_TeamTableCachedObj used by
common_checks. After the v1.83.14 common_checks centralization made
team.models authoritative on /v1/files and /v1/vector_stores/*,
adding a Team-BYOK model silently failed to grant the new public
model name to team members until the cache TTL expired (and a
removed model kept working until then on the symmetric path).

Extract the cache-refresh snippet from update_team into a small
helper and apply it consistently at all three team-write sites.

* test: also assert updated models in team-cache-refresh pin

Strengthens the LIT-3244 regression test to also assert
`call_kwargs["team_table"].models` matches the updated row,
not just `team_id`. Both `existing_team` and `updated_team`
share `team_id` in the test setup, so the previous assertion
would have passed even if the implementation accidentally cached
the pre-mutation row.

Greptile review feedback.

* fix(team): hydrate object_permission on cache-refreshing team updates

The Prisma update calls in update_team, team_model_add, and
team_model_delete returned a team row with object_permission_id set
but object_permission=None (the relation was not requested via
include=). _refresh_cached_team then wrote that to the in-memory
LiteLLM_TeamTableCachedObj, and the cache-hit path in get_team_object
returns the cached object without re-hydrating. Downstream consumers
(validate_key_search_tools_against_team, the MCP/agent authz paths)
treat a missing object_permission as no team-level restriction, so
a team-write op silently dropped object-permission enforcement until
the cache TTL expired or a DB-fetch path re-hydrated it.

Add include={"object_permission": True} to all three updates so the
refresh writes a complete cached team. Extend the LIT-3244 regression
test to pin both the cached object_permission and the include shape
on the Prisma call.

Surfaced in PR review of LIT-3244.

* fix(ui/add-model): stop vertex_ai-anthropic_models from leaking under Anthropic (#28723)

`getProviderModels()` matched a model into a provider's dropdown when the
model's `litellm_provider` string *contained* the provider key as a
substring. The intent was to admit suffix variants (e.g. `anthropic_text`,
`bedrock_converse`), but the substring check is too loose: it also pulls in
unrelated providers whose name happens to contain the key, most visibly
`vertex_ai-anthropic_models` matching `anthropic` and `vertex_ai-openai_models`
matching `openai`.

Replace `.includes()` with separator-anchored prefix matching
(`startsWith(provider + "_")` / `startsWith(provider + "-")`). All legitimate
variants in `model_prices_and_context_window.json` still match
(`anthropic_text`, `azure_text`, `azure_ai`, `bedrock_converse`,
`bedrock_mantle`, `cohere_chat`, `fireworks_ai-embedding-models`,
`vertex_ai-*`, `vertex_ai_beta`), and the cross-provider leak is closed.

Tests: update one assertion that pinned the buggy substring behavior
(`custom_openai_endpoint` matching `openai` — not a real provider value);
add 6 new tests covering the leak regressions and the variant-preservation
contract for vertex_ai/bedrock/fireworks.

* Fix spend logs v2 route permissions (#28705)

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: ryan-crabbe-berri <ryan-crabbe-berri@users.noreply.github.com>

* fix(proxy): Bedrock Knowledge Base pass-through: preserve SigV4 headers and signed request body (#27526)

* Fix Bedrock KB pass-through SigV4 headers and signed body

Coerce botocore HeadersDict to a dict for pass-through routes. When
forward_headers is true, drop request headers that collide case-insensitively
with signed headers so client Bearer auth does not shadow AWS SigV4.
Send prepped.body as raw content so the outbound payload matches the
signature after logging hooks mutate the parsed dict.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Simplify pass-through raw body handling

Read the SigV4-signed bytes directly from request.state inside
pass_through_request instead of threading a custom_raw_body argument
through three functions. Helper methods are restored to their original
signatures, and the new branch lives in one place at each httpx call site.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Harden pass-through raw body read from request.state

Guard missing request.state (test fixtures) and ignore non-bytes/str
values so MagicMock does not trigger the SigV4 raw-body path.

Co-authored-by: Cursor <cursoragent@cursor.com>

* Test pass_through_request state_raw_body uses httpx content=

Cover non-streaming (async_client.request) and streaming (build_request)
paths so SigV4 bytes on request.state are not replaced by json= of a
hook-mutated dict.

Co-authored-by: Cursor <cursoragent@cursor.com>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>

* chore(tests): migrate Bedrock CI to AWS account 941277531214 (#28728)

* chore(tests): migrate Bedrock CI from AWS account 888602223428 to 941277531214

The original account (888602223428) was put under a security restriction by
AWS after a root access key leaked in a PR comment. While that account works
its way through the AWS Support unlock process, Bedrock-touching CI tests have
been migrated to a fresh account (941277531214).

Changes:
  - Replace 26 hardcoded references to 888602223428 with 941277531214 across
    8 files (provisioned-model ARNs, imported-model ARNs, AgentCore runtime
    ARNs, batch execution role ARN, and example proxy config).
  - The provisioned-model and imported-model ARNs are referenced only from
    mocked unit tests — no AWS resources to recreate.
  - The batch execution IAM role has been recreated in the new account with
    the same name and equivalent permissions.
  - The two AgentCore runtimes (hosted_agent_r9jvp-3ySZuRHjLC,
    hosted_agent_13sf6-cALnp38iZD) are being recreated in the new account
    under the same names — see tools/agentcore-deploy/ in a follow-up.

CircleCI env vars AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_REGION_NAME
were updated separately via the CircleCI API to point at the new account.

Smoke-tested locally against the new account:
  aws bedrock-runtime converse --region us-west-2 \
    --model-id us.anthropic.claude-sonnet-4-5-20250929-v1:0 \
    --messages '[{"role":"user","content":[{"text":"ping"}]}]'
  → 200, model returned 'pong'

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* chore(tests): refresh AgentCore ARN suffixes to match newly-deployed runtimes

The first migration commit replaced just the account ID, but AgentCore
auto-assigns a random 10-char suffix to every runtime on creation — we
can't reuse the original suffixes (`3ySZuRHjLC`, `cALnp38iZD`) in the
new account. Updated the AgentCore-runtime ARNs in the three files that
reference real runtime IDs (not the mock-based unit-test ARNs).

Deployed runtimes:
  arn:aws:bedrock-agentcore:us-west-2:941277531214:runtime/hosted_agent_r9jvp-Rq79QFC2fp
  arn:aws:bedrock-agentcore:us-west-2:941277531214:runtime/hosted_agent_13sf6-4046UzHSwy

Both runtimes are status=READY and pass a smoke invoke:
  $ aws bedrock-agentcore invoke-agent-runtime --agent-runtime-arn ... --payload '{"prompt":"ping"}'
  → 200, {"result": "echo: ping"}

The agent is a minimal echo (see /tmp/agentcore_deploy/agent.py for the
deploy artifacts). Tests that only verify the SDK wiring will pass; if any
test asserts on agent output content, swap the echo for the real agent.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* chore(tests): point Bedrock batch tests at new-account S3 bucket

The account migration (888602223428 -> 941277531214) was a flat
account-ID swap, which only rewrites ARNs that embed the account
number. S3 bucket names carry no account ID, so the live Bedrock
batch tests still uploaded to `litellm-proxy` — a bucket that lives
in the old account. S3 names are globally unique, and the old account
still holds that name, so it can't be recreated in the new account.

Rename to `litellm-proxy-941277531214` (account-ID suffix guarantees
global uniqueness). The bucket must be created in 941277531214 and the
batch execution role granted s3:GetObject/PutObject/ListBucket on it
before this job is run in CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(tests): point live S3 logging test at new-account bucket

Same account-ID-free blind spot as the batch bucket: `load-testing-oct`
lives in the old account and its name can't be reused globally. The
`logging_testing` CI job is wired into the workflow and runs
test_basic_s3_logging, which uploads to this bucket with the CI env
creds, then lists and deletes objects — a live dependency.

Rename to `load-testing-oct-941277531214`. The bucket must exist in the
new account with the CI IAM principal granted
s3:PutObject/GetObject/ListBucket/DeleteObject before this job runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(tests): repoint Bedrock guardrail IDs to new-account guardrails

The migration left guardrail IDs untouched (no account ID in them), so
all live guardrail tests failed with "guardrail identifier or version
does not exist" against 941277531214. Recreated both guardrails in the
new account and updated the hardcoded IDs:
  - wf0hkdb5x07f -> zgkmukebruil (PII mask: PHONE + CREDIT_DEBIT_CARD,
    with explicit inputAction=ANONYMIZE so masking applies to INPUT,
    which is the source litellm's moderation hook sends)
  - ff6ujrregl1q -> 4w3d1di3snt5 (blocks "coffee"; blocked message set
    to the exact string the tests assert on)

Updated test_bedrock_guardrails.py, otel_test_config.yaml, and the
guardrailConfig in test_bedrock_completion.py. Verified locally: the 5
previously-failing guardrail tests now pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(bedrock): migrate legacy models to current inference profiles

The new CI account (941277531214) cannot invoke legacy Bedrock models
(AWS gates them: "marked by provider as Legacy... not actively using in
the last 30 days"). Migrated the live-call tests:
  - anthropic.claude-3-sonnet-20240229    -> us.anthropic.claude-sonnet-4-5-20250929-v1:0
  - anthropic.claude-3-haiku-20240307     -> us.anthropic.claude-haiku-4-5-20251001-v1:0
Current Claude models on Bedrock require the us. inference-profile prefix
(bare on-demand ids are rejected).

cohere.command-r-plus has no working replacement (all Cohere is legacy-
gated in the new account): swapped to claude-haiku-4-5 in provider-
agnostic param lists. amazon.titan-image-generator skipped (no working
replacement). Mocked/transformation/cost tests that reference the legacy
strings are intentionally left unchanged. Verified live against the new
account.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(bedrock): repoint SageMaker + Knowledge Base to new-account resources

These referenced account-scoped resources by hardcoded id that only
existed in the old account, so the migration's account-ID swap missed
them. Recreated in 941277531214 and repointed:
  - SageMaker endpoint jumpstart-dft-hf-textgeneration1-mp-20240815-185614
    -> litellm-ci-textgen (gpt2 on a TGI container, ml.g5.xlarge)
  - Bedrock Knowledge Base T37J8R4WTM -> LCYXFBR2TU (OpenSearch Serverless
    vector store + titan-embed-text-v2, seeded with a LiteLLM doc)
Verified live: test_sagemaker.py (12 passed) and
test_bedrock_knowledgebase_hook.py (12 passed).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(reasoning_effort_grid): skip bedrock claude-opus-4-7 cells (not entitled on 941277531214)

claude-opus-4-7 is listed in the new Bedrock CI account's foundation
models but invoke is denied (AccessDeniedException: "not available for
this account"). Bedrock access to the flagship Opus requires an AWS
Sales request, not the self-serve model-access toggle, so it can't be
enabled inline with the rest of the account migration.

Add an optional `skip_reason` to ModelEntry and set it on the
bedrock-claude-opus-4-7 entry; the grid test honors it via pytest.skip.
Cell count (231) and route coverage are unchanged, so the structural
asserts still pass. Restore coverage by deleting the one skip_reason
line once access is granted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(bedrock): swap/skip legacy-gated models unavailable on new CI account

The migrated AWS account (941277531214) cannot access several models that
the old account could, so the remaining red CI jobs were hitting real
Bedrock "Access denied / Legacy" and "account not authorized" errors:

- image_gen: skip both Nova Canvas test classes (amazon.nova-canvas-v1:0 is
  legacy-gated), matching the existing titan skip.
- batches: skip test_async_file_and_batch (Bedrock batch inference is not
  authorized on the new account; requires an AWS support case).
- litellm_overhead: swap legacy claude-3-5-haiku for the active
  us.anthropic.claude-haiku-4-5 inference profile.
- test_completion_claude_3_function_call: swap legacy claude-3-sonnet for the
  active us.anthropic.claude-sonnet-4-5 inference profile.

https://claude.ai/code/session_01Y7zgHYu9GX29YRwV4yiWAa

* test(bedrock): fix remaining e2e legacy-model + batch failures on new CI account

- e2e_openai_endpoints: skip test_bedrock_batches_api (Bedrock batch inference
  is not authorized on account 941277531214) and migrate the missed
  s3_bucket_name in oai_misc_config.yaml to litellm-proxy-941277531214.
- build_and_test: swap legacy bedrock claude-3-sonnet for the active
  us.anthropic.claude-sonnet-4-5 inference profile in the proxy structured
  output e2e test.

https://claude.ai/code/session_01Y7zgHYu9GX29YRwV4yiWAa

* test(bedrock): make opus-4-7 + batch cells fail loudly and mock image-gen (#28791)

Replace the silent skips added for the new CI account with noisier behavior:
- reasoning-effort grid: opus-4-7 cells now fail (when AWS creds are present)
  instead of skipping, so the missing entitlement stays visible in CI; they
  still skip when AWS creds are absent (local dev)
- Bedrock batch inference tests: drop the skip so they run and fail until
  batch access is granted
- Titan + Nova Canvas image-gen tests: mock the Bedrock HTTP call so the
  transform + cost-tracking path stays under test without live model access

https://claude.ai/code/session_01MT7SWDnXUjv6e6EPG7BDjT

Co-authored-by: Claude <noreply@anthropic.com>

* test(bedrock): use pytest.xfail for known-failing opus-4-7 cells

Replace pytest.fail with pytest.xfail when a model has a fail_reason,
so known-broken cells stay visible as XFAIL without keeping CI red.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

---------

Co-authored-by: Mateo <mateo@Mateos-MacBook-Pro.local>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(otel): export SERVER span on management-endpoint success without http_request (#28794)

Co-authored-by: Yassin Kortam <yassinkortam@Yassins-MacBook-Pro.local>

* chore(ci): merge dev branch (#28801)

* chore(proxy): route path-dependent call sites through get_request_route

Replace direct ``request.url.path`` reads in auth, ACL, routing, and
audit-log decisions with ``get_request_route(request)`` — the helper
already added in ``auth/auth_utils.py`` that returns the ASGI
``scope["path"]`` with ``root_path`` stripped. Starlette reconstructs
``url.path`` from the Host header; ``scope["path"]`` is uvicorn's
parse of the request line and matches what FastAPI dispatches on, so
it's the authoritative route for any decision that should agree with
the actual handler.

Sites:
- _experimental/mcp_server/auth/user_api_key_auth_mcp.py
- management_endpoints/mcp_management_endpoints.py
- vector_store_endpoints/utils.py
- pass_through_endpoints/pass_through_endpoints.py
- auth/route_checks.py
- litellm_pre_call_utils.py
- spend_tracking/spend_management_endpoints.py
- common_utils/http_parsing_utils.py
- management_helpers/utils.py
- health_endpoints/_health_endpoints.py

Adds regression tests in tests/proxy_unit_tests/test_proxy_routes.py
that construct a Request with scope["path"] set to a benign route and
the Host header crafted so url.path would resolve differently; each
site's decision is asserted against scope["path"].

* chore(proxy): make get_request_route imports lazy at call sites

Move the ``from litellm.proxy.auth.auth_utils import get_request_route``
imports added in the prior commit back to the function bodies that use
them. The module-level form participates in a long-standing import
cycle through ``auth_utils -> _types -> ...`` and was flagged by CodeQL
on the PR; the lazy form matches the pattern the proxy already uses
for ``user_api_key_auth`` and related helpers elsewhere in these files.

Also drop the ``RouteChecks._is_assistants_api_request`` delegation in
``_get_metadata_variable_name`` introduced in the prior commit — the
delegation pulled ``RouteChecks`` into the same cycle, and the call
site reuses the resolved route for its other branches, so inlining
the substring check is both cycle-free and avoids a redundant second
``get_request_route`` call.

Comment in test_proxy_routes.py acknowledges that the two MCP table
entries exercise ``get_request_route`` directly rather than the full
production handler (which needs ASGI scope + MCP state to invoke).

---------

Co-authored-by: shin-berri <shin-laptop@berri.ai>
Co-authored-by: user <70670632+stuxf@users.noreply.github.com>

* chore(ci): merge dev branch (#28657)

* feat(dashboard): navbar hierarchy + Agent Platform notifications (#27543)

* feat(dashboard): refine navbar zones and Agent Platform notice

Restructure the admin navbar for production users: clear product vs community
vs personal columns with vertical dividers, icon-only Slack/GitHub in a
shared chip, and Docs/Blog typography aligned on an 8px rhythm.

Add a notifications bell with popover linking to the LiteLLM Agent Platform
repo and optional mark-as-read persistence.

Promote the account control with initials avatar, single-line display name,
and navDisplayName mapping for placeholder user ids (e.g. default_user_id).

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(dashboard): address PR review — AntD buttons, public page guard, dedupe regex

- Replace raw <button> with AntD Button in BlogDropdown, NotificationsBell, UserDropdown, and test mock
- Guard NotificationsBell + container behind !isPublicPage to avoid rendering on public pages
- Remove redundant equality checks in navDisplayName (regex already covers them)
- Remove unused `lower` variable after simplification

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: yuneng-jiang <yuneng@berri.ai>

* fix(dashboard): drop dead useHealthReadiness import in navbar

The module was removed in #27896 (replaced by useHealthReadinessDetails),
but the import survived the rebase. The symbol is unused — only
useHealthReadinessDetails is consumed in the file. Removing the dead
import unblocks the UI TypeScript build.

* fix(dashboard): align CommunityEngagementButtons test with icon-only aria-labels

The component was refactored to an icon-only chip with aria-label='LiteLLM
on GitHub' (squash #27543), but the test still asserted /star us on
github/i. Update the query to match the rendered accessible name.

* refactor(dashboard): drop unused props from NavbarProps

The navbar refactor moved user identity + dark-mode state to internal
hooks (useAuthorized, useWorker), but the NavbarProps interface still
declared userID, userEmail, userRole, premiumUser, isDarkMode, and
toggleDarkMode as required, forcing every caller to thread them through.

Drop them from the interface and all four call sites (page.tsx,
(dashboard)/layout.tsx, public_model_hub.tsx, navbar.test.tsx). Also
shrinks the destructure in layout.tsx so the now-unused locals stop
being pulled out of useAuthorized().

* refactor(dashboard): use useSyncExternalStore for NotificationsBell dismiss flag

Reads/writes of the litellmHideAgentPlatformBanner key were done
directly inside NotificationsBell via a useEffect + useState pair.
Every other localStorage-backed flag in the dashboard (Disable
ShowPrompts, DisableBouncingIcon, DisableShowNewBadge,
DisableUsageIndicator, DisableBlogPosts) is wrapped in a
useSyncExternalStore hook over localStorageUtils so all mounted
components stay in sync.

Extract useHideAgentPlatformBanner to follow the same shape, swap
NotificationsBell to consume it, and add a regression test that
two sibling bells stay in sync without a remount when one is
dismissed.

* refactor: mask credential fields in proxy settings GET responses (#28682)

* refactor: mask credential fields in proxy settings GET responses

Brings SSO settings, cache settings, and the email/Slack alerting view in
/get/config/callbacks in line with the HashiCorp Vault config-override
pattern, so persisted credentials are not transported back to the UI in
plaintext.

* refactor: harden short-value masking and hoist alerting var constant

Closes two review observations:

- mask_sensitive_keys now replaces short values (below the visible
  prefix+suffix length) with an all-mask string instead of returning them
  unchanged, so a 1-7 character credential is no longer round-tripped
  verbatim.
- _ALERTING_SENSITIVE_VARS is moved out of get_config() to a module-level
  constant, matching the analogous _SSO_SENSITIVE_FIELDS and
  _CACHE_SENSITIVE_FIELDS in the SSO and cache endpoint files.

---------

Co-authored-by: Krrish Dholakia <krrish+github@berri.ai>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* fix(ui): show 2-decimal precision for max_budget on key overview (#28809)

The Key Info Overview tab's Spend card truncated sub-dollar budgets to
"$0" because formatNumberWithCommas defaults to 0 decimals. The Settings
tab passes 2; align the overview so a $0.10 budget renders as "$0.10".

Resolves LIT-2845

* feat(proxy): allow `llm_api_routes` virtual keys to list MCP servers (#28442)

* feat(proxy): allow llm_api_routes virtual keys to list MCP servers

Add a new `mcp_discovery_routes` group (GET /v1/mcp/server and GET
/v1/mcp/server/{server_id}) and include it in `llm_api_routes` so that
virtual keys configured with `allowed_routes=["llm_api_routes"]` can
discover the MCP servers they have access to. Previously these calls
failed with 'Virtual key is not allowed to call this route. Only allowed
to call routes: [llm_api_routes]'.

The GET handlers already sanitize the response for restricted virtual
keys via `_sanitize_mcp_server_list_for_virtual_key`, stripping
credential-bearing fields (url, headers, env). Write methods
(POST/PUT/DELETE) on the same paths remain gated by the existing
handler-level admin role checks.

The new discovery list is intentionally kept OUT of
`mcp_inference_routes`, so `is_llm_api_route()` still returns False
for these paths — this preserves the existing contract that
DISABLE_LLM_API_ENDPOINTS must not block the Admin UI from listing MCP
servers.

Co-authored-by: ryan-crabbe-berri <ryan-crabbe-berri@users.noreply.github.com>

* refactor(proxy): make MCP discovery carve-out method-aware

Replace the `mcp_discovery_routes` group in `llm_api_routes` with a
method-aware special case inside `is_virtual_key_allowed_to_call_route`.
Virtual keys with allowed_routes=["llm_api_routes"] are now permitted
to call only GET /v1/mcp/server and GET /v1/mcp/server/{server_id} —
non-GET methods and multi-segment admin sub-paths fall through to the
existing 403. This keeps the general llm_api_routes list free of
management paths and avoids accidentally exposing POST/PUT/DELETE
writes through the route-check layer.

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: ryan-crabbe-berri <ryan-crabbe-berri@users.noreply.github.com>

* chore(ci): merge dev branch (#28807)

* chore(proxy): route path-dependent call sites through get_request_route

Replace direct ``request.url.path`` reads in auth, ACL, routing, and
audit-log decisions with ``get_request_route(request)`` — the helper
already added in ``auth/auth_utils.py`` that returns the ASGI
``scope["path"]`` with ``root_path`` stripped. Starlette reconstructs
``url.path`` from the Host header; ``scope["path"]`` is uvicorn's
parse of the request line and matches what FastAPI dispatches on, so
it's the authoritative route for any decision that should agree with
the actual handler.

Sites:
- _experimental/mcp_server/auth/user_api_key_auth_mcp.py
- management_endpoints/mcp_management_endpoints.py
- vector_store_endpoints/utils.py
- pass_through_endpoints/pass_through_endpoints.py
- auth/route_checks.py
- litellm_pre_call_utils.py
- spend_tracking/spend_management_endpoints.py
- common_utils/http_parsing_utils.py
- management_helpers/utils.py
- health_endpoints/_health_endpoints.py

Adds regression tests in tests/proxy_unit_tests/test_proxy_routes.py
that construct a Request with scope["path"] set to a benign route and
the Host header crafted so url.path would resolve differently; each
site's decision is asserted against scope["path"].

* chore(proxy): make get_request_route imports lazy at call sites

Move the ``from litellm.proxy.auth.auth_utils import get_request_route``
imports added in the prior commit back to the function bodies that use
them. The module-level form participates in a long-standing import
cycle through ``auth_utils -> _types -> ...`` and was flagged by CodeQL
on the PR; the lazy form matches the pattern the proxy already uses
for ``user_api_key_auth`` and related helpers elsewhere in these files.

Also drop the ``RouteChecks._is_assistants_api_request`` delegation in
``_get_metadata_variable_name`` introduced in the prior commit — the
delegation pulled ``RouteChecks`` into the same cycle, and the call
site reuses the resolved route for its other branches, so inlining
the substring check is both cycle-free and avoids a redundant second
``get_request_route`` call.

Comment in test_proxy_routes.py acknowledges that the two MCP table
entries exercise ``get_request_route`` directly rather than the full
production handler (which needs ASGI scope + MCP state to invoke).

---------

Co-authored-by: shin-berri <shin-laptop@berri.ai>
Co-authored-by: user <70670632+stuxf@users.noreply.github.com>

* fix(team): keep team_alias cache in sync on _cache_team_object writes (#28737)

* fix(team): keep team_alias cache in sync on _cache_team_object writes

_cache_team_object wrote only to the team_id:<id> cache key, but the
JWT auth path that uses team_alias_jwt_field reads from a separate
team_alias:<alias> key (get_team_object_by_alias caches under both
keys on miss, but reads only the alias-keyed one). After any
team-mutation endpoint (team_model_add, team_model_delete,
update_team, the two access-group writes) the team_id cache was
refreshed but the team_alias cache stayed stale until TTL — JWT
callers using team_alias_jwt_field kept seeing the pre-mutation
team for the full cache window.

Mirror the write under the alias key inside _cache_team_object so
every existing caller stays in sync without further changes. Skip
the alias write when team_alias is None/empty so we don't collide
across alias-less teams.

Surfaced testing the LIT-3244 cherry-pick on patch/1.86.0: the
LIT-3244 fix correctly invalidated the team_id cache but the
customer's JWT used team_alias_jwt_field, so they kept hitting the
stale alias-keyed entry.

* fix(team): delete (not overwrite) team_alias cache on _cache_team_object

The prior shape of this PR wrote both team_id:<id> AND team_alias:<alias>
from _cache_team_object. team_alias is NOT unique in the schema
(no @unique on LiteLLM_TeamTable.team_alias), and get_team_object_by_alias
enforces uniqueness on its own DB-fetch path (len(teams) > 1 raises).
Writing the alias-keyed cache from the generic refresh path bypassed
that check: a team admin renaming their team to collide with another
team's alias could silently overwrite the cached team for JWT-by-alias
auth, swapping the resolved team under that alias for the cache window.

Switch the alias-keyed operation from a write to a delete (mirroring
the dual-cache delete pattern in _delete_cache_key_object). After every
team write, the next JWT-by-alias reader cache-misses and falls through
to get_team_object_by_alias, which (a) re-fetches the fresh team from
DB, closing the LIT-3244 staleness gap that motivated this PR, and
(b) enforces alias uniqueness before populating either cache key.

team_id:<id> writes are unchanged — team_id is the table PK and is
guaranteed unique.

Surfaced in veria-ai review on #28739.

* fix(managed-files): anchor model_id regex so it doesn't match llm_output_file_model_id

extract_model_id_from_unified_id used `re.search(r"model_id,([^;]+)", ...)`
which substring-matches the `model_id,` inside the file-ID encoding's
`llm_output_file_model_id,<deployment_uuid>` field. parse_unified_id
then fed that deployment UUID back into the auth path as a model
candidate via _extract_models_from_managed_resource_id, and every
team-BYOK file attach 403'd with:

    team not allowed to access model. This team can only access
    models=['openai/*']. Tried to access <deployment-uuid>

The team's models list correctly contains the public name (`openai/*`)
that target_model_names matches, but the bogus UUID candidate fails
the wildcard check first.

Anchor the regex to a field boundary (`(?:^|;)model_id,`) so it
matches the legitimate top-level `model_id,<value>` field on
vector_store unified IDs and skips substring matches inside other
fields. File-IDs (which have no top-level `model_id` field) now
return None and contribute no spurious UUID candidate.

Surfaced reproducing LIT-3244 on patch/1.86.0 with the customer's
exact flow: team with openai/* BYOK deployment, JWT-scoped user,
POST /v1/vector_stores/{id}/files attaching a file uploaded with
target_model_names=openai/gpt-4o.

* fix(proxy): hydrate wildcard discovery credentials (#28284) (#28822)

* fix(proxy): hydrate wildcard discovery credentials

* fix(proxy): constrain wildcard credential hydration

Co-authored-by: Dibyo Mukherjee <dibyo@adobe.com>

* ci: add daily oss-agent-shin branch creation workflow (#28829)

Creates litellm_oss_agent_shin_MM_DD_YYYY from main every day at 00:00 UTC.
Lets us retarget oss-agent-shin fork PRs onto a canonical branch so CircleCI runs with secrets, without granting the agent write access.

Co-authored-by: shin-berri <shin-laptop@berri.ai>
Co-authored-by: yuneng-jiang <yuneng@berri.ai>
Co-authored-by: Ishaan Jaffer <ishaanjaffer0324@gmail.com>

* test(proxy): add harness for proxy_server.py behavior-pinning (#28827)

* test(proxy): add harness for proxy_server.py behavior-pinning

Creates tests/test_litellm/proxy/proxy_server/ with:
- conftest.py: 11 shared fixtures (app, client, mock_prisma, auth_as,
  mock_router with parametrized response builders, normalize, etc.)
- _coverage_check.py: per-PR coverage gate (line + branch) against a
  baseline, self-selects target by inspecting which placeholder files
  have been filled
- _pin_check.py: AST-based gate that verifies every pin-list item has
  >=1 happy + >=1 error test with a real assertion (no status-only)
- test_harness_smoke.py: 19 smoke tests covering every fixture +
  both scripts end-to-end
- 26 placeholder test files (one docstring each) reserved for
  follow-up PRs per the directory ownership in the Notion plan
- .coverage_baseline pinned at 0% so future PRs measure deltas
  against new-tests-only and aren't entangled with the broader
  scattered test suite

Adds a dedicated proxy-server job to test-unit-proxy-endpoints.yml
so this directory's runtime + coverage are tracked independently.

Plan: https://www.notion.so/36c43b8acdab81ee845fd5365128a2fc

* ci(proxy-endpoints): allow workflow_dispatch

Lets the workflow be triggered manually on a branch via
`gh workflow run`, which is needed for the verify-first
flow on workflow changes before opening a PR.

* test(proxy): address review feedback on proxy_server harness

- conftest.py: anchor sys.path insert to __file__ (Path(__file__).resolve().parents[4])
  instead of CWD-relative os.path.abspath("../../../../") which resolved
  to the wrong directory when pytest is launched from the repo root.
- _coverage_check.py: actually read .coverage_baseline and use it as
  the floor (line_min = max(target, baseline)). Closes the gap between
  the PR description's "delta semantics" and what the script was doing.
  With baseline=0.0 today this is a no-op; future PRs that update the
  baseline cause regressions (test deletions etc.) to trip the gate
  even if the static PR target is still met.
- _pin_check.py: drop unreachable startswith("_") guard
  (test_*.py glob never yields underscore-prefixed names) and read
  each test file once instead of twice.

* feat(openai): apply regional-processing cost uplift for EU/US data residency (#28626)

* feat(openai): apply regional-processing cost uplift for EU/US data residency

OpenAI charges a 10% uplift on the latest GPT models when requests are
served from a regionalized hostname (eu./us.api.openai.com).  Infer the
region from `api_base`, expose it on `kwargs["litellm_params"]["data_residency"]`,
and multiply the computed cost by a per-model
`regional_processing_uplift_multiplier_<region>` field.

https://claude.ai/code/session_012ebH44s7ohYxjoix5CXzTW

* test: allow regional_processing_uplift_multiplier_{eu,us} in model_prices schema

* fix(cost): tighten data_residency inference and restore model_cost in tests

- Only infer OpenAI data_residency when custom_llm_provider == "openai";
  drop the implicit None fallback so non-OpenAI callers can't accidentally
  pick up a regional tag from a stray OpenAI hostname.
- _local_model_cost_map fixture now snapshots and restores
  litellm.model_cost and LITELLM_LOCAL_MODEL_COST_MAP so tests don't leak
  state across the session.

* refactor(openai): move data_residency helper under llms/openai

* fix: thread data_residency through realtime stream cost calculation

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(cost): thread data_residency through batch_cost_calculator

Apply the OpenAI regional-processing uplift multiplier to retrieve_batch
cost paths so Batch API requests served via eu./us.api.openai.com are
priced at the same uplifted token rates as completions/transcriptions.

* refactor(openai): encapsulate provider check inside infer_openai_data_residency

Move the custom_llm_provider == "openai" guard from get_litellm_params
into the helper itself so the core utility no longer carries
provider-specific dispatch logic. Callers pass through the provider
unconditionally; the helper returns None for any non-OpenAI provider.

* fix(responses): thread data_residency through Responses logging params

The Responses API paths build their logging litellm_params dict after
provider resolution but did not include data_residency, so cost calc
saw None even when the effective api_base was a regional OpenAI host.

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Yassin Kortam <yassin@berri.ai>

---------

Co-authored-by: yuneng-jiang <yuneng@berri.ai>
Co-authored-by: ryan-crabbe-berri <ryan@berri.ai>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: ryan-crabbe-berri <ryan-crabbe-berri@users.noreply.github.com>
Co-authored-by: milan-berri <milan@berri.ai>
Co-authored-by: Mateo Wang <277851410+mateo-berri@users.noreply.github.com>
Co-authored-by: Mateo <mateo@Mateos-MacBook-Pro.local>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Yassin Kortam <yassin@berri.ai>
Co-authored-by: Yassin Kortam <yassinkortam@Yassins-MacBook-Pro.local>
Co-authored-by: shin-berri <shin-laptop@berri.ai>
Co-authored-by: user <70670632+stuxf@users.noreply.github.com>
Co-authored-by: Krrish Dholakia <krrish+github@berri.ai>
Co-authored-by: Dibyo Mukherjee <dibyo@adobe.com>
Co-authored-by: ishaan-berri <155045088+ishaan-berri@users.noreply.github.com>
Co-authored-by: Ishaan Jaffer <ishaanjaffer0324@gmail.com>

* Revert "merge main (#28839)"

This reverts commit fa956e8c42ea975a452665fedd5c4ecebaab9be8.

* fix(gemini-realtime): prevent double-attribution of usageMetadata in multi-key frames

Co-authored-by: Yassin Kortam <yassin@berri.ai>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Yassin Kortam <yassin@berri.ai>
Co-authored-by: mateo-berri <277851410+mateo-berri@users.noreply.github.com>
Co-authored-by: yuneng-jiang <yuneng@berri.ai>
Co-authored-by: ryan-crabbe-berri <ryan@berri.ai>
Co-authored-by: ryan-crabbe-berri <ryan-crabbe-berri@users.noreply.github.com>
Co-authored-by: milan-berri <milan@berri.ai>
Co-authored-by: Mateo <mateo@Mateos-MacBook-Pro.local>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Yassin Kortam <yassinkortam@Yassins-MacBook-Pro.local>
Co-authored-by: shin-berri <shin-laptop@berri.ai>
Co-authored-by: user <70670632+stuxf@users.noreply.github.com>
Co-authored-by: Krrish Dholakia <krrish+github@berri.ai>
Co-authored-by: Dibyo Mukherjee <dibyo@adobe.com>
Co-authored-by: ishaan-berri <155045088+ishaan-berri@users.noreply.github.com>
Co-authored-by: Ishaan Jaffer <ishaanjaffer0324@gmail.com>
This commit is contained in:
Sameer Kankute 2026-05-27 03:10:42 +05:30 committed by GitHub
parent 9c0a98c9f1
commit 3d0e0cee56
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 3219 additions and 129 deletions

View File

@ -225,6 +225,11 @@ use_chat_completions_url_for_anthropic_messages: bool = bool(
route_all_chat_openai_to_responses: bool = (
os.getenv("LITELLM_ROUTE_ALL_CHAT_OPENAI_TO_RESPONSES", "false").lower() == "true"
) # When True, routes all OpenAI /chat/completions requests through the Responses API bridge
# When True, Gemini/Vertex Live setup is deferred until client `session.update`.
# Default False preserves historical behavior (auto-send setup on connect).
gemini_live_defer_setup: bool = (
os.getenv("LITELLM_GEMINI_LIVE_DEFER_SETUP", "false").lower() == "true"
)
use_legacy_interactions_schema: bool = (
os.getenv("LITELLM_USE_LEGACY_INTERACTIONS_SCHEMA", "false").lower() == "true"
) # When True, sends Api-Revision: 2026-05-07 to Google so responses use the legacy `outputs`

View File

@ -86,6 +86,12 @@ class RealTimeStreaming:
# When a text message is blocked, hold the guardrail reason so the next
# response.create can be rewritten to include the failure context.
self._pending_guardrail_message: Optional[str] = None
# Track whether session.created has already been sent to the client
# (e.g. synthetic event in deferred setup mode).
self._session_created_sent_to_client: bool = False
# Track whether we have already sent the guardrail turn-detection update
# that disables provider auto-response for transcription guardrails.
self._guardrail_turn_detection_update_sent: bool = False
_SESSION_EVENT_TYPES = frozenset(["session.created", "session.updated"])
_AUDIO_FORMAT_MAP: Dict[str, Dict[str, Any]] = {
@ -248,22 +254,52 @@ class RealTimeStreaming:
## SYNC LOGGING
executor.submit(self.logging_obj.success_handler(self.messages))
async def _send_to_backend(self, message: str) -> None:
async def _send_to_backend(self, message: str) -> bool:
"""Send a message to the backend WebSocket.
If a provider_config is set the message is first passed through
transform_realtime_request so that provider-specific translation
(e.g. dropping session.update for Vertex AI) is applied even for
guardrail-injected messages.
Returns True if at least one message was actually delivered to the
backend, False if the provider transformation produced no output and
the message was effectively dropped.
"""
if self.provider_config:
transformed = self.provider_config.transform_realtime_request(
message, self.model, self.session_configuration_request
)
sent = False
for msg in transformed:
# Send first; only cache the setup payload once the backend
# has actually accepted it. Caching before send would leave
# ``session_configuration_request`` populated after a failed
# send, causing subsequent client session.update messages to
# be treated as "subsequent" and dropped even though the
# backend never received the original setup.
await self.backend_ws.send(msg) # type: ignore[union-attr, attr-defined]
else:
await self.backend_ws.send(message) # type: ignore[union-attr, attr-defined]
self._cache_session_configuration_request(msg)
sent = True
return sent
await self.backend_ws.send(message) # type: ignore[union-attr, attr-defined]
return True
def _cache_session_configuration_request(self, transformed_message: str) -> None:
"""Store setup payload once sent to backend.
Updates the cached setup on every successful setup send so follow-up
``session.update`` messages (which produce a merged setup with new
``generationConfig`` / ``systemInstruction`` / etc.) are reflected in
the cache used by downstream readers (``transform_session_created_event``,
``return_new_content_delta_events`` modality lookup, ...).
"""
try:
message_obj = json.loads(transformed_message)
if "setup" in message_obj:
self.session_configuration_request = transformed_message
except (json.JSONDecodeError, TypeError):
return
def _make_disable_auto_response_message(self) -> str:
"""Return a session.update that disables VAD auto-response."""
@ -280,6 +316,20 @@ class RealTimeStreaming:
}
return json.dumps({"type": "session.update", "session": session})
async def _maybe_send_guardrail_turn_detection_update(self) -> None:
"""Disable provider auto-response once when transcription guardrails are enabled."""
if self._guardrail_turn_detection_update_sent:
return
if not self._has_audio_transcription_guardrails():
return
sent = await self._send_to_backend(self._make_disable_auto_response_message())
# Only mark as sent when the provider transformation actually delivered
# the update to the backend. Otherwise (e.g. Gemini drops session.update
# after the initial setup), leave the flag unset so future opportunities
# — such as a duplicate session.created — can retry.
if sent:
self._guardrail_turn_detection_update_sent = True
def _has_realtime_guardrails(self) -> bool:
"""Return True if any callback is registered for realtime guardrail event types."""
from litellm.integrations.custom_guardrail import CustomGuardrail
@ -318,12 +368,20 @@ class RealTimeStreaming:
self,
transcript: str,
item_id: Optional[str] = None,
pre_block_backend_message: Optional[str] = None,
) -> bool:
"""
Run registered guardrails on a completed speech transcription.
Returns True if blocked (synthetic warning already sent to client).
Returns False if clean (caller should send response.create to the backend).
``pre_block_backend_message`` (if provided) is sent to the backend
BEFORE any of the guardrail's own backend messages when a block is
triggered. This is needed for protocol contracts that require a
specific message to be sent first e.g. Gemini Live requires a
matching ``toolResponse`` immediately after a ``toolCall`` before any
other client messages can be accepted.
"""
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.types.guardrails import GuardrailEventHooks
@ -383,6 +441,13 @@ class RealTimeStreaming:
getattr(callback, "realtime_violation_message", None) or safe_msg
)
# Deliver any caller-supplied backend message FIRST so that
# protocol contracts requiring a specific ordering (e.g.
# Gemini Live's mandatory ``toolResponse`` after a
# ``toolCall``) are honored before the guardrail's own
# clientContent / cancel messages are sent.
if pre_block_backend_message is not None:
await self._send_to_backend(pre_block_backend_message)
# Cancel any in-progress LLM response (e.g. VAD auto-response).
await self._send_to_backend(json.dumps({"type": "response.cancel"}))
# Send the policy violation hint (shows as small gray status text in UI).
@ -478,16 +543,34 @@ class RealTimeStreaming:
else [transformed_response]
)
for event in events:
is_session_created_event = (
isinstance(event, dict) and event.get("type") == "session.created"
)
if is_session_created_event:
if self._session_created_sent_to_client:
# A synthetic session.created (with placeholder defaults) was
# already forwarded to the client when we connected. The
# provider's real session.created (e.g. emitted from Gemini
# `setupComplete`) carries the authoritative modalities/model
# from the client's session.update. Re-emit it as
# `session.updated` so the client learns the corrected
# configuration without seeing two `session.created` events.
event = {**event, "type": "session.updated"}
else:
self._session_created_sent_to_client = True
event_str = json.dumps(event)
## For audio/VAD guardrail path: forward session.created first, then inject.
if (
isinstance(event, dict)
and event.get("type") == "session.created"
and self._has_audio_transcription_guardrails()
):
## For audio/VAD guardrail path: forward the (possibly retyped)
## session.created first, then invoke the one-time guardrail
## turn-detection update. ``_maybe_send_guardrail_turn_detection_update``
## is idempotent (gated by ``_guardrail_turn_detection_update_sent``),
## so duplicate session.created events — including those emitted
## after a synthetic session.created from ``llm_http_handler`` in
## deferred-setup mode — still get a single chance to inject the
## update if a prior attempt was dropped by the provider transform.
if is_session_created_event and self._has_audio_transcription_guardrails():
self.store_message(event_str)
await self.websocket.send_text(event_str)
await self._send_to_backend(self._make_disable_auto_response_message())
await self._maybe_send_guardrail_turn_detection_update()
continue
## GUARDRAIL: run on transcription events in provider_config path too
if (
@ -790,12 +873,13 @@ class RealTimeStreaming:
item["content"] = new_content
return item
async def client_ack_messages(self):
async def client_ack_messages(self): # noqa: PLR0915
try:
while True:
message = await self.websocket.receive_text()
## GUARDRAIL: intercept conversation.item.create for text-based injection.
guardrail_turn_detection_injected = False
try:
msg_obj = json.loads(message)
msg_type = msg_obj.get("type")
@ -803,7 +887,68 @@ class RealTimeStreaming:
if msg_type == "conversation.item.create":
# Check user text messages for prompt injection
item = msg_obj.get("item", {})
if item.get("role") == "user":
# Check function_call_output first so a client cannot
# bypass the tool-result guardrail by also setting
# role="user" on a function_call_output item.
if item.get("type") == "function_call_output":
# Tool results are client-controlled and fed to the
# model; check them with the same guardrail used for
# user text so an attacker cannot smuggle blocked
# content into a function_call_output.
output = item.get("output", "")
output_text = (
output
if isinstance(output, str)
else json.dumps(output)
)
if output_text:
# Build the sanitized function_call_output up
# front so we can hand it to the guardrail
# runner as the pre-block message. Providers
# that pair every toolCall with a toolResponse
# (e.g. Gemini/Vertex Live) require the
# toolResponse to arrive BEFORE any other
# client message — otherwise the guardrail's
# own clientContent would violate the
# pending-tool-call protocol contract and the
# backend could close the connection before
# the sanitized response ever lands. Dropping
# the blocked item outright would similarly
# leave such providers waiting indefinitely.
# The sanitized payload carries no blocked
# content — only a generic policy marker.
sanitized_msg = json.dumps(
{
**msg_obj,
"item": {
**item,
"output": json.dumps(
{
"error": "Tool output blocked by content policy",
}
),
},
}
)
blocked = await self.run_realtime_guardrails(
output_text,
pre_block_backend_message=sanitized_msg,
)
if blocked:
# ``_pending_guardrail_message`` is
# intentionally NOT set here. That flag
# exists to swallow the reflexive
# ``response.create`` an OpenAI client
# sends immediately after a user text
# message. In a tool-calling flow the
# client may not send a ``response.create``
# at all (e.g. Gemini SDKs auto-respond),
# so leaving the flag set would
# incorrectly drop an unrelated
# ``response.create`` from a later
# interaction turn.
continue
elif item.get("role") == "user":
content_list = item.get("content", [])
texts = [
c.get("text", "")
@ -831,6 +976,89 @@ class RealTimeStreaming:
self._pending_guardrail_message = None
continue
## GUARDRAIL: Inject turn_detection into first session.update
# if needed. Done BEFORE the GA remap so the injected
# ``create_response`` rides along with any client-provided
# turn_detection fields (e.g. silence_duration_ms) into the
# nested ``audio.input.turn_detection`` path produced by the
# remap. Doing this after the remap would create a separate
# minimal root-level ``turn_detection`` and silently drop
# the client's nested settings.
if (
msg_type == "session.update"
and self.session_configuration_request is None
and not self._guardrail_turn_detection_update_sent
and self._has_audio_transcription_guardrails()
):
session = msg_obj.setdefault("session", {})
if isinstance(session, dict):
existing_td = session.get("turn_detection")
if not isinstance(existing_td, dict):
existing_td = {}
existing_td["create_response"] = False
session["turn_detection"] = existing_td
message = json.dumps(msg_obj)
guardrail_turn_detection_injected = True
verbose_logger.debug(
"Injected turn_detection into first session.update for audio transcription guardrails"
)
## GUARDRAIL: Force ``create_response`` to False in any
# client-provided ``turn_detection`` so a later
# ``session.update`` cannot re-enable VAD auto-response
# and bypass the transcription guardrail after the
# initial disable. Covers both the flat beta key and the
# nested GA ``audio.input.turn_detection`` shape, since
# the GA remap below also accepts either form. Skipped
# when the injection block above already ran for this
# message, to avoid redundant double-serialization.
if (
msg_type == "session.update"
and not guardrail_turn_detection_injected
and self._has_audio_transcription_guardrails()
):
session = msg_obj.get("session")
if isinstance(session, dict):
td_overridden = False
flat_td = session.get("turn_detection")
flat_td_present = flat_td is not None
if flat_td_present:
if not isinstance(flat_td, dict):
flat_td = {}
if flat_td.get("create_response") is not False:
flat_td["create_response"] = False
session["turn_detection"] = flat_td
td_overridden = True
nested_td_present = False
audio = session.get("audio")
if isinstance(audio, dict):
audio_input = audio.get("input")
if isinstance(audio_input, dict):
nested_td = audio_input.get("turn_detection")
if nested_td is not None:
nested_td_present = True
if not isinstance(nested_td, dict):
nested_td = {}
if (
nested_td.get("create_response")
is not False
):
nested_td["create_response"] = False
audio_input["turn_detection"] = nested_td
td_overridden = True
# Symmetric with the first-update injection block:
# if the client omitted turn_detection entirely on
# a subsequent session.update, still inject the
# ``create_response: False`` override so the
# transcription guardrail cannot be re-enabled by
# any downstream merge that drops the original
# disable.
if not flat_td_present and not nested_td_present:
session["turn_detection"] = {"create_response": False}
td_overridden = True
if td_overridden:
message = json.dumps(msg_obj)
# GA compatibility: remap beta-style session fields only when
# the upstream is in GA mode. Beta upstreams expect the flat
# session shape unchanged.
@ -848,17 +1076,20 @@ class RealTimeStreaming:
pass
## LOGGING
# Log after any in-place modifications (GA remap, guardrail
# turn_detection injection) so audit logs reflect what we
# actually forward to the backend.
self.store_input(message=message)
## FORWARD TO BACKEND
if self.provider_config:
message = self.provider_config.transform_realtime_request(
message, self.model
)
for msg in message:
await self.backend_ws.send(msg) # type: ignore[union-attr]
else:
await self.backend_ws.send(message) # type: ignore[union-attr]
## FORWARD TO BACKEND
# Only mark the guardrail turn_detection update as sent after the
# backend actually accepted the message. Setting the flag earlier
# would permanently disable the injection if ``_send_to_backend``
# raised — neither this loop nor
# ``_maybe_send_guardrail_turn_detection_update`` would retry.
sent = await self._send_to_backend(message)
if guardrail_turn_detection_injected and sent:
self._guardrail_turn_detection_update_sent = True
except Exception as e:
verbose_logger.debug(f"Error in client ack messages: {e}")

View File

@ -3,6 +3,7 @@ from typing import TYPE_CHECKING, Any, List, Optional, Union
import httpx
from litellm.types.llms.openai import OpenAIRealtimeStreamSessionEvents
from litellm.types.realtime import (
RealtimeResponseTransformInput,
RealtimeResponseTypedDict,
@ -69,6 +70,20 @@ class BaseRealtimeConfig(ABC):
) -> Optional[str]: # message sent to setup the realtime session
return None
def transform_session_created_event(
self,
model: str,
logging_session_id: str,
session_configuration_request: Optional[str] = None,
) -> Optional[Union[dict, OpenAIRealtimeStreamSessionEvents]]:
"""
Optional hook for providers that defer session setup until client `session.update`.
Return an OpenAI-compatible `session.created` payload when the proxy should
emit a synthetic event immediately after backend websocket connection.
"""
return None
@abstractmethod
def transform_realtime_response(
self,

View File

@ -5316,6 +5316,28 @@ class BaseLLMHTTPHandler:
)
if _session_config:
realtime_streaming.session_configuration_request = _session_config
# For providers that defer setup until client session.update, optionally
# send synthetic session.created to unblock clients waiting on connect.
if not provider_config.requires_session_configuration():
synthetic_session = provider_config.transform_session_created_event(
model=model,
logging_session_id=logging_obj.litellm_trace_id,
session_configuration_request=None,
)
if synthetic_session is not None:
synthetic_session_str = json.dumps(synthetic_session)
# Record before sending so the synthetic session.created is
# captured in the session log alongside provider-driven
# events; without this it would be silently absent from
# success_handler / async_success_handler payloads.
realtime_streaming.store_message(synthetic_session_str)
await websocket.send_text(synthetic_session_str)
realtime_streaming._session_created_sent_to_client = True
verbose_logger.debug(
"Sent synthetic session.created to client to unblock connection"
)
await realtime_streaming.bidirectional_forward()
except websockets.exceptions.InvalidStatusCode as e: # type: ignore

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,7 @@ Auth: OAuth2 Bearer token (not an API key).
import json
from typing import List, Optional
from litellm import verbose_logger
from litellm.llms.gemini.realtime.transformation import GeminiRealtimeConfig
@ -26,6 +27,7 @@ class VertexAIRealtimeConfig(GeminiRealtimeConfig):
"""
def __init__(self, access_token: str, project: str, location: str) -> None:
super().__init__()
self._access_token = access_token
self._project = project
self._location = location
@ -138,6 +140,62 @@ class VertexAIRealtimeConfig(GeminiRealtimeConfig):
# Request translation
# ------------------------------------------------------------------
def _vertex_model_path(self, model: str) -> str:
"""Return the fully-qualified Vertex AI model resource path."""
return (
f"projects/{self._project}"
f"/locations/{self._location}"
f"/publishers/google/models/{model}"
)
def _build_vertex_ai_setup_config(self, model: str, session_params: dict) -> dict:
"""Build Vertex AI setup configuration with proper model path and defaults."""
# Normalize GA-remapped fields (``output_modalities``, nested
# ``audio.input.transcription``, ``audio.input.turn_detection``) back to
# their flat beta keys so ``map_openai_params`` picks them up. Without
# this, GA clients' explicit modality / transcription / turn-detection
# settings would be silently dropped because ``map_openai_params`` only
# recognises the flat OpenAI-beta key names.
session_params = self._normalize_session_payload_for_mapping(session_params)
setup_config = self.map_openai_params(
optional_params={}, non_default_params=session_params
)
# Use full Vertex AI model path
setup_config["model"] = self._vertex_model_path(model)
# Add Vertex AI specific defaults if not provided
generation_config = setup_config.setdefault("generationConfig", {})
generation_config.setdefault("responseModalities", ["AUDIO"])
# Ensure Vertex defaults for realtimeInputConfig apply even when
# the client provided a partial ``turn_detection`` (e.g. only
# ``silence_duration_ms``). ``map_automatic_turn_detection`` sets
# ``disabled=True`` whenever ``create_response`` is absent or
# ``False``. Force ``disabled=False`` only when the client did
# not explicitly request ``create_response: False`` — that path
# is how transcription guardrails suppress automatic responses,
# and overriding it here would silently bypass the guardrail.
# Vertex Live has no "VAD on, no auto-response" mode, so callers
# that need that behaviour must accept that VAD is off.
client_turn_detection = session_params.get("turn_detection")
client_disabled_auto_response = (
isinstance(client_turn_detection, dict)
and client_turn_detection.get("create_response") is False
)
realtime_input_config = setup_config.setdefault("realtimeInputConfig", {})
automatic_detection = realtime_input_config.setdefault(
"automaticActivityDetection", {}
)
if not client_disabled_auto_response:
automatic_detection["disabled"] = False
automatic_detection.setdefault("silenceDurationMs", 800)
setup_config.setdefault("inputAudioTranscription", {})
setup_config.setdefault("outputAudioTranscription", {})
return setup_config
def transform_realtime_request(
self,
message: str,
@ -147,16 +205,50 @@ class VertexAIRealtimeConfig(GeminiRealtimeConfig):
"""
Translate OpenAI realtime client messages to Vertex AI format.
``session.update`` is intentionally ignored (returns []) because
Vertex AI only accepts a single ``setup`` message at the start of
the connection sending a second one causes a 1007 close error.
The initial setup (sent automatically before bidirectional_forward)
already includes AUDIO modality and server VAD, so there is nothing
more to configure.
On the first ``session.update`` (when no setup has been sent yet) the
full ``BidiGenerateContentSetup`` is built with Vertex AI's model path
and forwarded. Any later ``session.update`` is dropped: Vertex AI
documents ``setup`` as the first-and-only client message, and a second
``setup`` closes the connection with a 1007 policy error.
"""
json_message = json.loads(message)
if json_message.get("type") == "session.update":
# Do not forward as a second setup — Vertex AI rejects it.
msg_type = json_message.get("type")
if msg_type == "session.update":
if session_configuration_request is None:
setup_config = self._build_vertex_ai_setup_config(
model, json_message.get("session") or {}
)
gemini_setup_msg = json.dumps({"setup": setup_config})
verbose_logger.debug(
"Vertex AI Realtime: Sending initial setup with tools to backend"
)
return [gemini_setup_msg]
# A follow-up session.update can't be forwarded as a second setup
# (Vertex Live closes the WebSocket with 1007). If this drop is
# silencing the audio-transcription guardrail's create_response
# disable, surface a warning so operators know the model will
# auto-respond before the guardrail can gate it on Vertex AI.
client_turn_detection = GeminiRealtimeConfig._extract_turn_detection(
json_message.get("session") or {}
)
if (
isinstance(client_turn_detection, dict)
and client_turn_detection.get("create_response") is False
):
verbose_logger.warning(
"Vertex AI Realtime: Dropping subsequent session.update "
"(turn_detection.create_response=False) — Vertex Live "
"rejects a second setup message. Audio-transcription "
"guardrails cannot suppress the model's auto-response on "
"Vertex AI in non-deferred mode."
)
else:
verbose_logger.debug(
"Vertex AI Realtime: Ignoring session.update (setup already sent)"
)
return []
return super().transform_realtime_request(

View File

@ -133,7 +133,7 @@ class BidiGenerateContentSetup(TypedDict, total=False):
tools: List[Tools]
"""The tools to be used for the realtime session."""
realtimeInputConfig: dict
realtimeInputConfig: BidiGenerateContentRealtimeInputConfig
"""The realtime config to be used for the realtime session."""
sessionResumption: dict

View File

@ -79,7 +79,14 @@ from pydantic import (
field_serializer,
field_validator,
)
from typing_extensions import Annotated, Dict, Required, TypedDict, override
from typing_extensions import (
Annotated,
Dict,
NotRequired,
Required,
TypedDict,
override,
)
from litellm.types.llms.base import BaseLiteLLMOpenAIResponseObject
from litellm.types.responses.main import (
@ -1935,6 +1942,7 @@ class OpenAIRealtimeStreamResponseOutputItemAdded(TypedDict):
response_id: str
output_index: int
item: OpenAIRealtimeStreamResponseOutputItem
event_id: NotRequired[str]
class OpenAIRealtimeStreamResponseBaseObject(TypedDict):
@ -2061,6 +2069,17 @@ class OpenAIRealtimeContentPartDone(TypedDict):
type: Literal["response.content_part.done"]
class OpenAIRealtimeFunctionCallArgumentsDone(TypedDict):
type: Literal["response.function_call_arguments.done"]
event_id: str
response_id: str
item_id: str
output_index: int
call_id: str
name: str
arguments: str
class OpenAIRealtimeOutputItemDone(TypedDict):
event_id: str
item: OpenAIRealtimeStreamResponseOutputItem
@ -2126,6 +2145,7 @@ OpenAIRealtimeEvents = Union[
OpenAIRealtimeResponseAudioDone,
OpenAIRealtimeContentPartDone,
OpenAIRealtimeOutputItemDone,
OpenAIRealtimeFunctionCallArgumentsDone,
OpenAIRealtimeDoneEvent,
]

View File

@ -476,6 +476,50 @@ async def test_transcription_captured_in_backend_to_client():
assert logging_obj.model_call_details["messages"] == streaming.input_messages
@pytest.mark.asyncio
async def test_client_ack_caches_setup_to_prevent_duplicate_session_update_setup():
websocket = MagicMock()
backend_ws = MagicMock()
logging_obj = MagicMock()
logging_obj.pre_call = MagicMock()
# Two session.update messages arrive before setupComplete round-trip.
websocket.receive_text = AsyncMock(
side_effect=[
json.dumps({"type": "session.update", "session": {"tools": []}}),
json.dumps({"type": "session.update", "session": {"tools": []}}),
Exception("client done"),
]
)
provider_config = MagicMock()
def _transform(message: str, model: str, session_configuration_request=None):
if session_configuration_request is None:
return [json.dumps({"setup": {"model": "models/gemini-2.5-flash"}})]
return []
provider_config.transform_realtime_request = MagicMock(side_effect=_transform)
backend_ws.send = AsyncMock()
streaming = RealTimeStreaming(
websocket=websocket,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
await streaming.client_ack_messages()
# Setup should be forwarded exactly once even with repeated session.update.
assert backend_ws.send.await_count == 1
assert streaming.session_configuration_request is not None
sent_payload = json.loads(backend_ws.send.await_args_list[0].args[0])
assert "setup" in sent_payload
def test_collect_session_tools_from_session_update():
"""
Test that tools from session.update events are collected.
@ -879,6 +923,169 @@ async def test_realtime_text_input_guardrail_blocks_and_returns_error():
litellm.callbacks = [] # cleanup
@pytest.mark.asyncio
async def test_realtime_function_call_output_guardrail_blocks_and_returns_error():
"""
Test that a client-supplied function_call_output whose content triggers a
guardrail is blocked: it is not forwarded to the backend, and an error
event is sent to the client.
"""
from fastapi import HTTPException
import litellm
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.types.guardrails import GuardrailEventHooks
class BlockingGuardrail(CustomGuardrail):
async def apply_guardrail(
self, inputs, request_data, input_type, logging_obj=None
):
texts = inputs.get("texts", [])
for text in texts:
if "@" in text:
raise HTTPException(
status_code=403,
detail={"error": "email address detected"},
)
return inputs
guardrail = BlockingGuardrail(
guardrail_name="email-blocker",
event_hook=GuardrailEventHooks.pre_call,
default_on=True,
)
litellm.callbacks = [guardrail]
client_ws = MagicMock()
client_ws.send_text = AsyncMock()
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
backend_ws.recv = AsyncMock(side_effect=ConnectionClosed(None, None))
logging_obj = MagicMock()
logging_obj.pre_call = MagicMock()
streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj)
item_create_msg = json.dumps(
{
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": "call_123",
"output": "Tool says: my email is test@example.com",
},
}
)
client_ws.receive_text = AsyncMock(
side_effect=[
item_create_msg,
Exception("connection closed"),
]
)
await streaming.client_ack_messages()
sent_texts = [json.loads(c.args[0]) for c in client_ws.send_text.call_args_list]
error_events = [e for e in sent_texts if e.get("type") == "error"]
assert len(error_events) == 1, f"Expected one error event, got: {sent_texts}"
assert error_events[0]["error"]["type"] == "guardrail_violation"
sent_to_backend = [c.args[0] for c in backend_ws.send.call_args_list if c.args]
forwarded_tool_outputs = [
json.loads(m)
for m in sent_to_backend
if isinstance(m, str)
and json.loads(m).get("type") == "conversation.item.create"
and json.loads(m).get("item", {}).get("type") == "function_call_output"
]
# A sanitized placeholder must reach the backend so providers that pair
# every toolCall with a toolResponse (Gemini/Vertex Live) exit their
# pending-tool-call state instead of stalling. The placeholder must NOT
# contain any of the blocked content.
assert len(forwarded_tool_outputs) == 1, (
f"Sanitized function_call_output should be forwarded, got: "
f"{forwarded_tool_outputs}"
)
sanitized_item = forwarded_tool_outputs[0]["item"]
assert sanitized_item["call_id"] == "call_123"
assert "test@example.com" not in sanitized_item["output"]
litellm.callbacks = [] # cleanup
@pytest.mark.asyncio
async def test_realtime_function_call_output_guardrail_allows_clean_output():
"""
Test that a clean function_call_output passes through and reaches the backend
when guardrails are configured.
"""
import litellm
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.types.guardrails import GuardrailEventHooks
class BlockingGuardrail(CustomGuardrail):
async def apply_guardrail(
self, inputs, request_data, input_type, logging_obj=None
):
return inputs
guardrail = BlockingGuardrail(
guardrail_name="noop",
event_hook=GuardrailEventHooks.pre_call,
default_on=True,
)
litellm.callbacks = [guardrail]
client_ws = MagicMock()
client_ws.send_text = AsyncMock()
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
backend_ws.recv = AsyncMock(side_effect=ConnectionClosed(None, None))
logging_obj = MagicMock()
logging_obj.pre_call = MagicMock()
streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj)
item_create_msg = json.dumps(
{
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": "call_456",
"output": '{"temperature": 72, "unit": "F"}',
},
}
)
client_ws.receive_text = AsyncMock(
side_effect=[
item_create_msg,
Exception("connection closed"),
]
)
await streaming.client_ack_messages()
sent_to_backend = [c.args[0] for c in backend_ws.send.call_args_list if c.args]
forwarded = [
json.loads(m)
for m in sent_to_backend
if isinstance(m, str)
and json.loads(m).get("type") == "conversation.item.create"
and json.loads(m).get("item", {}).get("type") == "function_call_output"
]
assert (
len(forwarded) == 1
), f"Clean function_call_output should be forwarded, got: {forwarded}"
litellm.callbacks = [] # cleanup
@pytest.mark.asyncio
async def test_realtime_text_input_guardrail_uses_pre_call_mode():
"""
@ -1160,3 +1367,406 @@ async def test_on_violation_end_session_closes_on_first_fail():
assert streaming._violation_count == 1
litellm.callbacks = [] # cleanup
@pytest.mark.asyncio
async def test_provider_path_suppresses_duplicate_session_created_after_synthetic():
client_ws = MagicMock()
client_ws.send_text = AsyncMock()
backend_ws = MagicMock()
backend_ws.recv = AsyncMock(
side_effect=[b'{"setupComplete": {}}', ConnectionClosed(None, None)]
)
backend_ws.send = AsyncMock()
provider_config = MagicMock()
provider_config.transform_realtime_response = MagicMock(
return_value={
"response": [
{
"type": "session.created",
"event_id": "event_1",
"session": {"id": "sess_1", "modalities": ["audio"]},
}
],
"current_output_item_id": None,
"current_response_id": None,
"current_delta_chunks": [],
"current_conversation_id": None,
"current_item_chunks": [],
"current_delta_type": None,
"session_configuration_request": None,
}
)
logging_obj = MagicMock()
logging_obj.litellm_trace_id = "trace_1"
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
# Simulate synthetic session.created already sent by llm_http_handler.
streaming._session_created_sent_to_client = True
await streaming.backend_to_client_send_messages()
sent_payloads = [json.loads(c.args[0]) for c in client_ws.send_text.call_args_list]
assert not any(
payload.get("type") == "session.created" for payload in sent_payloads
), f"Expected duplicate session.created to be suppressed, got: {sent_payloads}"
@pytest.mark.asyncio
async def test_duplicate_session_created_still_triggers_guardrail_turn_detection_update():
client_ws = MagicMock()
client_ws.send_text = AsyncMock()
backend_ws = MagicMock()
backend_ws.recv = AsyncMock(
side_effect=[b'{"setupComplete": {}}', ConnectionClosed(None, None)]
)
backend_ws.send = AsyncMock()
provider_config = MagicMock()
provider_config.transform_realtime_response = MagicMock(
return_value={
"response": [
{
"type": "session.created",
"event_id": "event_1",
"session": {"id": "sess_1", "modalities": ["audio"]},
}
],
"current_output_item_id": None,
"current_response_id": None,
"current_delta_chunks": [],
"current_conversation_id": None,
"current_item_chunks": [],
"current_delta_type": None,
"session_configuration_request": None,
}
)
logging_obj = MagicMock()
logging_obj.litellm_trace_id = "trace_1"
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
# Synthetic session.created already sent by llm_http_handler.
streaming._session_created_sent_to_client = True
streaming._has_audio_transcription_guardrails = MagicMock(return_value=True) # type: ignore[method-assign]
streaming._send_to_backend = AsyncMock() # type: ignore[method-assign]
await streaming.backend_to_client_send_messages()
# Duplicate session.created should still cause the one-time guardrail
# turn_detection update to be sent to backend.
assert streaming._send_to_backend.await_count == 1
sent_update = json.loads(streaming._send_to_backend.await_args_list[0].args[0])
assert sent_update["type"] == "session.update"
injected_session = sent_update["session"]
assert injected_session["type"] == "realtime"
assert (
injected_session["audio"]["input"]["turn_detection"]["create_response"] is False
)
@pytest.mark.asyncio
async def test_guardrail_update_respects_idempotency_flag():
"""Verify guardrail turn-detection update uses idempotency flag correctly."""
client_ws = AsyncMock()
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
logging_obj = MagicMock()
logging_obj.litellm_trace_id = "trace_1"
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
provider_config = MagicMock()
provider_config.transform_realtime_request = MagicMock(
side_effect=lambda msg, model, session_config: [msg]
)
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
streaming._has_audio_transcription_guardrails = MagicMock(return_value=True) # type: ignore[method-assign]
# First call should send the update
assert streaming._guardrail_turn_detection_update_sent is False
await streaming._maybe_send_guardrail_turn_detection_update()
assert streaming._guardrail_turn_detection_update_sent is True
assert backend_ws.send.await_count == 1
# Second call should be a no-op (idempotent)
await streaming._maybe_send_guardrail_turn_detection_update()
assert backend_ws.send.await_count == 1 # Still 1, not 2
@pytest.mark.asyncio
async def test_guardrail_turn_detection_injected_into_first_session_update_deferred_mode():
"""Verify turn_detection is injected into first session.update in deferred mode."""
client_ws = AsyncMock()
client_ws.receive_text = AsyncMock(
side_effect=[
json.dumps(
{
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"tools": [{"type": "function", "name": "get_weather"}],
},
}
),
ConnectionClosed(None, None),
]
)
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
logging_obj = MagicMock()
logging_obj.litellm_trace_id = "trace_1"
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
provider_config = MagicMock()
transformed_messages = []
def mock_transform(msg, model, session_config):
transformed_messages.append((msg, session_config))
return [msg] # Pass through for simplicity
provider_config.transform_realtime_request = MagicMock(side_effect=mock_transform)
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
streaming._has_audio_transcription_guardrails = MagicMock(return_value=True) # type: ignore[method-assign]
# Simulate first session.update in deferred mode
await streaming.client_ack_messages()
# Verify turn_detection was injected into the session.update. The
# injection runs before the GA remap, so the create_response flag ends
# up nested under audio.input.turn_detection in the GA-shaped payload.
assert len(transformed_messages) == 1
transformed_msg, session_config = transformed_messages[0]
msg_obj = json.loads(transformed_msg)
assert msg_obj["type"] == "session.update"
session_obj = msg_obj["session"]
injected_turn_detection = session_obj.get("turn_detection") or session_obj.get(
"audio", {}
).get("input", {}).get("turn_detection")
assert injected_turn_detection is not None
assert injected_turn_detection["create_response"] is False
assert streaming._guardrail_turn_detection_update_sent is True
@pytest.mark.asyncio
@pytest.mark.parametrize("existing_turn_detection", [None, "auto", 42, ["server_vad"]])
async def test_guardrail_turn_detection_injection_tolerates_non_dict_value(
existing_turn_detection,
):
"""Client-supplied non-dict turn_detection must not crash client_ack_messages."""
client_ws = AsyncMock()
client_ws.receive_text = AsyncMock(
side_effect=[
json.dumps(
{
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"turn_detection": existing_turn_detection,
},
}
),
ConnectionClosed(None, None),
]
)
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
logging_obj = MagicMock()
logging_obj.litellm_trace_id = "trace_1"
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
provider_config = MagicMock()
transformed_messages = []
def mock_transform(msg, model, session_config):
transformed_messages.append((msg, session_config))
return [msg]
provider_config.transform_realtime_request = MagicMock(side_effect=mock_transform)
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
streaming._has_audio_transcription_guardrails = MagicMock(return_value=True) # type: ignore[method-assign]
await streaming.client_ack_messages()
assert len(transformed_messages) == 1
transformed_msg, _ = transformed_messages[0]
msg_obj = json.loads(transformed_msg)
session_obj = msg_obj["session"]
injected_turn_detection = session_obj.get("turn_detection") or session_obj.get(
"audio", {}
).get("input", {}).get("turn_detection")
assert isinstance(injected_turn_detection, dict)
assert injected_turn_detection["create_response"] is False
assert streaming._guardrail_turn_detection_update_sent is True
@pytest.mark.asyncio
@pytest.mark.parametrize(
"client_session",
[
{"turn_detection": {"type": "server_vad", "create_response": True}},
{
"audio": {
"input": {
"turn_detection": {"type": "server_vad", "create_response": True}
}
}
},
],
)
async def test_subsequent_session_update_cannot_reenable_vad_when_guardrails_active(
client_session,
):
"""A subsequent client session.update must not be allowed to flip
``create_response`` back to True once audio transcription guardrails have
disabled VAD auto-response. Covers both the flat beta shape and the
nested GA ``audio.input.turn_detection`` shape.
"""
client_ws = AsyncMock()
client_ws.receive_text = AsyncMock(
side_effect=[
json.dumps({"type": "session.update", "session": client_session}),
ConnectionClosed(None, None),
]
)
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
logging_obj = MagicMock()
logging_obj.litellm_trace_id = "trace_1"
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
provider_config = MagicMock()
transformed_messages = []
def mock_transform(msg, model, session_config):
transformed_messages.append((msg, session_config))
return [msg]
provider_config.transform_realtime_request = MagicMock(side_effect=mock_transform)
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
streaming._has_audio_transcription_guardrails = MagicMock(return_value=True) # type: ignore[method-assign]
# Simulate that initial setup + guardrail disable have already happened.
streaming.session_configuration_request = json.dumps({"setup": {"model": "x"}})
streaming._guardrail_turn_detection_update_sent = True
await streaming.client_ack_messages()
assert len(transformed_messages) == 1
forwarded_msg, _ = transformed_messages[0]
msg_obj = json.loads(forwarded_msg)
session_obj = msg_obj["session"]
forwarded_turn_detection = session_obj.get("turn_detection") or session_obj.get(
"audio", {}
).get("input", {}).get("turn_detection")
assert isinstance(forwarded_turn_detection, dict)
assert forwarded_turn_detection["create_response"] is False
@pytest.mark.asyncio
async def test_follow_up_setup_updates_cached_session_configuration_request():
"""A follow-up setup produced by a subsequent session.update must replace
the cached ``session_configuration_request`` so downstream readers
(e.g. modality lookup in ``response.created``) see the latest config."""
client_ws = AsyncMock()
client_ws.receive_text = AsyncMock(
side_effect=[
json.dumps({"type": "session.update", "session": {"tools": []}}),
ConnectionClosed(None, None),
]
)
backend_ws = MagicMock()
backend_ws.send = AsyncMock()
logging_obj = MagicMock()
logging_obj.async_success_handler = AsyncMock()
logging_obj.success_handler = MagicMock()
provider_config = MagicMock()
follow_up_setup = json.dumps(
{
"setup": {
"model": "models/gemini-2.5-flash",
"generationConfig": {"responseModalities": ["TEXT"]},
"tools": [{"function_declarations": []}],
}
}
)
provider_config.transform_realtime_request = MagicMock(
return_value=[follow_up_setup]
)
streaming = RealTimeStreaming(
websocket=client_ws,
backend_ws=backend_ws,
logging_obj=logging_obj,
provider_config=provider_config,
model="gemini-2.5-flash",
)
# Simulate that the original auto-setup was already cached.
streaming.session_configuration_request = json.dumps(
{
"setup": {
"model": "models/gemini-2.5-flash",
"generationConfig": {"responseModalities": ["AUDIO"]},
}
}
)
await streaming.client_ack_messages()
assert streaming.session_configuration_request == follow_up_setup

View File

@ -19,6 +19,7 @@ import websockets.exceptions # registers websockets.exceptions on the websocket
sys.path.insert(0, os.path.abspath("../../../../.."))
import litellm
from litellm.llms.vertex_ai.realtime.transformation import VertexAIRealtimeConfig
# ---------------------------------------------------------------------------
@ -82,6 +83,85 @@ def test_session_configuration_request_model_format():
)
def test_vertex_requires_session_configuration_feature_flag(monkeypatch):
cfg = VertexAIRealtimeConfig(
access_token="tok", project="my-proj", location="us-central1"
)
# Default remains backwards-compatible (auto setup on connect)
monkeypatch.setattr(litellm, "gemini_live_defer_setup", False, raising=False)
assert cfg.requires_session_configuration() is True
# Opt-in deferred setup for tool-injection flow
monkeypatch.setattr(litellm, "gemini_live_defer_setup", True, raising=False)
assert cfg.requires_session_configuration() is False
def test_vertex_session_update_defaults_to_audio_modality():
cfg = VertexAIRealtimeConfig(
access_token="tok", project="my-proj", location="us-central1"
)
session_update = {
"type": "session.update",
"session": {
"instructions": "You are a helpful assistant.",
# No modalities provided on purpose
},
}
messages = cfg.transform_realtime_request(
json.dumps(session_update),
"gemini-live-2.5-flash-native-audio",
session_configuration_request=None,
)
assert len(messages) == 1
setup_payload = json.loads(messages[0])["setup"]
assert setup_payload["generationConfig"]["responseModalities"] == ["AUDIO"]
def test_vertex_session_update_normalizes_ga_remapped_fields():
"""GA-format clients send ``output_modalities`` and nested
``audio.input.transcription`` / ``audio.input.turn_detection``. These must
be normalised back to the flat beta keys before ``map_openai_params``
runs so client preferences aren't silently dropped.
"""
cfg = VertexAIRealtimeConfig(
access_token="tok", project="my-proj", location="us-central1"
)
session_update = {
"type": "session.update",
"session": {
"instructions": "Be concise.",
"output_modalities": ["text"],
"audio": {
"input": {
"transcription": {},
"turn_detection": {"silence_duration_ms": 1500},
},
},
},
}
messages = cfg.transform_realtime_request(
json.dumps(session_update),
"gemini-live-2.5-flash-native-audio",
session_configuration_request=None,
)
assert len(messages) == 1
setup_payload = json.loads(messages[0])["setup"]
assert setup_payload["generationConfig"]["responseModalities"] == ["TEXT"]
assert setup_payload["inputAudioTranscription"] == {}
assert (
setup_payload["realtimeInputConfig"]["automaticActivityDetection"][
"silenceDurationMs"
]
== 1500
)
# ---------------------------------------------------------------------------
# Round-trip test: text-in / text-out via RealTimeStreaming
# ---------------------------------------------------------------------------
@ -208,3 +288,61 @@ async def test_vertex_realtime_text_in_text_out():
# response.done should have been forwarded
done_msgs = [m for m in sent_to_client if '"response.done"' in m]
assert done_msgs, "Expected response.done to be sent to client"
def test_vertex_warns_when_dropping_guardrail_turn_detection_update(caplog):
"""A subsequent session.update carrying the guardrail's
``create_response: False`` cannot be forwarded as a follow-up setup on
Vertex AI (1007). Surface a warning so operators know the auto-response
suppression is being silently dropped."""
import logging
cfg = VertexAIRealtimeConfig(
access_token="tok", project="my-proj", location="us-central1"
)
session_update = {
"type": "session.update",
"session": {"turn_detection": {"create_response": False}},
}
with caplog.at_level(logging.WARNING, logger="LiteLLM"):
result = cfg.transform_realtime_request(
json.dumps(session_update),
"gemini-live-2.5-flash-native-audio",
session_configuration_request=json.dumps({"setup": {"model": "x"}}),
)
assert result == []
assert any(
"Vertex AI Realtime" in record.message
and "create_response=False" in record.message
for record in caplog.records
)
def test_vertex_does_not_warn_when_dropping_non_guardrail_session_update(caplog):
"""A subsequent session.update without ``create_response: False`` is a
routine drop and should stay at debug level (no warning)."""
import logging
cfg = VertexAIRealtimeConfig(
access_token="tok", project="my-proj", location="us-central1"
)
session_update = {
"type": "session.update",
"session": {"instructions": "Be concise."},
}
with caplog.at_level(logging.WARNING, logger="LiteLLM"):
cfg.transform_realtime_request(
json.dumps(session_update),
"gemini-live-2.5-flash-native-audio",
session_configuration_request=json.dumps({"setup": {"model": "x"}}),
)
assert not any(
"Vertex AI Realtime" in record.message and "session.update" in record.message
for record in caplog.records
)