From 7a96b3490d8ac241865fe6930f658aee1575976f Mon Sep 17 00:00:00 2001 From: Mateo Wang <277851410+mateo-berri@users.noreply.github.com> Date: Wed, 10 Jun 2026 21:26:35 -0700 Subject: [PATCH] [internal copy of #30137] perf(realtime): eliminate redundant per-frame JSON work on OpenAI realtime relay (#30142) * perf(realtime): eliminate redundant per-frame JSON work on OpenAI realtime relay The GA realtime support added in #27110 made backend_to_client_send_messages parse every backend frame up to three times for beta clients (OpenAI-Beta: realtime=v1), build a discarded Pydantic object per frame for logging, and re-serialize even frames that need no translation. For high-frequency response.output_audio.delta frames carrying multi-KB base64 payloads, that serialized CPU work on the hottest relay path drove the latency regression between v1.83.14 and v1.88.1 for gpt-realtime-1.5 and gpt-realtime-2. This parses each frame once via _parse_backend_event and threads the dict into _handle_raw_backend_message, store_message, and _translate_event_to_beta; short-circuits store_message before the Pydantic build for events not in the logged set; returns the original event unchanged from _translate_event_to_beta when no rename applies so the raw frame is forwarded without re-serialization; and only json.dumps when the type is actually renamed. * fix(realtime): widen store_message type hint to accept plain dict The parse-once refactor passes the dict produced by _parse_backend_event into store_message, but the parameter was typed as str | bytes | OpenAIRealtimeEvents (a union of TypedDicts), which mypy does not consider compatible with a plain dict. Add dict to the accepted union; the body already handles it. --------- Co-authored-by: Miguel Armenta --- .../litellm_core_utils/realtime_streaming.py | 180 +++++++------- .../test_realtime_streaming.py | 220 +++++++++++++++++- 2 files changed, 297 insertions(+), 103 deletions(-) diff --git a/litellm/litellm_core_utils/realtime_streaming.py b/litellm/litellm_core_utils/realtime_streaming.py index 772f058d9b..4b7f0e2219 100644 --- a/litellm/litellm_core_utils/realtime_streaming.py +++ b/litellm/litellm_core_utils/realtime_streaming.py @@ -144,7 +144,7 @@ class RealTimeStreaming: return True return False - def store_message(self, message: Union[str, bytes, OpenAIRealtimeEvents]): + def store_message(self, message: Union[str, bytes, dict, OpenAIRealtimeEvents]): """Store message in list""" if isinstance(message, bytes): message = message.decode("utf-8") @@ -154,22 +154,20 @@ class RealTimeStreaming: else: message_obj = cast(Dict[str, Any], json.loads(cast(str, message))) self._collect_tool_calls_from_response_done(cast(dict, message_obj)) + if not self._should_store_message(message_obj): + return try: event_type = message_obj.get("type", "") if event_type in self._SESSION_EVENT_TYPES: - typed_obj = OpenAIRealtimeStreamSessionEvents(**message_obj) # type: ignore + typed_obj: OpenAIRealtimeEvents = OpenAIRealtimeStreamSessionEvents(**message_obj) # type: ignore else: - # Use the base object as a safe catch-all for all other event types - # (both beta and GA), so unknown/new event names never raise here. + # Catch-all base object so unknown/new event names never raise. typed_obj = OpenAIRealtimeStreamResponseBaseObject(**message_obj) # type: ignore except Exception as e: verbose_logger.debug(f"Error parsing message for logging: {e}") - # Don't re-raise — a parse failure must not drop or delay the message - if self._should_store_message(message_obj): - self.messages.append(message_obj) # type: ignore[arg-type] + self.messages.append(message_obj) # type: ignore[arg-type] return - if self._should_store_message(typed_obj): - self.messages.append(typed_obj) + self.messages.append(typed_obj) def _collect_user_input_from_client_event(self, message: Union[str, dict]) -> None: """Extract user text content from client WebSocket events for spend logging.""" @@ -358,8 +356,7 @@ class RealTimeStreaming: for msg in self._pending_messages_until_setup ) verbose_logger.debug( - "Failed to flush buffered client message after setup: %s " - "(%d buffered message(s) retained)", + "Failed to flush buffered client message after setup: %s (%d buffered message(s) retained)", e, len(unsent), ) @@ -376,8 +373,7 @@ class RealTimeStreaming: return True except Exception as e: verbose_logger.warning( - "Failed to translate %s to beta protocol, forwarding " - "untranslated event to client: %s", + "Failed to translate %s to beta protocol, forwarding untranslated event to client: %s", event.get("type"), e, ) @@ -705,48 +701,48 @@ class RealTimeStreaming: self.store_message(event_str) await self._send_event_to_client(event, event_str) - async def _handle_raw_backend_message(self, raw_response) -> bool: + @staticmethod + def _parse_backend_event(raw_response: str) -> Optional[dict]: + """Parse a backend frame once. Returns None for non-JSON or non-object frames.""" + try: + event = json.loads(raw_response) + except (json.JSONDecodeError, TypeError): + return None + return event if isinstance(event, dict) else None + + async def _handle_raw_backend_message( + self, event_obj: dict, raw_response: str + ) -> bool: """Process a backend message without provider_config (raw path). Returns True if the caller should skip the default store+forward (i.e. continue the loop). """ - try: - event_obj = json.loads(raw_response) + event_type = event_obj.get("type") - # For audio/VAD guardrail path: once the session is ready, tell the backend - # not to auto-respond after VAD detects end-of-speech. We send the - # session.created to the client FIRST so the client is always in sync, then - # inject the session.update so a potential error from the backend doesn't - # arrive before the client sees session.created. - if ( - event_obj.get("type") == "session.created" - and self._has_audio_transcription_guardrails() - ): - self.store_message(raw_response) - await self.websocket.send_text(raw_response) - await self._send_to_backend(self._make_disable_auto_response_message()) - return True + # Send session.created to the client FIRST so it stays in sync, then inject + # the disable-auto-response session.update; otherwise a backend error could + # reach the client before it sees session.created. + if ( + event_type == "session.created" + and self._has_audio_transcription_guardrails() + ): + self.store_message(event_obj) + await self.websocket.send_text(raw_response) + await self._send_to_backend(self._make_disable_auto_response_message()) + return True - if ( - event_obj.get("type") - == "conversation.item.input_audio_transcription.completed" - ): - transcript = event_obj.get("transcript", "") - self._collect_user_input_from_backend_event(event_obj) - ## LOGGING — must happen before continue below - self.store_message(raw_response) - # Forward transcript to client so user sees what they said - await self.websocket.send_text(raw_response) - blocked = await self.run_realtime_guardrails( - transcript, - item_id=event_obj.get("item_id"), - ) - if not blocked: - # Clean — trigger LLM response - await self._send_to_backend(json.dumps({"type": "response.create"})) - return True - except (json.JSONDecodeError, AttributeError): - pass + if event_type == "conversation.item.input_audio_transcription.completed": + transcript = event_obj.get("transcript", "") + self._collect_user_input_from_backend_event(event_obj) + self.store_message(event_obj) + await self.websocket.send_text(raw_response) + blocked = await self.run_realtime_guardrails( + transcript, + item_id=event_obj.get("item_id"), + ) + if not blocked: + await self._send_to_backend(json.dumps({"type": "response.create"})) + return True return False async def backend_to_client_send_messages(self): @@ -779,25 +775,25 @@ class RealTimeStreaming: ) continue else: - handled = await self._handle_raw_backend_message(raw_response) - if handled: - continue - ## LOGGING - self.store_message(raw_response) - - # If the client opted into beta protocol, translate GA event - # names/shapes back to the beta equivalents before forwarding. - if self._client_wants_beta: - try: - event_dict = json.loads(raw_response) - translated = self._translate_event_to_beta(event_dict) - if translated is None: - continue # drop GA-only events (e.g. conversation.item.done) - await self.websocket.send_text(json.dumps(translated)) - except Exception: - await self.websocket.send_text(raw_response) - else: + event = self._parse_backend_event(raw_response) + if event is None: await self.websocket.send_text(raw_response) + continue + + if await self._handle_raw_backend_message(event, raw_response): + continue + self.store_message(event) + + if not self._client_wants_beta: + await self.websocket.send_text(raw_response) + continue + + translated = self._translate_event_to_beta(event) + if translated is None: + continue + await self.websocket.send_text( + raw_response if translated is event else json.dumps(translated) + ) except websockets.exceptions.ConnectionClosed as e: # type: ignore verbose_logger.exception( @@ -927,41 +923,43 @@ class RealTimeStreaming: def _translate_event_to_beta(event: dict) -> Optional[dict]: """Translate a single GA event dict to its beta equivalent. - Returns None if the event should be dropped entirely (e.g. the GA-only - conversation.item.done has no beta counterpart). - Returns the (possibly mutated copy of the) event otherwise. + Returns None when the event must be dropped (the GA-only + conversation.item.done has no beta counterpart). Returns the original + event object unchanged when no translation applies, so the caller can + forward the raw frame without re-serializing; otherwise returns a + translated copy. """ event_type = event.get("type", "") - # conversation.item.done has no beta equivalent — the client already - # received conversation.item.created (translated from .added). if event_type == "conversation.item.done": return None - # Shallow-copy so we don't mutate the stored message + renamed_type = RealTimeStreaming._GA_TO_BETA_EVENT_TYPES.get(event_type) + has_item = isinstance(event.get("item"), dict) + response = event.get("response") + has_response_output = isinstance(response, dict) and isinstance( + response.get("output"), list + ) + if renamed_type is None and not has_item and not has_response_output: + return event + translated = dict(event) - - # Rename the type field - if event_type in RealTimeStreaming._GA_TO_BETA_EVENT_TYPES: - translated["type"] = RealTimeStreaming._GA_TO_BETA_EVENT_TYPES[event_type] - - # Fix content block types inside items (response.done output list, - # conversation.item.created item content, etc.) - if "item" in translated and isinstance(translated["item"], dict): + if renamed_type is not None: + translated["type"] = renamed_type + if has_item: translated["item"] = RealTimeStreaming._translate_item_content_types( dict(translated["item"]) ) - if "response" in translated and isinstance(translated["response"], dict): + if has_response_output: resp = dict(translated["response"]) - if "output" in resp and isinstance(resp["output"], list): - resp["output"] = [ - ( - RealTimeStreaming._translate_item_content_types(dict(o)) - if isinstance(o, dict) - else o - ) - for o in resp["output"] - ] + resp["output"] = [ + ( + RealTimeStreaming._translate_item_content_types(dict(o)) + if isinstance(o, dict) + else o + ) + for o in resp["output"] + ] translated["response"] = resp return translated diff --git a/tests/test_litellm/litellm_core_utils/test_realtime_streaming.py b/tests/test_litellm/litellm_core_utils/test_realtime_streaming.py index 3424bfd801..0f8d5cfd85 100644 --- a/tests/test_litellm/litellm_core_utils/test_realtime_streaming.py +++ b/tests/test_litellm/litellm_core_utils/test_realtime_streaming.py @@ -773,17 +773,15 @@ async def test_realtime_guardrail_blocks_prompt_injection(): guardrail_items = [ e for e in sent_to_backend if e.get("type") == "conversation.item.create" ] - assert len(guardrail_items) == 1, ( - f"Guardrail should inject a conversation.item.create with violation message, " - f"got: {guardrail_items}" - ) + assert ( + len(guardrail_items) == 1 + ), f"Guardrail should inject a conversation.item.create with violation message, got: {guardrail_items}" response_creates = [ e for e in sent_to_backend if e.get("type") == "response.create" ] - assert len(response_creates) == 1, ( - f"Guardrail should send exactly one response.create to voice the violation, " - f"got: {response_creates}" - ) + assert ( + len(response_creates) == 1 + ), f"Guardrail should send exactly one response.create to voice the violation, got: {response_creates}" # ASSERT 2: error event was sent directly to the client WebSocket sent_to_client = [ @@ -1050,10 +1048,9 @@ async def test_realtime_function_call_output_guardrail_blocks_and_returns_error( # every toolCall with a toolResponse (Gemini/Vertex Live) exit their # pending-tool-call state instead of stalling. The placeholder must NOT # contain any of the blocked content. - assert len(forwarded_tool_outputs) == 1, ( - f"Sanitized function_call_output should be forwarded, got: " - f"{forwarded_tool_outputs}" - ) + assert ( + len(forwarded_tool_outputs) == 1 + ), f"Sanitized function_call_output should be forwarded, got: {forwarded_tool_outputs}" sanitized_item = forwarded_tool_outputs[0]["item"] assert sanitized_item["call_id"] == "call_123" assert "test@example.com" not in sanitized_item["output"] @@ -2110,3 +2107,202 @@ async def test_deferred_setup_caps_non_audio_buffered_bytes(monkeypatch): assert ( streaming._pending_messages_byte_total <= RealTimeStreaming._MAX_BUFFERED_BYTES ) + + +def _beta_client_ws(): + ws = MagicMock() + ws.scope = {"headers": [(b"openai-beta", b"realtime=v1")]} + ws.send_text = AsyncMock() + return ws + + +def _ga_client_ws(): + ws = MagicMock() + ws.scope = {"headers": []} + ws.send_text = AsyncMock() + return ws + + +def _streaming_with(client_ws): + backend_ws = MagicMock() + logging_obj = MagicMock() + logging_obj.async_success_handler = AsyncMock() + logging_obj.success_handler = MagicMock() + return RealTimeStreaming(client_ws, backend_ws, logging_obj) + + +def test_parse_backend_event_returns_none_for_non_json(): + assert RealTimeStreaming._parse_backend_event("not json") is None + + +def test_parse_backend_event_returns_none_for_non_dict_json(): + assert RealTimeStreaming._parse_backend_event("[1, 2, 3]") is None + assert RealTimeStreaming._parse_backend_event('"a string"') is None + + +def test_parse_backend_event_returns_dict(): + parsed = RealTimeStreaming._parse_backend_event('{"type": "x", "v": 1}') + assert parsed == {"type": "x", "v": 1} + + +def test_translate_event_to_beta_returns_identity_when_no_translation(): + """An event with no renamed type and no item/response is returned unchanged + (same object), so the caller can forward the raw frame without re-serializing.""" + ev = {"type": "error", "error": {"message": "boom"}} + out = RealTimeStreaming._translate_event_to_beta(ev) + assert out is ev + + +def test_translate_event_to_beta_preserves_audio_delta_payload(): + payload = "QUJDREVG" * 200 + out = RealTimeStreaming._translate_event_to_beta( + {"type": "response.output_audio.delta", "delta": payload, "event_id": "e1"} + ) + assert out is not None + assert out["type"] == "response.audio.delta" + assert out["delta"] == payload + + +def test_translate_event_to_beta_remaps_response_done_output_content_types(): + out = RealTimeStreaming._translate_event_to_beta( + { + "type": "response.done", + "response": { + "output": [ + { + "type": "message", + "content": [{"type": "output_audio", "transcript": "hi"}], + } + ] + }, + } + ) + assert out is not None + assert out["response"]["output"][0]["content"][0]["type"] == "audio" + + +@pytest.mark.asyncio +async def test_beta_client_receives_translated_audio_delta(): + client_ws = _beta_client_ws() + frame = json.dumps( + {"type": "response.output_audio.delta", "delta": "QUJD", "event_id": "e1"} + ) + backend_ws = MagicMock() + backend_ws.recv = AsyncMock( + side_effect=[frame.encode(), ConnectionClosed(None, None)] + ) + logging_obj = MagicMock() + logging_obj.async_success_handler = AsyncMock() + logging_obj.success_handler = MagicMock() + streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj) + + await streaming.backend_to_client_send_messages() + + assert client_ws.send_text.await_count == 1 + sent = json.loads(client_ws.send_text.await_args.args[0]) + assert sent["type"] == "response.audio.delta" + assert sent["delta"] == "QUJD" + + +@pytest.mark.asyncio +async def test_ga_client_receives_raw_passthrough(): + client_ws = _ga_client_ws() + frame = json.dumps( + {"type": "response.output_audio.delta", "delta": "QUJD", "event_id": "e1"} + ) + backend_ws = MagicMock() + backend_ws.recv = AsyncMock( + side_effect=[frame.encode(), ConnectionClosed(None, None)] + ) + logging_obj = MagicMock() + logging_obj.async_success_handler = AsyncMock() + logging_obj.success_handler = MagicMock() + streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj) + + await streaming.backend_to_client_send_messages() + + assert client_ws.send_text.await_count == 1 + # GA client gets the byte-identical frame, no re-serialization. + assert client_ws.send_text.await_args.args[0] == frame + + +@pytest.mark.asyncio +async def test_beta_client_non_translated_event_forwarded_raw(): + """For a beta client, an event needing no translation is forwarded as the + original raw frame (identity return path), not a re-serialized copy.""" + client_ws = _beta_client_ws() + frame = json.dumps({"type": "error", "error": {"message": "boom"}}) + backend_ws = MagicMock() + backend_ws.recv = AsyncMock( + side_effect=[frame.encode(), ConnectionClosed(None, None)] + ) + logging_obj = MagicMock() + logging_obj.async_success_handler = AsyncMock() + logging_obj.success_handler = MagicMock() + streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj) + + await streaming.backend_to_client_send_messages() + + assert client_ws.send_text.await_count == 1 + assert client_ws.send_text.await_args.args[0] == frame + + +@pytest.mark.asyncio +async def test_beta_client_drops_conversation_item_done(): + client_ws = _beta_client_ws() + frame = json.dumps({"type": "conversation.item.done", "item": {"id": "i1"}}) + backend_ws = MagicMock() + backend_ws.recv = AsyncMock( + side_effect=[frame.encode(), ConnectionClosed(None, None)] + ) + logging_obj = MagicMock() + logging_obj.async_success_handler = AsyncMock() + logging_obj.success_handler = MagicMock() + streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj) + + await streaming.backend_to_client_send_messages() + + assert client_ws.send_text.await_count == 0 + + +def test_store_message_skips_pydantic_for_unlogged_audio_delta(): + """Audio deltas are not in DefaultLoggedRealTimeEventTypes; store_message must + skip the Pydantic build entirely (no append, no validation).""" + streaming = _streaming_with(_ga_client_ws()) + with patch( + "litellm.litellm_core_utils.realtime_streaming.OpenAIRealtimeStreamResponseBaseObject" + ) as base_obj: + streaming.store_message({"type": "response.output_audio.delta", "delta": "x"}) + base_obj.assert_not_called() + assert streaming.messages == [] + + +@pytest.mark.asyncio +async def test_audio_delta_frame_parsed_at_most_once(): + client_ws = _beta_client_ws() + frame = json.dumps( + {"type": "response.output_audio.delta", "delta": "QUJD", "event_id": "e1"} + ) + backend_ws = MagicMock() + backend_ws.recv = AsyncMock( + side_effect=[frame.encode(), ConnectionClosed(None, None)] + ) + logging_obj = MagicMock() + logging_obj.async_success_handler = AsyncMock() + logging_obj.success_handler = MagicMock() + streaming = RealTimeStreaming(client_ws, backend_ws, logging_obj) + + real_loads = json.loads + calls = {"n": 0} + + def counting_loads(*args, **kwargs): + calls["n"] += 1 + return real_loads(*args, **kwargs) + + with patch( + "litellm.litellm_core_utils.realtime_streaming.json.loads", + side_effect=counting_loads, + ): + await streaming.backend_to_client_send_messages() + + assert calls["n"] == 1