litellm/tests/_vcr_redis_persister.py
Mateo Wang 533eab4dbd
fix(tests/vcr): make Redis cassette cache replay deterministically (zero VCR misses on consecutive runs) (#28826)
* test(vcr): make Redis-backed cassettes replay deterministically across runs

- Pin LITELLM_LOCAL_MODEL_COST_MAP=True in the shared VCR harness so the
  per-test importlib.reload(litellm) no longer fetches the model cost map
  from raw.githubusercontent.com. That live fetch was being recorded into
  cassettes; for tests that subsequently skip it was the only recorded
  episode, so the persister refused to save it (skipped tests don't persist)
  and the test re-recorded it live every run (MISS:NOT_PERSISTED).

- Compare-time symmetric matcher tolerance for Google OAuth (ya29.*) tokens,
  observability/telemetry payloads, credential-exchange bodies, and volatile
  UUID/timestamp tokens, so existing cassettes select a recorded episode
  instead of growing past the 50-episode cap and re-recording live.

- Don't record fire-and-forget telemetry (langfuse/arize/otel/...) into
  non-telemetry tests' cassettes. Several modules set litellm.success_callback
  at import time, so observability logging is globally enabled and an async
  flush from the background logging worker lands in an unrelated test's VCR
  window, saved as a spurious MISS:RECORDED (observed: a Langfuse batch from
  another completion landing on test_lowest_latency_routing_buffer). Such a
  request now passes through live (telemetry hosts aren't real-spend hosts);
  tests that actually assert on telemetry keep recording it.

- Dedupe + cap the VCR diagnostic dump so the classification summary survives
  CircleCI's ~400KB step-output truncation.

- Stabilize a non-deterministic rate-limit test body; mark AWS Secrets Manager
  lifecycle tests VCR-incompatible (uniquely-named secrets can't be replayed).

- Mark test_router_text_completion_client VCR-incompatible: it fires 300
  identical requests to verify async-client reuse, but vcrpy patches the HTTP
  transport so replay never exercises the real connection pool the test
  validates, and recording 300 near-identical episodes overflows the
  50-episode cap (MISS:OVERFLOW every run). It hits a free mock endpoint.

- Mark the Vertex AI MaaS Mistral OCR tests (vertex_ai/mistral-ocr-2505)
  VCR-incompatible: the MaaS model is not provisioned in the CI GCP project,
  so the live :rawPredict call fails and the test skips every run, leaving no
  cassette to record (MISS:NOT_PERSISTED every run). Sibling direct-Mistral
  and Azure OCR tests are unaffected and still replay from cache.
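
The symmetric matcher tolerance described in the list above can be sketched roughly as follows. This is only an illustration: the regexes, placeholder strings, and the names `normalize_volatile` and `bodies_match` are assumptions, not the harness's actual matcher. The point is that the same normalization is applied to both the recorded and the incoming body before they are compared.

```python
import re

# Hypothetical patterns; the real harness may recognise different shapes.
_OAUTH_TOKEN = re.compile(r"ya29\.[A-Za-z0-9_\-.]+")
_UUID = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)
_ISO_TIMESTAMP = re.compile(
    r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?"
)


def normalize_volatile(text: str) -> str:
    """Replace per-run values with fixed placeholders."""
    text = _OAUTH_TOKEN.sub("<oauth-token>", text)
    text = _UUID.sub("<uuid>", text)
    text = _ISO_TIMESTAMP.sub("<timestamp>", text)
    return text


def bodies_match(recorded: str, incoming: str) -> bool:
    """Symmetric comparison: both sides are normalized the same way."""
    return normalize_volatile(recorded) == normalize_volatile(incoming)
```

Because the comparison is symmetric, an existing cassette recorded with an old token still selects its episode, while a body that differs outside the volatile fields still fails to match.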

* fix(tests/vcr): refresh cassette TTL on read so replayed cassettes don't expire

The Redis VCR persister loaded cassettes with a plain GET, which does not
touch the key's TTL. A cassette that is only ever replayed (HIT/NOOP, never
re-recorded) therefore expired exactly 24h after its last *write*, no matter
how often it was read. Whichever CI run happened to cross that boundary
re-recorded the cassette live and surfaced a spurious VCR MISS on otherwise
deterministic cassettes — the residual per-run flakiness floor (a different
random subset of read-only cassettes expiring each run).

Slide the expiry forward on every successful load (best-effort EXPIRE), so
any cassette used at least once per TTL window stays alive indefinitely and
the 2nd/3rd run of a day replays cleanly.
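
A minimal sketch of the sliding expiry, using a small in-memory stand-in instead of a real Redis server. `FakeRedis`, its `advance` helper, and `load_with_sliding_ttl` are hypothetical names introduced here for illustration; only the GET-then-EXPIRE sequence reflects what the commit describes.

```python
from typing import Dict, Optional


class FakeRedis:
    """In-memory stand-in (hypothetical) that tracks remaining TTL per key."""

    def __init__(self) -> None:
        self._values: Dict[str, bytes] = {}
        self.ttls: Dict[str, int] = {}

    def set(self, key: str, value: bytes, ex: int) -> None:
        self._values[key] = value
        self.ttls[key] = ex

    def get(self, key: str) -> Optional[bytes]:
        # Like Redis GET, reading does not touch the TTL.
        return self._values.get(key)

    def expire(self, key: str, seconds: int) -> bool:
        if key not in self._values:
            return False
        self.ttls[key] = seconds
        return True

    def advance(self, seconds: int) -> None:
        """Simulate time passing and evict keys whose TTL ran out."""
        for key in list(self.ttls):
            self.ttls[key] -= seconds
            if self.ttls[key] <= 0:
                del self.ttls[key]
                del self._values[key]


def load_with_sliding_ttl(client, key: str, ttl_seconds: int) -> Optional[bytes]:
    data = client.get(key)
    if data is None:
        return None
    try:
        # Best-effort: a failed refresh must not turn a hit into a miss.
        client.expire(key, ttl_seconds)
    except Exception:
        pass
    return data
```

With a TTL of 100 and reads at 60 and 120 time units after the write, the second read still succeeds, whereas with a plain GET the key would have expired at 100.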

* fix(tests/vcr): recover from spurious GET-None for existing cassette keys

Under concurrent CI load, the persister's load GET was observed returning
None for a cassette key that demonstrably existed on the (single, non-
clustered) Redis master — an external monitor saw the key present with a
healthy TTL at the same instant the in-process client read None. Because
None is a valid GET result (not a RedisError), the retry-on-error client
config never engaged, so the cassette re-recorded live (a phantom
MISS:RECORDED); for flaky/networked tests the failed live call then
triggered a pytest rerun, which is why a rotating subset of otherwise
deterministic tests missed each run.

On a None result, re-check EXISTS and re-read once. If the key really
exists, use the recovered value and log [vcr-transient-miss-recovered]
(also counted in cassette_cache_health). A genuinely absent key (a new
cassette) still falls through to CassetteNotFoundError.
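
The recovery step can be sketched as below. Note that a later commit in this message removes this recovery again, so this only illustrates the approach described here. `CassetteMissing`, `get_with_recovery`, and `FlakyClient` are hypothetical names; `FlakyClient` fakes a client whose first GET returns `None` for a key that exists.

```python
class CassetteMissing(Exception):
    """Hypothetical stand-in for vcrpy's CassetteNotFoundError."""


def get_with_recovery(client, key, log=print):
    data = client.get(key)
    if data is not None:
        return data
    # GET returned None: confirm with EXISTS and re-read exactly once.
    if client.exists(key):
        data = client.get(key)
        if data is not None:
            log(f"[vcr-transient-miss-recovered] {key}")
            return data
    # Genuinely absent key (e.g. a brand-new cassette).
    raise CassetteMissing(key)


class FlakyClient:
    """Fake client (hypothetical): the first GET always returns None."""

    def __init__(self, store):
        self.store = dict(store)
        self.calls = 0

    def get(self, key):
        self.calls += 1
        if self.calls == 1:
            return None
        return self.store.get(key)

    def exists(self, key):
        return int(key in self.store)
```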

* chore(tests/vcr): TEMP diagnostic for persistent-miss cassette load path

Logs GET/EXISTS at load time for the three cassettes that re-record every
run despite being present in Redis, to capture what the in-process client
sees. To be reverted before merge.

* chore(tests/vcr): write load diagnostic to Redis (truncation-proof)

CI stdout truncates to the last ~400KB, dropping the early loaddbg lines
for the alphabetically-first failing test. Push the load probe to a Redis
list instead so it survives. To be reverted before merge.

* fix(tests/vcr): don't drop stored telemetry episodes during cassette load

Root cause of the residual per-run misses on present cassettes: vcrpy's
Cassette._load() replays each *stored* interaction through Cassette.append(),
which runs before_record_request on it — and a None return there silently
drops that episode. The telemetry-leak suppressor (_should_drop_telemetry_record)
returns None for telemetry requests, so when a non-telemetry-named test (or the
alphabetically-first test in a worker, whose _current_test_nodeid is still empty)
loaded a cassette containing a Langfuse ingestion episode, the episode was
dropped on read — forcing an endless live re-record (a phantom MISS:RECORDED on
a cassette that was demonstrably present in Redis). Verified by reproducing
Cassette._load() against the real cassette: empty/non-telemetry nodeid -> 0
episodes survive; with the guard -> 1 survives.

Fix: guard the suppressor with a thread-local set around Cassette._load (via a
small idempotent monkeypatch), so the drop only ever stops *new* incidental
telemetry from being recorded and never filters the existing cassette on read.

Also drops the speculative GET-None recovery + its diagnostics from the previous
commits: the load diagnostic showed GET returns the cassette bytes fine
(get=1440B), so the persister never returned a spurious None — the loss happened
later in vcrpy's append. The proven TTL-refresh-on-read fix is retained.

* fix(tests/vcr): drop incidental telemetry export POSTs to stop rotating async-flush misses

litellm's observability loggers flush on a background thread, so a Langfuse
ingestion POST scheduled by one telemetry test can fire mid-way through a
*later* telemetry-named test (after that test's own httpx mock has exited) and
be recorded by VCR as a phantom episode — a non-deterministic MISS:RECORDED /
PARTIAL that rotates onto a different telemetry test from run to run.

Telemetry export POSTs are fire-and-forget; no test asserts on a *recorded*
export response except the pass-through proxy test (which forwards a client POST
to Langfuse ingestion and replays its 207). So _should_drop_telemetry_record now
drops incidental export POSTs for every test except that one. Dropping returns
None (live fire-and-forget, never stored), so it can only turn a phantom miss
into a harmless live call, never the reverse; recorded read-back GETs that
telemetry tests assert on are matched by method and left untouched.
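
The decision described above might look roughly like this. The host suffix list, the `PASS_THROUGH_MARKER` substring, and the function name `should_drop_export` are assumptions made for illustration; the real `_should_drop_telemetry_record` may identify the pass-through test and telemetry hosts differently.

```python
from urllib.parse import urlparse

TELEMETRY_HOST_SUFFIXES = ("langfuse.com",)  # illustrative assumption
PASS_THROUGH_MARKER = "pass_through"  # hypothetical test-id substring


def should_drop_export(method: str, url: str, test_nodeid: str) -> bool:
    """True when a request is an incidental telemetry export POST."""
    host = urlparse(url).hostname or ""
    is_telemetry = any(
        host == suffix or host.endswith("." + suffix)
        for suffix in TELEMETRY_HOST_SUFFIXES
    )
    if not is_telemetry:
        return False
    if method.upper() != "POST":
        return False  # read-back GETs stay recorded
    if PASS_THROUGH_MARKER in test_nodeid:
        return False  # this test replays the recorded export response
    return True
```

Matching on method is what keeps recorded read-back GETs untouched while export POSTs pass through live.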

* fix(tests/vcr): restore assertion in test_banner_silent_when_vcr_disabled

The assertion that the banner is suppressed when VCR is disabled was
inadvertently moved into test_diagnostic_log_silent_when_no_dir when
the diagnostic-log tests were added, leaving the disabled-VCR test
verifying nothing.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Yassin Kortam <yassin@berri.ai>
2026-05-26 11:30:44 -07:00


from __future__ import annotations

import logging
import os
import warnings
from typing import Any, Optional

from vcr.persisters.filesystem import CassetteNotFoundError
from vcr.serialize import deserialize, serialize

CASSETTE_TTL_SECONDS = 24 * 60 * 60
REDIS_KEY_PREFIX = "litellm:vcr:cassette:"
CASSETTE_REDIS_URL_ENV = "CASSETTE_REDIS_URL"
VCR_VERBOSE_ENV = "LITELLM_VCR_VERBOSE"
MAX_EPISODES_PER_CASSETTE = 50

_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

_log = logging.getLogger(__name__)

_passed_by_cassette_key: dict[str, bool] = {}


class VCRCassetteCacheWarning(UserWarning):
    """Emitted when the cassette Redis cache fails to load or save.

    Surfaced in pytest's session-end warnings summary so failures are
    visible in CI logs even when the underlying tests pass.
    """


# Per-process counters; surfaced via :func:`cassette_cache_health` so
# conftests can emit a session-end banner when failures occurred.
_cache_health = {
    "save_failures": 0,
    "save_failure_last_error": "",
    "load_failures": 0,
    "load_failure_last_error": "",
}


def _record_cache_failure(kind: str, exc: BaseException) -> None:
    err = f"{type(exc).__name__}: {exc}"
    if kind == "save":
        _cache_health["save_failures"] = int(_cache_health["save_failures"]) + 1
        _cache_health["save_failure_last_error"] = err
    elif kind == "load":
        _cache_health["load_failures"] = int(_cache_health["load_failures"]) + 1
        _cache_health["load_failure_last_error"] = err


def cassette_cache_health() -> dict:
    return dict(_cache_health)


def reset_cassette_cache_health() -> None:
    _cache_health["save_failures"] = 0
    _cache_health["save_failure_last_error"] = ""
    _cache_health["load_failures"] = 0
    _cache_health["load_failure_last_error"] = ""


def cassette_cache_capacity_snapshot(client: Optional[Any] = None) -> Optional[dict]:
    """Probe Redis ``INFO memory`` and return used/max bytes and percent.

    Returns ``None`` if Redis is unreachable, the server didn't report
    ``maxmemory``, or ``maxmemory`` is 0 (uncapped). Best-effort: any
    exception turns into ``None`` so this never breaks a test session.
    """
    try:
        if client is None:
            client = _build_default_client()
        info = client.info(section="memory")
    except Exception:  # pragma: no cover - best-effort probe
        return None
    used = info.get("used_memory")
    maxmem = info.get("maxmemory")
    try:
        used = int(used) if used is not None else None
        maxmem = int(maxmem) if maxmem is not None else None
    except (TypeError, ValueError):  # pragma: no cover - defensive
        return None
    if not used or not maxmem or maxmem <= 0:
        return None
    return {
        "used_memory_bytes": used,
        "maxmemory_bytes": maxmem,
        "used_pct": (used / maxmem) * 100.0,
    }


def mark_test_outcome_for_cassette(cassette_path: str, passed: bool) -> None:
    _passed_by_cassette_key[redis_key_for(cassette_path)] = passed


def redis_key_for(cassette_path: str) -> str:
    abs_path = os.path.abspath(str(cassette_path))
    try:
        rel = os.path.relpath(abs_path, start=_REPO_ROOT)
    except ValueError:
        rel = os.path.basename(abs_path)
    if rel.endswith(".yaml"):
        rel = rel[: -len(".yaml")]
    rel = rel.replace("/cassettes/", "/").lstrip("./")
    return f"{REDIS_KEY_PREFIX}{rel}"


def _redis_url_from_env() -> Optional[str]:
    return os.environ.get(CASSETTE_REDIS_URL_ENV) or None


def _build_default_client():
    import redis
    from redis.backoff import ExponentialBackoff
    from redis.exceptions import ConnectionError as RedisConnectionError
    from redis.exceptions import TimeoutError as RedisTimeoutError
    from redis.retry import Retry

    url = _redis_url_from_env()
    if not url:
        raise RuntimeError(
            f"Set {CASSETTE_REDIS_URL_ENV} to enable the VCR persister. "
            "Cassette Redis is intentionally separate from the application "
            "Redis (REDIS_URL/REDIS_HOST) to avoid being flushed by tests."
        )
    return redis.Redis.from_url(
        url,
        socket_timeout=5,
        socket_connect_timeout=5,
        decode_responses=False,
        retry=Retry(ExponentialBackoff(cap=2, base=0.1), retries=2),
        retry_on_error=[RedisConnectionError, RedisTimeoutError],
    )


def make_redis_persister(
    client: Optional[Any] = None,
    ttl_seconds: int = CASSETTE_TTL_SECONDS,
):
    redis_client = client if client is not None else _build_default_client()
    try:
        from redis.exceptions import RedisError
    except ImportError:  # pragma: no cover - redis is a hard test dep
        RedisError = Exception  # type: ignore[assignment,misc]

    class _RedisPersister:
        @staticmethod
        def load_cassette(cassette_path, serializer):
            key = redis_key_for(cassette_path)
            try:
                data = redis_client.get(key)
            except RedisError as exc:
                _record_cache_failure("load", exc)
                msg = (
                    f"VCR redis load failed for {cassette_path}; treating "
                    f"as cache miss: {type(exc).__name__}: {exc}"
                )
                _log.warning(msg)
                warnings.warn(msg, VCRCassetteCacheWarning, stacklevel=2)
                raise CassetteNotFoundError() from exc
            if data is None:
                raise CassetteNotFoundError()
            try:
                if isinstance(data, bytes):
                    data = data.decode("utf-8")
                result = deserialize(data, serializer)
            except Exception as exc:
                _record_cache_failure("load", exc)
                msg = (
                    f"VCR redis load failed for {cassette_path}; cached "
                    f"payload is corrupt, treating as cache miss: "
                    f"{type(exc).__name__}: {exc}"
                )
                _log.warning(msg)
                warnings.warn(msg, VCRCassetteCacheWarning, stacklevel=2)
                raise CassetteNotFoundError() from exc
            # Slide the expiry forward on every successful read. A plain GET
            # does not touch the key's TTL, so a cassette that is only ever
            # replayed (HIT/NOOP, never re-recorded) expires exactly
            # ``ttl_seconds`` after its last *write* no matter how often it is
            # read — and whichever CI run happens to cross that boundary
            # re-records it live, surfacing as a spurious VCR MISS that no
            # amount of matcher tolerance can prevent. Refreshing the TTL on
            # read keeps any cassette used at least once per TTL window alive
            # indefinitely, so the second/third run of a day replays cleanly.
            # Best-effort: a failed refresh must never turn a successful load
            # into a miss.
            try:
                redis_client.expire(key, ttl_seconds)
            except RedisError:
                pass
            return result

        @staticmethod
        def save_cassette(cassette_path, cassette_dict, serializer):
            key = redis_key_for(cassette_path)
            passed = _passed_by_cassette_key.pop(key, True)
            episode_count = len(cassette_dict.get("requests", []) or [])
            if episode_count > MAX_EPISODES_PER_CASSETTE:
                _log.warning(
                    "VCR redis save refused for %s; cassette has %d episodes "
                    "(> MAX_EPISODES_PER_CASSETTE=%d). The test likely produces "
                    "non-deterministic request bodies (e.g. uuid) and is "
                    "appending instead of replaying. Opt it out with the "
                    "no-vcr list in conftest, or stabilize its request body.",
                    cassette_path,
                    episode_count,
                    MAX_EPISODES_PER_CASSETTE,
                )
                return
            if not passed:
                _log.info(
                    "VCR redis save skipped for %s; test did not pass — "
                    "leaving any prior cassette intact",
                    cassette_path,
                )
                return
            data = serialize(cassette_dict, serializer)
            payload = data.encode("utf-8") if isinstance(data, str) else data
            try:
                redis_client.set(key, payload, ex=ttl_seconds)
            except RedisError as exc:
                # Cassette persistence is strictly best-effort: connection
                # blips, timeouts, OOM at the maxmemory cap, READONLY
                # replicas, etc. should all degrade gracefully to "test
                # passed but cassette not cached" rather than failing the
                # test on teardown. We still want a loud signal so the
                # failure shows up in pytest's warnings summary at the
                # end of the session and feeds the session-end banner.
                _record_cache_failure("save", exc)
                msg = (
                    f"VCR redis save failed for {cassette_path}; cassette "
                    f"not persisted: {type(exc).__name__}: {exc}"
                )
                _log.warning(msg)
                warnings.warn(msg, VCRCassetteCacheWarning, stacklevel=2)

    return _RedisPersister


def filter_non_2xx_response(response):
    if not isinstance(response, dict):
        return response
    status = response.get("status")
    code = status.get("code") if isinstance(status, dict) else status
    if not isinstance(code, int):
        return response
    return response if 200 <= code < 300 else None


_PATCHED_AIOHTTP_RECORD = False


def patch_vcrpy_aiohttp_record_path() -> None:
    """Re-feed the response body into aiohttp's StreamReader after vcrpy's
    record_response drains it, so downstream consumers (e.g.
    LiteLLMAiohttpTransport.AiohttpResponseStream) can still read it."""
    global _PATCHED_AIOHTTP_RECORD
    if _PATCHED_AIOHTTP_RECORD:
        return

    import vcr.stubs.aiohttp_stubs as _aiohttp_stubs

    _orig_record_response = _aiohttp_stubs.record_response

    async def _record_response_preserving_body(cassette, vcr_request, response):
        await _orig_record_response(cassette, vcr_request, response)
        body = getattr(response, "_body", None) or b""
        if body:
            response.content.unread_data(body)

    _aiohttp_stubs.record_response = _record_response_preserving_body
    _PATCHED_AIOHTTP_RECORD = True


def vcr_verbose_enabled() -> bool:
    return os.environ.get(VCR_VERBOSE_ENV) == "1"


def format_vcr_verdict(cassette: Any) -> str:
    if cassette is None:
        return "[VCR NOOP]"
    played = getattr(cassette, "play_count", 0) or 0
    dirty = getattr(cassette, "dirty", False)
    total = len(cassette) if hasattr(cassette, "__len__") else 0
    if played == 0 and not dirty:
        return "[VCR NOOP] (no http traffic)"
    if played > 0 and not dirty:
        return f"[VCR HIT] {played} replayed, 0 new ({total} cassette entries)"
    if played == 0 and dirty:
        return f"[VCR MISS] 0 replayed, recorded new ({total} cassette entries)"
    return (
        f"[VCR PARTIAL] {played} replayed + new recordings ({total} cassette entries)"
    )
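
To make the key layout concrete, here is a small standalone sketch of the path-to-key mapping that `redis_key_for` performs once the path is already relative to the repo root. `sketch_key` and the example path are hypothetical; the real function also resolves absolute paths against `_REPO_ROOT` first.

```python
REDIS_KEY_PREFIX = "litellm:vcr:cassette:"


def sketch_key(rel_path: str) -> str:
    """Map a repo-relative cassette path to a Redis key (illustration only)."""
    if rel_path.endswith(".yaml"):
        rel_path = rel_path[: -len(".yaml")]
    # Drop the "cassettes" directory segment and any leading "." or "/".
    rel_path = rel_path.replace("/cassettes/", "/").lstrip("./")
    return f"{REDIS_KEY_PREFIX}{rel_path}"


# Hypothetical cassette path, used only for illustration.
key = sketch_key("tests/llm_translation/cassettes/test_foo.yaml")
```

Here `key` is `litellm:vcr:cassette:tests/llm_translation/test_foo`, so keys stay stable across machines regardless of where the repo is checked out.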