diff --git a/litellm/__init__.py b/litellm/__init__.py index 97f36a9b00..8537ccfd6e 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -405,6 +405,7 @@ disable_aiohttp_trust_env: bool = ( force_ipv4: bool = ( False # when True, litellm will force ipv4 for all LLM requests. Some users have seen httpx ConnectionError when using ipv6. ) +network_mock: bool = False # When True, use mock transport — no real network calls ####### STOP SEQUENCE LIMIT ####### disable_stop_sequence_limit: bool = False # when True, stop sequence limit is disabled diff --git a/litellm/llms/custom_httpx/mock_transport.py b/litellm/llms/custom_httpx/mock_transport.py new file mode 100644 index 0000000000..6a8ece73d8 --- /dev/null +++ b/litellm/llms/custom_httpx/mock_transport.py @@ -0,0 +1,163 @@ +""" +Mock httpx transport that returns valid OpenAI ChatCompletion responses. + +Activated via `litellm_settings: { network_mock: true }`. +Intercepts at the httpx transport layer — the lowest point before bytes hit the wire — +so the full proxy -> router -> OpenAI SDK -> httpx path is exercised. +""" + +import json +import time +from typing import Iterator, List + +import httpx + + +# --------------------------------------------------------------------------- +# Pre-built response templates +# --------------------------------------------------------------------------- + +def _chat_completion_json(model: str) -> dict: + """Return a minimal valid ChatCompletion object.""" + return { + "id": "chatcmpl-mock", + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "Mock response", + }, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": 1, + "completion_tokens": 1, + "total_tokens": 2, + }, + } + + +def _streaming_sse_payloads(model: str) -> List[bytes]: + """Pre-build the SSE byte payloads for a streaming response.""" + chunk = { + "id": "chatcmpl-mock", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": "Mock response"}, + "finish_reason": None, + } + ], + } + done_chunk = { + "id": "chatcmpl-mock", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "delta": {}, + "finish_reason": "stop", + } + ], + } + return [ + b"data: " + json.dumps(chunk).encode() + b"\n\n", + b"data: " + json.dumps(done_chunk).encode() + b"\n\n", + b"data: [DONE]\n\n", + ] + + +# --------------------------------------------------------------------------- +# Byte-stream wrappers +# --------------------------------------------------------------------------- + +class MockSSEAsyncStream(httpx.AsyncByteStream): + """Async byte stream that yields pre-built SSE payloads.""" + + def __init__(self, payloads: List[bytes]) -> None: + self._payloads = payloads + + async def __aiter__(self): # type: ignore[override] + for payload in self._payloads: + yield payload + + +class MockSSESyncStream(httpx.SyncByteStream): + """Sync byte stream that yields pre-built SSE payloads.""" + + def __init__(self, payloads: List[bytes]) -> None: + self._payloads = payloads + + def __iter__(self) -> Iterator[bytes]: + return iter(self._payloads) + + +# --------------------------------------------------------------------------- +# Transport +# --------------------------------------------------------------------------- + +_STREAM_HEADERS = { + "content-type": "text/event-stream", +} + +_JSON_HEADERS = { + "content-type": "application/json", +} + + +class MockOpenAITransport(httpx.AsyncBaseTransport, httpx.BaseTransport): + """ + httpx transport that returns canned OpenAI ChatCompletion responses. + + Supports both async (AsyncOpenAI) and sync (OpenAI) SDK paths. + """ + + @staticmethod + def _parse_request(request: httpx.Request) -> tuple: + """Extract (model, stream) from the request body.""" + body = json.loads(request.content) + model = body.get("model", "mock-model") + stream = body.get("stream", False) + return model, stream + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + model, stream = self._parse_request(request) + if stream: + payloads = _streaming_sse_payloads(model) + return httpx.Response( + status_code=200, + headers=_STREAM_HEADERS, + stream=MockSSEAsyncStream(payloads), + ) + body = json.dumps(_chat_completion_json(model)).encode() + return httpx.Response( + status_code=200, + headers=_JSON_HEADERS, + content=body, + ) + + def handle_request(self, request: httpx.Request) -> httpx.Response: + model, stream = self._parse_request(request) + if stream: + payloads = _streaming_sse_payloads(model) + return httpx.Response( + status_code=200, + headers=_STREAM_HEADERS, + stream=MockSSESyncStream(payloads), + ) + body = json.dumps(_chat_completion_json(model)).encode() + return httpx.Response( + status_code=200, + headers=_JSON_HEADERS, + content=body, + ) diff --git a/litellm/llms/openai/common_utils.py b/litellm/llms/openai/common_utils.py index 28de9f1303..61f150f1c2 100644 --- a/litellm/llms/openai/common_utils.py +++ b/litellm/llms/openai/common_utils.py @@ -205,6 +205,11 @@ class BaseOpenAILLM: if litellm.aclient_session is not None: return litellm.aclient_session + if getattr(litellm, "network_mock", False): + from litellm.llms.custom_httpx.mock_transport import MockOpenAITransport + + return httpx.AsyncClient(transport=MockOpenAITransport()) + # Get unified SSL configuration ssl_config = get_ssl_configuration() @@ -225,6 +230,11 @@ class BaseOpenAILLM: if litellm.client_session is not None: return litellm.client_session + if getattr(litellm, "network_mock", False): + from litellm.llms.custom_httpx.mock_transport import MockOpenAITransport + + return httpx.Client(transport=MockOpenAITransport()) + # Get unified SSL configuration ssl_config = get_ssl_configuration() diff --git a/scripts/benchmark_mock.py b/scripts/benchmark_mock.py new file mode 100644 index 0000000000..3d681c9660 --- /dev/null +++ b/scripts/benchmark_mock.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +"""Quick benchmark for network_mock proxy overhead measurement.""" + +import argparse +import asyncio +import time +import statistics + +import aiohttp + + +REQUEST_BODY = { + "model": "db-openai-endpoint", + "messages": [{"role": "user", "content": "hi"}], + "max_tokens": 100, + "user": "new_user", +} + + +async def send_request(session, url, semaphore): + async with semaphore: + start = time.perf_counter() + try: + async with session.post(url, json=REQUEST_BODY) as resp: + await resp.read() + elapsed = time.perf_counter() - start + return elapsed if resp.status == 200 else None + except Exception: + return None + + +async def run_benchmark(url, n_requests, max_concurrent): + semaphore = asyncio.Semaphore(max_concurrent) + connector_limit = min(max_concurrent * 2, 200) + connector = aiohttp.TCPConnector( + limit=connector_limit, + limit_per_host=max_concurrent, + force_close=False, + enable_cleanup_closed=True, + ) + async with aiohttp.ClientSession(connector=connector) as session: + # warmup + await asyncio.gather(*[send_request(session, url, semaphore) for _ in range(min(50, n_requests))]) + + # timed run + wall_start = time.perf_counter() + results = await asyncio.gather(*[send_request(session, url, semaphore) for _ in range(n_requests)]) + wall_elapsed = time.perf_counter() - wall_start + + latencies = [r for r in results if r is not None] + failures = sum(1 for r in results if r is None) + + latencies.sort() + n = len(latencies) + mean = statistics.mean(latencies) * 1000 + p50 = latencies[n // 2] * 1000 + p95 = latencies[int(n * 0.95)] * 1000 + p99 = latencies[int(n * 0.99)] * 1000 + throughput = n_requests / wall_elapsed + + return { + "mean": mean, "p50": p50, "p95": p95, "p99": p99, + "throughput": throughput, "failures": failures, + "wall_time": wall_elapsed, "n_requests": n_requests, + "max_concurrent": max_concurrent, "latencies": latencies, + } + + +def print_run_results(run_num, total_runs, result): + label = f" Run {run_num}/{total_runs}" if total_runs > 1 else " Results" + print(f"\n{'='*60}") + print(label) + print(f"{'='*60}") + print(f" Requests: {result['n_requests']} (failures: {result['failures']})") + print(f" Concurrency: {result['max_concurrent']}") + print(f" Wall time: {result['wall_time']:.2f}s") + print(f" Throughput: {result['throughput']:.0f} req/s") + print(f" Mean: {result['mean']:.2f} ms") + print(f" P50: {result['p50']:.2f} ms") + print(f" P95: {result['p95']:.2f} ms") + print(f" P99: {result['p99']:.2f} ms") + + +def print_aggregate(results): + all_latencies = [] + for r in results: + all_latencies.extend(r["latencies"]) + all_latencies.sort() + + total_failures = sum(r["failures"] for r in results) + total_requests = sum(r["n_requests"] for r in results) + n = len(all_latencies) + mean = statistics.mean(all_latencies) * 1000 + p50 = all_latencies[n // 2] * 1000 + p95 = all_latencies[int(n * 0.95)] * 1000 + p99 = all_latencies[int(n * 0.99)] * 1000 + avg_throughput = statistics.mean(r["throughput"] for r in results) + + print(f"\n{'='*60}") + print(f" Aggregate ({len(results)} runs, {total_requests} total requests)") + print(f"{'='*60}") + print(f" Failures: {total_failures}") + print(f" Throughput: {avg_throughput:.0f} req/s (avg across runs)") + print(f" Mean: {mean:.2f} ms") + print(f" P50: {p50:.2f} ms") + print(f" P95: {p95:.2f} ms") + print(f" P99: {p99:.2f} ms") + + # Run-to-run variance + run_means = [r["mean"] for r in results] + run_throughputs = [r["throughput"] for r in results] + if len(run_means) > 1: + cov_latency = statistics.stdev(run_means) / statistics.mean(run_means) * 100 + cov_throughput = statistics.stdev(run_throughputs) / statistics.mean(run_throughputs) * 100 + print(f"\n Run-to-run variance:") + print(f" Latency CoV: {cov_latency:.1f}%") + print(f" Throughput CoV: {cov_throughput:.1f}%") + + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--url", default="http://localhost:4000/chat/completions") + parser.add_argument("--requests", type=int, default=2000) + parser.add_argument("--max-concurrent", type=int, default=200) + parser.add_argument("--runs", type=int, default=1) + args = parser.parse_args() + + print(f"Benchmarking {args.url}") + print(f" {args.requests} requests, {args.max_concurrent} concurrency, {args.runs} run(s)") + + results = [] + for run_num in range(1, args.runs + 1): + result = await run_benchmark(args.url, args.requests, args.max_concurrent) + results.append(result) + print_run_results(run_num, args.runs, result) + + if args.runs > 1: + print_aggregate(results) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_litellm/llms/custom_httpx/test_mock_transport.py b/tests/test_litellm/llms/custom_httpx/test_mock_transport.py new file mode 100644 index 0000000000..d18f79ac97 --- /dev/null +++ b/tests/test_litellm/llms/custom_httpx/test_mock_transport.py @@ -0,0 +1,159 @@ +""" +Tests for MockOpenAITransport — verifies that the mock transport produces +responses parseable by the OpenAI SDK for both streaming and non-streaming paths. +""" + +import json + +import httpx +import pytest + +from litellm.llms.custom_httpx.mock_transport import MockOpenAITransport + + +# --------------------------------------------------------------------------- +# Non-streaming +# --------------------------------------------------------------------------- + + +class TestNonStreaming: + def test_sync_returns_valid_chat_completion(self): + transport = MockOpenAITransport() + request = httpx.Request( + method="POST", + url="https://api.openai.com/v1/chat/completions", + content=json.dumps({"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]}), + ) + response = transport.handle_request(request) + assert response.status_code == 200 + + body = json.loads(response.content) + assert body["object"] == "chat.completion" + assert body["model"] == "gpt-4o" + assert body["choices"][0]["message"]["role"] == "assistant" + assert body["choices"][0]["finish_reason"] == "stop" + assert "usage" in body + + @pytest.mark.asyncio + async def test_async_returns_valid_chat_completion(self): + transport = MockOpenAITransport() + request = httpx.Request( + method="POST", + url="https://api.openai.com/v1/chat/completions", + content=json.dumps({"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "hi"}]}), + ) + response = await transport.handle_async_request(request) + assert response.status_code == 200 + + body = json.loads(response.content) + assert body["object"] == "chat.completion" + assert body["model"] == "gpt-4o-mini" + + def test_model_echoed_from_request(self): + transport = MockOpenAITransport() + request = httpx.Request( + method="POST", + url="https://api.openai.com/v1/chat/completions", + content=json.dumps({"model": "my-custom-model", "messages": []}), + ) + response = transport.handle_request(request) + body = json.loads(response.content) + assert body["model"] == "my-custom-model" + + +# --------------------------------------------------------------------------- +# Streaming +# --------------------------------------------------------------------------- + + +class TestStreaming: + def test_sync_streaming_returns_sse_events(self): + transport = MockOpenAITransport() + request = httpx.Request( + method="POST", + url="https://api.openai.com/v1/chat/completions", + content=json.dumps({"model": "gpt-4o", "stream": True, "messages": []}), + ) + response = transport.handle_request(request) + assert response.status_code == 200 + assert "text/event-stream" in response.headers["content-type"] + + chunks = list(response.stream) + # Should have: content chunk, finish chunk, [DONE] + assert len(chunks) == 3 + assert chunks[-1] == b"data: [DONE]\n\n" + + # Parse the first chunk + first_line = chunks[0].decode() + assert first_line.startswith("data: ") + data = json.loads(first_line[len("data: "):].strip()) + assert data["object"] == "chat.completion.chunk" + assert data["model"] == "gpt-4o" + assert data["choices"][0]["delta"]["content"] == "Mock response" + + @pytest.mark.asyncio + async def test_async_streaming_returns_sse_events(self): + transport = MockOpenAITransport() + request = httpx.Request( + method="POST", + url="https://api.openai.com/v1/chat/completions", + content=json.dumps({"model": "gpt-4o", "stream": True, "messages": []}), + ) + response = await transport.handle_async_request(request) + assert response.status_code == 200 + + chunks = [] + async for chunk in response.stream: + chunks.append(chunk) + + assert len(chunks) == 3 + assert chunks[-1] == b"data: [DONE]\n\n" + + # Parse finish chunk + finish_line = chunks[1].decode() + data = json.loads(finish_line[len("data: "):].strip()) + assert data["choices"][0]["finish_reason"] == "stop" + + def test_streaming_model_echoed(self): + transport = MockOpenAITransport() + request = httpx.Request( + method="POST", + url="https://api.openai.com/v1/chat/completions", + content=json.dumps({"model": "custom-stream", "stream": True, "messages": []}), + ) + response = transport.handle_request(request) + first_chunk = next(iter(response.stream)) + data = json.loads(first_chunk.decode()[len("data: "):].strip()) + assert data["model"] == "custom-stream" + + +# --------------------------------------------------------------------------- +# Integration with httpx client +# --------------------------------------------------------------------------- + + +class TestHttpxClientIntegration: + def test_sync_client_get(self): + """Verify the transport works when wired into an httpx.Client.""" + client = httpx.Client(transport=MockOpenAITransport()) + response = client.post( + "https://api.openai.com/v1/chat/completions", + json={"model": "gpt-4o", "messages": [{"role": "user", "content": "test"}]}, + ) + assert response.status_code == 200 + body = response.json() + assert body["object"] == "chat.completion" + client.close() + + @pytest.mark.asyncio + async def test_async_client_get(self): + """Verify the transport works when wired into an httpx.AsyncClient.""" + client = httpx.AsyncClient(transport=MockOpenAITransport()) + response = await client.post( + "https://api.openai.com/v1/chat/completions", + json={"model": "gpt-4o", "messages": [{"role": "user", "content": "test"}]}, + ) + assert response.status_code == 200 + body = response.json() + assert body["object"] == "chat.completion" + await client.aclose()