Extend the record/replay proxy to chat, embeddings, moderations, rerank, and Anthropic (#29847)
* test(ci): extend record/replay proxy to chat, embeddings, moderations, rerank, anthropic The record/replay proxy that took the gpt-image-1 spend E2E off the live OpenAI path now fronts every provider, so the other real-provider E2Es stop paying for and depending on live calls each commit. It keys per upstream and selects a non-OpenAI provider by a /__recorder_upstream/<host>/ path prefix carried on the model's api_base, since some litellm handlers (cohere rerank) drop custom request headers. Wired into build_and_test (chat, embeddings, moderations, image), the otel job (cohere rerank), and the anthropic-messages job via a reusable start_openai_record_replay_proxy command. Dropped the time.time()/uuid prompt cache-busters in the build_and_test chat tests, whose config has the response cache off, so identical requests are recordable. The image spend test now asserts a repeat call still bills spend, failing loudly if the proxy response cache is ever turned on. Responses, the anthropic passthrough, bedrock, and fake-endpoint tests are left live: their lifecycles, api_base assertions, providers, or fake targets make a stateless body-keyed cache either break them or add nothing. * docs(ci): note the recorder command's OpenAI default upstream and prefix override Addresses a review note: the shared start_openai_record_replay_proxy command defaults the upstream to OpenAI, so a non-OpenAI model must carry the /__recorder_upstream/<host>/ prefix on its api_base. Document that in the command description so a future caller does not assume the default follows the provider.
This commit is contained in:
parent
38b28b96ff
commit
33c363d4d4
@ -111,6 +111,28 @@ commands:
|
||||
- wait_for_service:
|
||||
url: tcp://localhost:6379
|
||||
timeout: "60"
|
||||
# Reusable command: launch the record/replay proxy in the background, then
# block until its health endpoint answers (or fail the job after ~30s).
start_openai_record_replay_proxy:
  description: "Start the record/replay proxy (tests/_openai_record_replay_proxy.py) on host port 8090 and wait until healthy. Models whose api_base points here replay recorded provider responses, so the E2E run neither pays for nor depends on the live provider. The default upstream is OpenAI; a non-OpenAI model must point its api_base at /__recorder_upstream/<host>/ so the recorder forwards there instead of defaulting to OpenAI. Run after uv deps are synced."
  steps:
    - run:
        name: Start record/replay proxy
        # Runs for the rest of the job; the next step polls for readiness.
        background: true
        command: |
          CASSETTE_REDIS_URL="$CASSETTE_REDIS_URL" \
          RECORDER_UPSTREAM_BASE_URL="https://api.openai.com" \
          uv run --no-sync python tests/_openai_record_replay_proxy.py --host 0.0.0.0 --port 8090
    - run:
        name: Wait for record/replay proxy
        # Poll the health route once per second, up to 30 attempts.
        command: |
          for i in $(seq 1 30); do
            if curl -sf http://localhost:8090/__recorder_health >/dev/null 2>&1; then
              echo "record/replay proxy is up"
              exit 0
            fi
            sleep 1
          done
          echo "record/replay proxy did not become ready" >&2
          exit 1
|
||||
setup_litellm_enterprise_pip:
|
||||
steps:
|
||||
- run:
|
||||
@ -1625,25 +1647,7 @@ jobs:
|
||||
command: |
|
||||
zstd -d litellm-docker-database.tar.zst --stdout | docker load
|
||||
docker tag litellm-docker-database:ci my-app:latest
|
||||
- run:
|
||||
name: Start OpenAI image record/replay proxy
|
||||
background: true
|
||||
command: |
|
||||
CASSETTE_REDIS_URL="$CASSETTE_REDIS_URL" \
|
||||
RECORDER_UPSTREAM_BASE_URL="https://api.openai.com" \
|
||||
uv run --no-sync python tests/_openai_record_replay_proxy.py --host 0.0.0.0 --port 8090
|
||||
- run:
|
||||
name: Wait for record/replay proxy
|
||||
command: |
|
||||
for i in $(seq 1 30); do
|
||||
if curl -sf http://localhost:8090/__recorder_health >/dev/null 2>&1; then
|
||||
echo "record/replay proxy is up"
|
||||
exit 0
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
echo "record/replay proxy did not become ready" >&2
|
||||
exit 1
|
||||
- start_openai_record_replay_proxy
|
||||
- run:
|
||||
name: Run Docker container
|
||||
command: |
|
||||
@ -1674,7 +1678,7 @@ jobs:
|
||||
-e LANGFUSE_PROJECT2_PUBLIC=$LANGFUSE_PROJECT2_PUBLIC \
|
||||
-e LANGFUSE_PROJECT1_SECRET=$LANGFUSE_PROJECT1_SECRET \
|
||||
-e LANGFUSE_PROJECT2_SECRET=$LANGFUSE_PROJECT2_SECRET \
|
||||
-e IMAGE_GEN_RECORDER_BASE_URL=http://host.docker.internal:8090/v1 \
|
||||
-e RECORDER_OPENAI_BASE_URL=http://host.docker.internal:8090/v1 \
|
||||
--add-host host.docker.internal:host-gateway \
|
||||
--name my-app \
|
||||
-v $(pwd)/proxy_server_config.yaml:/app/config.yaml \
|
||||
@ -1812,6 +1816,7 @@ jobs:
|
||||
command: |
|
||||
zstd -d litellm-docker-database.tar.zst --stdout | docker load
|
||||
docker images | grep litellm-docker-database
|
||||
- start_openai_record_replay_proxy
|
||||
- run:
|
||||
name: Run Docker container
|
||||
# intentionally give bad redis credentials here
|
||||
@ -1835,6 +1840,7 @@ jobs:
|
||||
-e DD_SITE=$DD_SITE \
|
||||
-e AWS_REGION_NAME=$AWS_REGION_NAME \
|
||||
-e COHERE_API_KEY=$COHERE_API_KEY \
|
||||
-e RECORDER_COHERE_BASE_URL=http://host.docker.internal:8090/__recorder_upstream/api.cohere.com \
|
||||
-e GCS_FLUSH_INTERVAL="1" \
|
||||
--add-host host.docker.internal:host-gateway \
|
||||
--name my-app \
|
||||
@ -2400,6 +2406,7 @@ jobs:
|
||||
command: |
|
||||
zstd -d litellm-docker-database.tar.zst --stdout | docker load
|
||||
docker images | grep litellm-docker-database
|
||||
- start_openai_record_replay_proxy
|
||||
- run:
|
||||
name: Run Docker container with test config
|
||||
command: |
|
||||
@ -2408,6 +2415,7 @@ jobs:
|
||||
-e DATABASE_URL=postgresql://postgres:postgres@host.docker.internal:5432/circle_test \
|
||||
-e LITELLM_MASTER_KEY="sk-1234" \
|
||||
-e ANTHROPIC_API_KEY=$ANTHROPIC_API_KEY \
|
||||
-e RECORDER_ANTHROPIC_BASE_URL=http://host.docker.internal:8090/__recorder_upstream/api.anthropic.com \
|
||||
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
|
||||
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
|
||||
-e AWS_REGION_NAME="us-east-1" \
|
||||
|
||||
@ -19,6 +19,7 @@ model_list:
|
||||
litellm_params:
|
||||
model: cohere/rerank-english-v3.0
|
||||
api_key: os.environ/COHERE_API_KEY
|
||||
api_base: os.environ/RECORDER_COHERE_BASE_URL # In CI, routes through the record/replay proxy; unset elsewhere -> direct to Cohere
|
||||
- model_name: fake-azure-endpoint
|
||||
litellm_params:
|
||||
model: openai/429
|
||||
|
||||
@ -24,6 +24,7 @@ model_list:
|
||||
litellm_params:
|
||||
model: openai/gpt-4.1
|
||||
api_key: os.environ/OPENAI_API_KEY # The `os.environ/` prefix tells litellm to read this from the env. See https://docs.litellm.ai/docs/simple_proxy#load-api-keys-from-vault
|
||||
api_base: os.environ/RECORDER_OPENAI_BASE_URL # In CI, routes through the record/replay proxy; unset elsewhere -> direct to OpenAI
|
||||
rpm: 480
|
||||
timeout: 300
|
||||
stream_timeout: 60
|
||||
@ -35,6 +36,7 @@ model_list:
|
||||
litellm_params:
|
||||
model: openai/text-embedding-3-small
|
||||
api_key: os.environ/OPENAI_API_KEY
|
||||
api_base: os.environ/RECORDER_OPENAI_BASE_URL # In CI, routes through the record/replay proxy; unset elsewhere -> direct to OpenAI
|
||||
model_info:
|
||||
mode: embedding
|
||||
base_model: text-embedding-3-small
|
||||
@ -44,15 +46,20 @@ model_list:
|
||||
- model_name: openai-dall-e-3 # dall-e-3 deprecated 2026-05-12; underlying now gpt-image-1
|
||||
litellm_params:
|
||||
model: gpt-image-1
|
||||
# In CI, IMAGE_GEN_RECORDER_BASE_URL points this at the record/replay proxy
|
||||
# (tests/_openai_record_replay_proxy.py) so the image spend E2E doesn't depend
|
||||
# on OpenAI's uptime every commit. Unset elsewhere, so it resolves to None and
|
||||
# falls back to api.openai.com.
|
||||
# In CI, RECORDER_OPENAI_BASE_URL points OpenAI models at the record/replay
|
||||
# proxy (tests/_openai_record_replay_proxy.py) so the spend/cost E2Es don't
|
||||
# depend on OpenAI's uptime every commit. Unset elsewhere, so it resolves to
|
||||
# None and falls back to api.openai.com.
|
||||
- model_name: gpt-image-1
|
||||
litellm_params:
|
||||
model: openai/gpt-image-1
|
||||
api_key: os.environ/OPENAI_API_KEY
|
||||
api_base: os.environ/IMAGE_GEN_RECORDER_BASE_URL
|
||||
api_base: os.environ/RECORDER_OPENAI_BASE_URL
|
||||
- model_name: text-moderation-stable
|
||||
litellm_params:
|
||||
model: openai/omni-moderation-latest
|
||||
api_key: os.environ/OPENAI_API_KEY
|
||||
api_base: os.environ/RECORDER_OPENAI_BASE_URL
|
||||
- model_name: fake-openai-endpoint
|
||||
litellm_params:
|
||||
model: openai/gpt-5-mini
|
||||
|
||||
@ -1,17 +1,22 @@
|
||||
"""Record/replay reverse proxy for the dockerized image-gen spend E2E.
|
||||
"""Record/replay reverse proxy for the dockerized real-provider spend E2Es.
|
||||
|
||||
The spend-accuracy test ``tests/test_keys.py::
|
||||
test_key_info_spend_values_image_generation`` runs the litellm proxy in its own
|
||||
container and curls it over real HTTP, then asserts the proxy tracked a nonzero
|
||||
spend for a ``gpt-image-1`` call. That call wildcard-routes to ``openai/*`` on
|
||||
the real key, so every commit run hit api.openai.com for a paid image and was
|
||||
exposed to OpenAI outages (the 401 that started this).
|
||||
Several E2E tests run the litellm proxy in its own container and curl it over
|
||||
real HTTP, then assert on spend, cost, or rerank output. Those calls reach real
|
||||
provider APIs (OpenAI image gen and chat, Cohere rerank, Anthropic messages),
|
||||
so every commit run paid for them and was exposed to provider outages (the 401
|
||||
that started this).
|
||||
|
||||
This process sits between the proxy and api.openai.com. The proxy points only
|
||||
its image model's ``api_base`` here; nothing else about the topology changes.
|
||||
The first request (or the first after a recording lapses) is forwarded live to
|
||||
OpenAI and recorded; subsequent requests within the TTL replay the recorded
|
||||
response, so the per-commit run no longer depends on OpenAI being up.
|
||||
This process sits between the proxy and the provider. A model points its
|
||||
``api_base`` here; nothing else about the topology changes. The first request
|
||||
(or the first after a recording lapses) is forwarded live to the provider and
|
||||
recorded; subsequent identical requests within the TTL replay the recorded
|
||||
response, so the per-commit run no longer depends on the provider being up.
|
||||
|
||||
One recorder fronts every provider. The default upstream is api.openai.com; a
|
||||
non-OpenAI model points its ``api_base`` at ``/__recorder_upstream/<host>`` so
|
||||
the recorder forwards to ``https://<host>`` (folded into the cache key so two
|
||||
providers sharing a path can't collide). Routing rides ``api_base`` because
|
||||
some provider handlers drop custom request headers.
|
||||
|
||||
Recordings live in the same Redis cassette store as the VCR persister
|
||||
(``CASSETTE_REDIS_URL``) and expire ``CASSETTE_TTL_SECONDS`` after their last
|
||||
@ -43,6 +48,12 @@ RECORD_KEY_PREFIX = "litellm:openai:record:"
|
||||
RECORDER_REDIS_URL_ENV = "CASSETTE_REDIS_URL"
|
||||
UPSTREAM_BASE_URL_ENV = "RECORDER_UPSTREAM_BASE_URL"
|
||||
DEFAULT_UPSTREAM_BASE_URL = "https://api.openai.com"
|
||||
# One recorder fronts many providers. A non-default provider is addressed by
|
||||
# prefixing the request path with ``/__recorder_upstream/<host>/`` via the
|
||||
# model's ``api_base``. This rides ``api_base`` (which every litellm provider
|
||||
# honours) rather than a custom header (which some provider handlers, e.g.
|
||||
# cohere rerank, silently drop).
|
||||
UPSTREAM_PATH_PREFIX = "/__recorder_upstream/"
|
||||
|
||||
Headers = List[Tuple[str, str]]
|
||||
UpstreamResult = Tuple[int, Headers, bytes]
|
||||
@ -72,11 +83,25 @@ _STRIPPED_RESPONSE_HEADERS = frozenset(
|
||||
)
|
||||
|
||||
|
||||
def _resolve_upstream(path: str, default_upstream: str) -> Tuple[str, str]:
    """Split an incoming request path into ``(upstream_base_url, real_path)``.

    A path that starts with ``UPSTREAM_PATH_PREFIX`` names its provider in the
    first segment after the prefix: ``<prefix><host>/<rest>`` resolves to
    ``("https://<host>", "/<rest>")``. Any other path is returned unchanged
    together with ``default_upstream``.

    The host segment is taken verbatim from the path; no validation or
    allow-listing is performed here.
    """
    if not path.startswith(UPSTREAM_PATH_PREFIX):
        return default_upstream, path
    # Everything up to the next "/" is the host; the remainder (possibly
    # empty) is the provider-side path, re-rooted with a leading slash.
    host, _, rest = path[len(UPSTREAM_PATH_PREFIX) :].partition("/")
    return f"https://{host}", f"/{rest}"
|
||||
|
||||
|
||||
def _canonical_body(body: bytes) -> bytes:
|
||||
if not body:
|
||||
return b""
|
||||
try:
|
||||
return json.dumps(json.loads(body), sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||
return json.dumps(
|
||||
json.loads(body), sort_keys=True, separators=(",", ":")
|
||||
).encode("utf-8")
|
||||
except (ValueError, TypeError):
|
||||
return body
|
||||
|
||||
@ -86,11 +111,12 @@ def _sanitize_headers(headers: Headers) -> Headers:
|
||||
|
||||
|
||||
class OpenAIRecordReplay:
|
||||
"""Record-once / replay-from-Redis for upstream OpenAI HTTP calls.
|
||||
"""Record-once / replay-from-Redis for upstream provider HTTP calls.
|
||||
|
||||
``redis_client`` is injected so the process wiring and the tests share one
|
||||
code path; pass ``None`` to run as a pure live passthrough (local dev with
|
||||
no cassette Redis).
|
||||
no cassette Redis). ``upstream_base_url`` is the default provider; per
|
||||
request it can be overridden by a ``/__recorder_upstream/<host>/`` path.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -105,10 +131,16 @@ class OpenAIRecordReplay:
|
||||
self._ttl_seconds = ttl_seconds
|
||||
|
||||
@staticmethod
|
||||
def record_key(method: str, path: str, body: bytes) -> str:
|
||||
def record_key(
|
||||
method: str,
|
||||
path: str,
|
||||
body: bytes,
|
||||
upstream_base_url: str = DEFAULT_UPSTREAM_BASE_URL,
|
||||
) -> str:
|
||||
digest = hashlib.sha256(
|
||||
b"\n".join(
|
||||
[
|
||||
upstream_base_url.rstrip("/").encode("utf-8"),
|
||||
method.upper().encode("utf-8"),
|
||||
path.encode("utf-8"),
|
||||
_canonical_body(body),
|
||||
@ -117,8 +149,18 @@ class OpenAIRecordReplay:
|
||||
).hexdigest()
|
||||
return f"{RECORD_KEY_PREFIX}{digest}"
|
||||
|
||||
async def handle(self, method: str, path: str, body: bytes, fetch_upstream: FetchUpstream) -> UpstreamResult:
|
||||
key = self.record_key(method, path, body)
|
||||
async def handle(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
body: bytes,
|
||||
fetch_upstream: FetchUpstream,
|
||||
*,
|
||||
upstream_base_url: Optional[str] = None,
|
||||
) -> UpstreamResult:
|
||||
key = self.record_key(
|
||||
method, path, body, upstream_base_url or self.upstream_base_url
|
||||
)
|
||||
cached = self._cache_get(key)
|
||||
if cached is not None:
|
||||
_LOGGER.info("HIT replayed from cassette: %s %s", method, path)
|
||||
@ -127,7 +169,12 @@ class OpenAIRecordReplay:
|
||||
status, headers, resp_body = await fetch_upstream()
|
||||
sanitized = _sanitize_headers(headers)
|
||||
if not (200 <= status < 300):
|
||||
_LOGGER.info("MISS forwarded live, not cached (status=%s): %s %s", status, method, path)
|
||||
_LOGGER.info(
|
||||
"MISS forwarded live, not cached (status=%s): %s %s",
|
||||
status,
|
||||
method,
|
||||
path,
|
||||
)
|
||||
elif self._cache_set(key, status, sanitized, resp_body):
|
||||
_LOGGER.info("MISS forwarded live and recorded: %s %s", method, path)
|
||||
else:
|
||||
@ -220,7 +267,9 @@ def create_app(recorder: Optional[OpenAIRecordReplay] = None, http_client=None):
|
||||
if recorder is None:
|
||||
recorder = OpenAIRecordReplay(
|
||||
redis_client=_build_default_redis_client(),
|
||||
upstream_base_url=os.environ.get(UPSTREAM_BASE_URL_ENV, DEFAULT_UPSTREAM_BASE_URL),
|
||||
upstream_base_url=os.environ.get(
|
||||
UPSTREAM_BASE_URL_ENV, DEFAULT_UPSTREAM_BASE_URL
|
||||
),
|
||||
)
|
||||
owns_client = http_client is None
|
||||
client = http_client or httpx.AsyncClient(timeout=httpx.Timeout(120.0))
|
||||
@ -239,14 +288,21 @@ def create_app(recorder: Optional[OpenAIRecordReplay] = None, http_client=None):
|
||||
|
||||
async def proxy(request):
|
||||
body = await request.body()
|
||||
path = request.url.path
|
||||
full_path = f"{path}?{request.url.query}" if request.url.query else path
|
||||
upstream_base_url, real_path = _resolve_upstream(
|
||||
request.url.path, recorder.upstream_base_url
|
||||
)
|
||||
upstream_base_url = upstream_base_url.rstrip("/")
|
||||
full_path = (
|
||||
f"{real_path}?{request.url.query}" if request.url.query else real_path
|
||||
)
|
||||
|
||||
async def fetch_upstream() -> UpstreamResult:
|
||||
fwd_headers = {k: v for k, v in request.headers.items() if k.lower() != "host"}
|
||||
fwd_headers = {
|
||||
k: v for k, v in request.headers.items() if k.lower() != "host"
|
||||
}
|
||||
upstream = await client.request(
|
||||
request.method,
|
||||
f"{recorder.upstream_base_url}{full_path}",
|
||||
f"{upstream_base_url}{full_path}",
|
||||
content=body,
|
||||
headers=fwd_headers,
|
||||
)
|
||||
@ -256,13 +312,21 @@ def create_app(recorder: Optional[OpenAIRecordReplay] = None, http_client=None):
|
||||
upstream.content,
|
||||
)
|
||||
|
||||
status, headers, resp_body = await recorder.handle(request.method, full_path, body, fetch_upstream)
|
||||
status, headers, resp_body = await recorder.handle(
|
||||
request.method,
|
||||
full_path,
|
||||
body,
|
||||
fetch_upstream,
|
||||
upstream_base_url=upstream_base_url,
|
||||
)
|
||||
return Response(content=resp_body, status_code=status, headers=dict(headers))
|
||||
|
||||
return Starlette(
|
||||
routes=[
|
||||
Route("/__recorder_health", health, methods=["GET"]),
|
||||
Route("/{path:path}", proxy, methods=["GET", "POST", "PUT", "PATCH", "DELETE"]),
|
||||
Route(
|
||||
"/{path:path}", proxy, methods=["GET", "POST", "PUT", "PATCH", "DELETE"]
|
||||
),
|
||||
],
|
||||
lifespan=lifespan,
|
||||
)
|
||||
@ -273,7 +337,9 @@ if __name__ == "__main__":
|
||||
|
||||
import uvicorn
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s"
|
||||
)
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--host", default="0.0.0.0")
|
||||
parser.add_argument("--port", type=int, default=8090)
|
||||
|
||||
@ -12,7 +12,9 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..",
|
||||
from tests._openai_record_replay_proxy import ( # noqa: E402
|
||||
CASSETTE_TTL_SECONDS,
|
||||
RECORD_KEY_PREFIX,
|
||||
UPSTREAM_PATH_PREFIX,
|
||||
OpenAIRecordReplay,
|
||||
_resolve_upstream,
|
||||
)
|
||||
|
||||
_OK_BODY = b'{"data":[{"b64_json":"aW1n"}],"usage":{"total_tokens":42}}'
|
||||
@ -24,7 +26,9 @@ class _Upstream:
|
||||
def __init__(self, status=200, headers=None, body=_OK_BODY):
|
||||
self.calls = 0
|
||||
self._status = status
|
||||
self._headers = headers if headers is not None else [("content-type", "application/json")]
|
||||
self._headers = (
|
||||
headers if headers is not None else [("content-type", "application/json")]
|
||||
)
|
||||
self._body = body
|
||||
|
||||
async def __call__(self):
|
||||
@ -33,7 +37,9 @@ class _Upstream:
|
||||
|
||||
|
||||
def _recorder(client=None):
    """Build an ``OpenAIRecordReplay`` around ``client``.

    When ``client`` is ``None`` a fresh ``fakeredis.FakeStrictRedis`` instance
    is used instead.
    """
    return OpenAIRecordReplay(
        client if client is not None else fakeredis.FakeStrictRedis()
    )
|
||||
|
||||
|
||||
def _run(coro):
|
||||
@ -46,13 +52,17 @@ def test_miss_forwards_to_upstream_and_records():
|
||||
upstream = _Upstream()
|
||||
|
||||
status, headers, body = _run(
|
||||
recorder.handle("POST", "/v1/images/generations", b'{"model":"gpt-image-1"}', upstream)
|
||||
recorder.handle(
|
||||
"POST", "/v1/images/generations", b'{"model":"gpt-image-1"}', upstream
|
||||
)
|
||||
)
|
||||
|
||||
assert upstream.calls == 1
|
||||
assert status == 200
|
||||
assert body == _OK_BODY
|
||||
key = OpenAIRecordReplay.record_key("POST", "/v1/images/generations", b'{"model":"gpt-image-1"}')
|
||||
key = OpenAIRecordReplay.record_key(
|
||||
"POST", "/v1/images/generations", b'{"model":"gpt-image-1"}'
|
||||
)
|
||||
assert key.startswith(RECORD_KEY_PREFIX)
|
||||
assert fake.get(key) is not None
|
||||
|
||||
@ -74,15 +84,27 @@ def test_different_body_is_a_separate_recording():
|
||||
recorder = _recorder()
|
||||
upstream = _Upstream()
|
||||
|
||||
_run(recorder.handle("POST", "/v1/images/generations", b'{"prompt":"otter"}', upstream))
|
||||
_run(recorder.handle("POST", "/v1/images/generations", b'{"prompt":"seal"}', upstream))
|
||||
_run(
|
||||
recorder.handle(
|
||||
"POST", "/v1/images/generations", b'{"prompt":"otter"}', upstream
|
||||
)
|
||||
)
|
||||
_run(
|
||||
recorder.handle(
|
||||
"POST", "/v1/images/generations", b'{"prompt":"seal"}', upstream
|
||||
)
|
||||
)
|
||||
|
||||
assert upstream.calls == 2
|
||||
|
||||
|
||||
def test_record_key_ignores_json_key_order():
    """Two JSON bodies that differ only in key order share one record key."""
    a = OpenAIRecordReplay.record_key(
        "POST", "/v1/images/generations", b'{"model":"x","prompt":"y"}'
    )
    b = OpenAIRecordReplay.record_key(
        "POST", "/v1/images/generations", b'{"prompt":"y","model":"x"}'
    )
    assert a == b
|
||||
|
||||
|
||||
@ -126,8 +148,12 @@ def test_replay_drops_framing_headers_so_server_recomputes():
|
||||
)
|
||||
body_in = b'{"model":"gpt-image-1"}'
|
||||
|
||||
_, live_headers, _ = _run(recorder.handle("POST", "/v1/images/generations", body_in, upstream))
|
||||
_, replay_headers, _ = _run(recorder.handle("POST", "/v1/images/generations", body_in, upstream))
|
||||
_, live_headers, _ = _run(
|
||||
recorder.handle("POST", "/v1/images/generations", body_in, upstream)
|
||||
)
|
||||
_, replay_headers, _ = _run(
|
||||
recorder.handle("POST", "/v1/images/generations", body_in, upstream)
|
||||
)
|
||||
|
||||
for headers in (live_headers, replay_headers):
|
||||
names = {k.lower() for k, _ in headers}
|
||||
@ -151,7 +177,9 @@ def test_non_2xx_response_is_not_cached():
|
||||
body_in = b'{"model":"gpt-image-1"}'
|
||||
key = OpenAIRecordReplay.record_key("POST", "/v1/images/generations", body_in)
|
||||
|
||||
status, _, _ = _run(recorder.handle("POST", "/v1/images/generations", body_in, upstream))
|
||||
status, _, _ = _run(
|
||||
recorder.handle("POST", "/v1/images/generations", body_in, upstream)
|
||||
)
|
||||
assert status == 500
|
||||
assert fake.get(key) is None
|
||||
|
||||
@ -238,9 +266,16 @@ def test_handle_warns_when_recording_not_persisted(caplog):
|
||||
upstream = _Upstream()
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="openai_record_replay"):
|
||||
_run(recorder.handle("POST", "/v1/images/generations", b'{"model":"gpt-image-1"}', upstream))
|
||||
_run(
|
||||
recorder.handle(
|
||||
"POST", "/v1/images/generations", b'{"model":"gpt-image-1"}', upstream
|
||||
)
|
||||
)
|
||||
|
||||
assert any(r.levelno == logging.WARNING and "NOT recorded" in r.getMessage() for r in caplog.records)
|
||||
assert any(
|
||||
r.levelno == logging.WARNING and "NOT recorded" in r.getMessage()
|
||||
for r in caplog.records
|
||||
)
|
||||
|
||||
|
||||
def test_log_startup_mode_distinguishes_replay_from_passthrough(caplog):
|
||||
@ -264,4 +299,147 @@ def test_log_startup_mode_warns_when_redis_configured_but_unreachable(caplog):
|
||||
with caplog.at_level(logging.WARNING, logger="openai_record_replay"):
|
||||
_recorder(_UnreachableRedis()).log_startup_mode()
|
||||
|
||||
assert any(r.levelno == logging.WARNING and "DEGRADED" in r.getMessage() for r in caplog.records)
|
||||
assert any(
|
||||
r.levelno == logging.WARNING and "DEGRADED" in r.getMessage()
|
||||
for r in caplog.records
|
||||
)
|
||||
|
||||
|
||||
def test_record_key_distinguishes_upstreams():
    """One recorder fronts many providers; an identical path+body to two of them
    must not collide into one recording."""
    args = ("POST", "/v1/rerank", b'{"query":"x"}')
    cohere = OpenAIRecordReplay.record_key(*args, "https://api.cohere.com")
    anthropic = OpenAIRecordReplay.record_key(*args, "https://api.anthropic.com")
    assert cohere != anthropic
|
||||
|
||||
|
||||
def test_same_path_and_body_to_different_upstreams_record_separately():
    """The same method, path and body sent to two upstreams records twice.

    Each upstream is called live once and returns its own body; a repeat of
    the first request is then served without calling its upstream again.
    """
    fake = fakeredis.FakeStrictRedis()
    recorder = _recorder(fake)
    cohere_upstream = _Upstream(body=b'{"from":"cohere"}')
    anthropic_upstream = _Upstream(body=b'{"from":"anthropic"}')
    body = b'{"query":"x"}'

    first = _run(
        recorder.handle(
            "POST",
            "/v1/rerank",
            body,
            cohere_upstream,
            upstream_base_url="https://api.cohere.com",
        )
    )
    second = _run(
        recorder.handle(
            "POST",
            "/v1/rerank",
            body,
            anthropic_upstream,
            upstream_base_url="https://api.anthropic.com",
        )
    )

    # Both requests missed: each upstream was hit exactly once.
    assert cohere_upstream.calls == 1 and anthropic_upstream.calls == 1
    assert first[2] == b'{"from":"cohere"}'
    assert second[2] == b'{"from":"anthropic"}'

    replayed = _run(
        recorder.handle(
            "POST",
            "/v1/rerank",
            body,
            cohere_upstream,
            upstream_base_url="https://api.cohere.com",
        )
    )
    # The call count did not move, so the repeat did not reach the upstream.
    assert cohere_upstream.calls == 1
    assert replayed[2] == b'{"from":"cohere"}'
|
||||
|
||||
|
||||
def test_resolve_upstream_prefix_selects_host_and_strips_it():
    """A prefixed path resolves to ``https://<host>`` and the path after the host."""
    upstream, real_path = _resolve_upstream(
        f"{UPSTREAM_PATH_PREFIX}api.cohere.com/v2/rerank", "https://api.openai.com"
    )
    assert upstream == "https://api.cohere.com"
    assert real_path == "/v2/rerank"
|
||||
|
||||
|
||||
def test_resolve_upstream_without_prefix_uses_default():
    """An unprefixed path keeps the default upstream and is passed through as-is."""
    upstream, real_path = _resolve_upstream("/v1/embeddings", "https://api.openai.com")
    assert upstream == "https://api.openai.com"
    assert real_path == "/v1/embeddings"
|
||||
|
||||
|
||||
class _Resp:
|
||||
def __init__(self, status, headers, body):
|
||||
self.status_code = status
|
||||
self.headers = dict(headers)
|
||||
self.content = body
|
||||
|
||||
|
||||
class _CapturingClient:
|
||||
"""Captures the upstream request the app makes so routing can be asserted."""
|
||||
|
||||
def __init__(self, status=200, headers=None, body=b'{"ok":true}'):
|
||||
self.calls = []
|
||||
self._status = status
|
||||
self._headers = (
|
||||
headers if headers is not None else [("content-type", "application/json")]
|
||||
)
|
||||
self._body = body
|
||||
|
||||
async def request(self, method, url, *, content, headers):
|
||||
self.calls.append({"method": method, "url": url, "headers": headers})
|
||||
return _Resp(self._status, self._headers, self._body)
|
||||
|
||||
async def aclose(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_upstream_prefix_routes_live_call_to_named_host_and_preserves_auth():
    """A non-OpenAI model routes by the path prefix; the prefix selects the real
    provider host and the caller's auth header must pass through unchanged."""
    from starlette.testclient import TestClient

    from tests._openai_record_replay_proxy import create_app

    client = _CapturingClient()
    app = create_app(
        recorder=OpenAIRecordReplay(fakeredis.FakeStrictRedis()), http_client=client
    )

    with TestClient(app) as tc:
        resp = tc.post(
            f"{UPSTREAM_PATH_PREFIX}api.anthropic.com/v1/messages",
            content=b'{"model":"claude"}',
            headers={"x-api-key": "secret", "anthropic-version": "2023-06-01"},
        )

    assert resp.status_code == 200
    assert len(client.calls) == 1
    call = client.calls[0]
    # The prefix and host segment are gone from the forwarded URL.
    assert call["url"] == "https://api.anthropic.com/v1/messages"
    forwarded = {k.lower() for k in call["headers"]}
    assert "host" not in forwarded
    assert "x-api-key" in forwarded
|
||||
|
||||
|
||||
def test_no_prefix_falls_back_to_default_openai_upstream():
    """An unprefixed request is forwarded to ``https://api.openai.com``."""
    from starlette.testclient import TestClient

    from tests._openai_record_replay_proxy import create_app

    client = _CapturingClient()
    app = create_app(
        recorder=OpenAIRecordReplay(fakeredis.FakeStrictRedis()), http_client=client
    )

    with TestClient(app) as tc:
        tc.post(
            "/v1/embeddings",
            content=b'{"input":"hi"}',
            headers={"authorization": "Bearer k"},
        )

    assert client.calls[0]["url"] == "https://api.openai.com/v1/embeddings"
|
||||
|
||||
@ -3,6 +3,7 @@ model_list:
|
||||
litellm_params:
|
||||
model: "anthropic/claude-sonnet-4-5-20250929"
|
||||
api_key: os.environ/ANTHROPIC_API_KEY
|
||||
api_base: os.environ/RECORDER_ANTHROPIC_BASE_URL # In CI, routes through the record/replay proxy; unset elsewhere -> direct to Anthropic
|
||||
|
||||
- model_name: bedrock-claude-sonnet-3.5
|
||||
litellm_params:
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
## Tests /key endpoints.
|
||||
|
||||
import pytest
|
||||
import asyncio, time, uuid
|
||||
import asyncio, uuid
|
||||
import aiohttp
|
||||
from openai import AsyncOpenAI
|
||||
import sys, os
|
||||
@ -272,7 +272,7 @@ async def chat_completion_streaming(session, key, model="gpt-4"):
|
||||
client = AsyncOpenAI(api_key=key, base_url="http://0.0.0.0:4000")
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful assistant"},
|
||||
{"role": "user", "content": f"Hello! {time.time()}"},
|
||||
{"role": "user", "content": "Hello!"},
|
||||
]
|
||||
prompt_tokens = litellm.token_counter(model="gpt-35-turbo", messages=messages)
|
||||
data = {
|
||||
@ -620,6 +620,20 @@ async def test_key_info_spend_values_image_generation():
|
||||
spend = key_info["info"]["spend"]
|
||||
assert spend > 0
|
||||
|
||||
# The record/replay proxy serves this identical second call from its
|
||||
# cassette (free), but the proxy must still bill it. If the proxy's own
|
||||
# response cache were on, the repeat would be a $0 cache hit and spend
|
||||
# would not move, silently zeroing recorded-call spend; assert it grows.
|
||||
await image_generation(session=session, key=key)
|
||||
await asyncio.sleep(5)
|
||||
key_info = await retry_request(
|
||||
get_key_info, session=session, get_key=key, call_key=key
|
||||
)
|
||||
assert key_info["info"]["spend"] > spend, (
|
||||
"spend did not increase on an identical repeat image call; the proxy "
|
||||
"response cache appears to be ON, which would zero recorded-call spend"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Frequent check on ci/cd leads to read timeout issue.")
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@ -5,7 +5,6 @@ import asyncio
|
||||
import aiohttp, openai
|
||||
from openai import OpenAI, AsyncOpenAI, AzureOpenAI, AsyncAzureOpenAI
|
||||
from typing import Optional, List, Union
|
||||
from litellm._uuid import uuid
|
||||
|
||||
LITELLM_MASTER_KEY = "sk-1234"
|
||||
|
||||
@ -82,7 +81,7 @@ async def moderation(session, key):
|
||||
"Authorization": f"Bearer {key}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
data = {"input": "I want to kill the cat."}
|
||||
data = {"model": "text-moderation-stable", "input": "I want to kill the cat."}
|
||||
|
||||
async with session.post(url, headers=headers, json=data) as response:
|
||||
status = response.status
|
||||
@ -107,7 +106,7 @@ async def chat_completion(session, key, model: Union[str, List] = "gpt-4"):
|
||||
"model": model,
|
||||
"messages": [
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{"role": "user", "content": f"Hello! {uuid.uuid4()}"},
|
||||
{"role": "user", "content": "Hello!"},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user