feat: add network_mock transport for benchmarking proxy overhead without real API calls

Intercepts at httpx transport layer so the full proxy path (auth, routing,
OpenAI SDK, response transformation) is exercised with zero-latency responses.
Activated via `litellm_settings: { network_mock: true }` in proxy config.
This commit is contained in:
Ryan Crabbe 2026-02-18 12:19:18 -08:00
parent 356eb5a413
commit 94b76ea9ad
5 changed files with 475 additions and 0 deletions

View File

@ -405,6 +405,7 @@ disable_aiohttp_trust_env: bool = (
force_ipv4: bool = (
False # when True, litellm will force ipv4 for all LLM requests. Some users have seen httpx ConnectionError when using ipv6.
)
network_mock: bool = False # When True, use mock transport — no real network calls
####### STOP SEQUENCE LIMIT #######
disable_stop_sequence_limit: bool = False # when True, stop sequence limit is disabled

View File

@ -0,0 +1,163 @@
"""
Mock httpx transport that returns valid OpenAI ChatCompletion responses.
Activated via `litellm_settings: { network_mock: true }`.
Intercepts at the httpx transport layer the lowest point before bytes hit the wire
so the full proxy -> router -> OpenAI SDK -> httpx path is exercised.
"""
import json
import time
from typing import Iterator, List
import httpx
# ---------------------------------------------------------------------------
# Pre-built response templates
# ---------------------------------------------------------------------------
def _chat_completion_json(model: str) -> dict:
"""Return a minimal valid ChatCompletion object."""
return {
"id": "chatcmpl-mock",
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "Mock response",
},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": 1,
"completion_tokens": 1,
"total_tokens": 2,
},
}
def _streaming_sse_payloads(model: str) -> List[bytes]:
"""Pre-build the SSE byte payloads for a streaming response."""
chunk = {
"id": "chatcmpl-mock",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": "Mock response"},
"finish_reason": None,
}
],
}
done_chunk = {
"id": "chatcmpl-mock",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {},
"finish_reason": "stop",
}
],
}
return [
b"data: " + json.dumps(chunk).encode() + b"\n\n",
b"data: " + json.dumps(done_chunk).encode() + b"\n\n",
b"data: [DONE]\n\n",
]
# ---------------------------------------------------------------------------
# Byte-stream wrappers
# ---------------------------------------------------------------------------
class MockSSEAsyncStream(httpx.AsyncByteStream):
"""Async byte stream that yields pre-built SSE payloads."""
def __init__(self, payloads: List[bytes]) -> None:
self._payloads = payloads
async def __aiter__(self): # type: ignore[override]
for payload in self._payloads:
yield payload
class MockSSESyncStream(httpx.SyncByteStream):
"""Sync byte stream that yields pre-built SSE payloads."""
def __init__(self, payloads: List[bytes]) -> None:
self._payloads = payloads
def __iter__(self) -> Iterator[bytes]:
return iter(self._payloads)
# ---------------------------------------------------------------------------
# Transport
# ---------------------------------------------------------------------------
_STREAM_HEADERS = {
"content-type": "text/event-stream",
}
_JSON_HEADERS = {
"content-type": "application/json",
}
class MockOpenAITransport(httpx.AsyncBaseTransport, httpx.BaseTransport):
"""
httpx transport that returns canned OpenAI ChatCompletion responses.
Supports both async (AsyncOpenAI) and sync (OpenAI) SDK paths.
"""
@staticmethod
def _parse_request(request: httpx.Request) -> tuple:
"""Extract (model, stream) from the request body."""
body = json.loads(request.content)
model = body.get("model", "mock-model")
stream = body.get("stream", False)
return model, stream
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
model, stream = self._parse_request(request)
if stream:
payloads = _streaming_sse_payloads(model)
return httpx.Response(
status_code=200,
headers=_STREAM_HEADERS,
stream=MockSSEAsyncStream(payloads),
)
body = json.dumps(_chat_completion_json(model)).encode()
return httpx.Response(
status_code=200,
headers=_JSON_HEADERS,
content=body,
)
def handle_request(self, request: httpx.Request) -> httpx.Response:
model, stream = self._parse_request(request)
if stream:
payloads = _streaming_sse_payloads(model)
return httpx.Response(
status_code=200,
headers=_STREAM_HEADERS,
stream=MockSSESyncStream(payloads),
)
body = json.dumps(_chat_completion_json(model)).encode()
return httpx.Response(
status_code=200,
headers=_JSON_HEADERS,
content=body,
)

View File

@ -205,6 +205,11 @@ class BaseOpenAILLM:
if litellm.aclient_session is not None:
return litellm.aclient_session
if getattr(litellm, "network_mock", False):
from litellm.llms.custom_httpx.mock_transport import MockOpenAITransport
return httpx.AsyncClient(transport=MockOpenAITransport())
# Get unified SSL configuration
ssl_config = get_ssl_configuration()
@ -225,6 +230,11 @@ class BaseOpenAILLM:
if litellm.client_session is not None:
return litellm.client_session
if getattr(litellm, "network_mock", False):
from litellm.llms.custom_httpx.mock_transport import MockOpenAITransport
return httpx.Client(transport=MockOpenAITransport())
# Get unified SSL configuration
ssl_config = get_ssl_configuration()

142
scripts/benchmark_mock.py Normal file
View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""Quick benchmark for network_mock proxy overhead measurement."""
import argparse
import asyncio
import time
import statistics
import aiohttp
REQUEST_BODY = {
"model": "db-openai-endpoint",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 100,
"user": "new_user",
}
async def send_request(session, url, semaphore):
async with semaphore:
start = time.perf_counter()
try:
async with session.post(url, json=REQUEST_BODY) as resp:
await resp.read()
elapsed = time.perf_counter() - start
return elapsed if resp.status == 200 else None
except Exception:
return None
async def run_benchmark(url, n_requests, max_concurrent):
semaphore = asyncio.Semaphore(max_concurrent)
connector_limit = min(max_concurrent * 2, 200)
connector = aiohttp.TCPConnector(
limit=connector_limit,
limit_per_host=max_concurrent,
force_close=False,
enable_cleanup_closed=True,
)
async with aiohttp.ClientSession(connector=connector) as session:
# warmup
await asyncio.gather(*[send_request(session, url, semaphore) for _ in range(min(50, n_requests))])
# timed run
wall_start = time.perf_counter()
results = await asyncio.gather(*[send_request(session, url, semaphore) for _ in range(n_requests)])
wall_elapsed = time.perf_counter() - wall_start
latencies = [r for r in results if r is not None]
failures = sum(1 for r in results if r is None)
latencies.sort()
n = len(latencies)
mean = statistics.mean(latencies) * 1000
p50 = latencies[n // 2] * 1000
p95 = latencies[int(n * 0.95)] * 1000
p99 = latencies[int(n * 0.99)] * 1000
throughput = n_requests / wall_elapsed
return {
"mean": mean, "p50": p50, "p95": p95, "p99": p99,
"throughput": throughput, "failures": failures,
"wall_time": wall_elapsed, "n_requests": n_requests,
"max_concurrent": max_concurrent, "latencies": latencies,
}
def print_run_results(run_num, total_runs, result):
label = f" Run {run_num}/{total_runs}" if total_runs > 1 else " Results"
print(f"\n{'='*60}")
print(label)
print(f"{'='*60}")
print(f" Requests: {result['n_requests']} (failures: {result['failures']})")
print(f" Concurrency: {result['max_concurrent']}")
print(f" Wall time: {result['wall_time']:.2f}s")
print(f" Throughput: {result['throughput']:.0f} req/s")
print(f" Mean: {result['mean']:.2f} ms")
print(f" P50: {result['p50']:.2f} ms")
print(f" P95: {result['p95']:.2f} ms")
print(f" P99: {result['p99']:.2f} ms")
def print_aggregate(results):
all_latencies = []
for r in results:
all_latencies.extend(r["latencies"])
all_latencies.sort()
total_failures = sum(r["failures"] for r in results)
total_requests = sum(r["n_requests"] for r in results)
n = len(all_latencies)
mean = statistics.mean(all_latencies) * 1000
p50 = all_latencies[n // 2] * 1000
p95 = all_latencies[int(n * 0.95)] * 1000
p99 = all_latencies[int(n * 0.99)] * 1000
avg_throughput = statistics.mean(r["throughput"] for r in results)
print(f"\n{'='*60}")
print(f" Aggregate ({len(results)} runs, {total_requests} total requests)")
print(f"{'='*60}")
print(f" Failures: {total_failures}")
print(f" Throughput: {avg_throughput:.0f} req/s (avg across runs)")
print(f" Mean: {mean:.2f} ms")
print(f" P50: {p50:.2f} ms")
print(f" P95: {p95:.2f} ms")
print(f" P99: {p99:.2f} ms")
# Run-to-run variance
run_means = [r["mean"] for r in results]
run_throughputs = [r["throughput"] for r in results]
if len(run_means) > 1:
cov_latency = statistics.stdev(run_means) / statistics.mean(run_means) * 100
cov_throughput = statistics.stdev(run_throughputs) / statistics.mean(run_throughputs) * 100
print(f"\n Run-to-run variance:")
print(f" Latency CoV: {cov_latency:.1f}%")
print(f" Throughput CoV: {cov_throughput:.1f}%")
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("--url", default="http://localhost:4000/chat/completions")
parser.add_argument("--requests", type=int, default=2000)
parser.add_argument("--max-concurrent", type=int, default=200)
parser.add_argument("--runs", type=int, default=1)
args = parser.parse_args()
print(f"Benchmarking {args.url}")
print(f" {args.requests} requests, {args.max_concurrent} concurrency, {args.runs} run(s)")
results = []
for run_num in range(1, args.runs + 1):
result = await run_benchmark(args.url, args.requests, args.max_concurrent)
results.append(result)
print_run_results(run_num, args.runs, result)
if args.runs > 1:
print_aggregate(results)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,159 @@
"""
Tests for MockOpenAITransport verifies that the mock transport produces
responses parseable by the OpenAI SDK for both streaming and non-streaming paths.
"""
import json
import httpx
import pytest
from litellm.llms.custom_httpx.mock_transport import MockOpenAITransport
# ---------------------------------------------------------------------------
# Non-streaming
# ---------------------------------------------------------------------------
class TestNonStreaming:
def test_sync_returns_valid_chat_completion(self):
transport = MockOpenAITransport()
request = httpx.Request(
method="POST",
url="https://api.openai.com/v1/chat/completions",
content=json.dumps({"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]}),
)
response = transport.handle_request(request)
assert response.status_code == 200
body = json.loads(response.content)
assert body["object"] == "chat.completion"
assert body["model"] == "gpt-4o"
assert body["choices"][0]["message"]["role"] == "assistant"
assert body["choices"][0]["finish_reason"] == "stop"
assert "usage" in body
@pytest.mark.asyncio
async def test_async_returns_valid_chat_completion(self):
transport = MockOpenAITransport()
request = httpx.Request(
method="POST",
url="https://api.openai.com/v1/chat/completions",
content=json.dumps({"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "hi"}]}),
)
response = await transport.handle_async_request(request)
assert response.status_code == 200
body = json.loads(response.content)
assert body["object"] == "chat.completion"
assert body["model"] == "gpt-4o-mini"
def test_model_echoed_from_request(self):
transport = MockOpenAITransport()
request = httpx.Request(
method="POST",
url="https://api.openai.com/v1/chat/completions",
content=json.dumps({"model": "my-custom-model", "messages": []}),
)
response = transport.handle_request(request)
body = json.loads(response.content)
assert body["model"] == "my-custom-model"
# ---------------------------------------------------------------------------
# Streaming
# ---------------------------------------------------------------------------
class TestStreaming:
def test_sync_streaming_returns_sse_events(self):
transport = MockOpenAITransport()
request = httpx.Request(
method="POST",
url="https://api.openai.com/v1/chat/completions",
content=json.dumps({"model": "gpt-4o", "stream": True, "messages": []}),
)
response = transport.handle_request(request)
assert response.status_code == 200
assert "text/event-stream" in response.headers["content-type"]
chunks = list(response.stream)
# Should have: content chunk, finish chunk, [DONE]
assert len(chunks) == 3
assert chunks[-1] == b"data: [DONE]\n\n"
# Parse the first chunk
first_line = chunks[0].decode()
assert first_line.startswith("data: ")
data = json.loads(first_line[len("data: "):].strip())
assert data["object"] == "chat.completion.chunk"
assert data["model"] == "gpt-4o"
assert data["choices"][0]["delta"]["content"] == "Mock response"
@pytest.mark.asyncio
async def test_async_streaming_returns_sse_events(self):
transport = MockOpenAITransport()
request = httpx.Request(
method="POST",
url="https://api.openai.com/v1/chat/completions",
content=json.dumps({"model": "gpt-4o", "stream": True, "messages": []}),
)
response = await transport.handle_async_request(request)
assert response.status_code == 200
chunks = []
async for chunk in response.stream:
chunks.append(chunk)
assert len(chunks) == 3
assert chunks[-1] == b"data: [DONE]\n\n"
# Parse finish chunk
finish_line = chunks[1].decode()
data = json.loads(finish_line[len("data: "):].strip())
assert data["choices"][0]["finish_reason"] == "stop"
def test_streaming_model_echoed(self):
transport = MockOpenAITransport()
request = httpx.Request(
method="POST",
url="https://api.openai.com/v1/chat/completions",
content=json.dumps({"model": "custom-stream", "stream": True, "messages": []}),
)
response = transport.handle_request(request)
first_chunk = next(iter(response.stream))
data = json.loads(first_chunk.decode()[len("data: "):].strip())
assert data["model"] == "custom-stream"
# ---------------------------------------------------------------------------
# Integration with httpx client
# ---------------------------------------------------------------------------
class TestHttpxClientIntegration:
def test_sync_client_get(self):
"""Verify the transport works when wired into an httpx.Client."""
client = httpx.Client(transport=MockOpenAITransport())
response = client.post(
"https://api.openai.com/v1/chat/completions",
json={"model": "gpt-4o", "messages": [{"role": "user", "content": "test"}]},
)
assert response.status_code == 200
body = response.json()
assert body["object"] == "chat.completion"
client.close()
@pytest.mark.asyncio
async def test_async_client_get(self):
"""Verify the transport works when wired into an httpx.AsyncClient."""
client = httpx.AsyncClient(transport=MockOpenAITransport())
response = await client.post(
"https://api.openai.com/v1/chat/completions",
json={"model": "gpt-4o", "messages": [{"role": "user", "content": "test"}]},
)
assert response.status_code == 200
body = response.json()
assert body["object"] == "chat.completion"
await client.aclose()