feat(context_management): compact_20260112 polyfill for non-Anthropic providers (#28868)

* feat(anthropic/messages): in-gateway context_management polyfill for non-Anthropic providers

- Add `context_management/` module with `clear_tool_uses_20250919` editor
  dispatched before chat-completions translation on `/v1/messages`
- Hard-protect the most recently completed tool_result from being cleared
- Attach `context_management.applied_edits` to both non-streaming and
  streaming (final `message_delta`) responses
- Bedrock Converse: forward `context_management`; filter to
  `compact_20260112`-only edits with `compact-2026-01-12` beta header
- token_counter: guard Anthropic-format tools (no `function` key) to
  prevent AttributeError during polyfill token counting
- Streaming: handle empty-choices usage-only trailing chunks
- Skip polyfill when `litellm.drop_params = True`

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(bedrock): pop None context_management before sending to Bedrock Converse

If context_management is forwarded as None (e.g. when mapping returns
None for an invalid format), _filter_context_management_for_bedrock_converse
previously returned early without removing the key, leaving
"context_management": null in the request and causing a validation
error. Pop the key when the value is not a dict.
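
A minimal sketch of the pop-when-not-a-dict behaviour described above. The helper name and request shape are illustrative assumptions, not the actual _filter_context_management_for_bedrock_converse code.

```python
from typing import Any, Dict


def drop_invalid_context_management(request: Dict[str, Any]) -> Dict[str, Any]:
    """Remove `context_management` unless it is a dict (hypothetical helper)."""
    value = request.get("context_management")
    if not isinstance(value, dict):
        # Covers an explicit None as well as any other malformed value.
        # pop() with a default is a no-op when the key is absent.
        request.pop("context_management", None)
    return request


print(drop_invalid_context_management({"context_management": None, "messages": []}))
# {'messages': []}
```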

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(bedrock/converse): pop None context_management; extract helpers to fix PLR0915

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(anthropic/messages): check per-request drop_params alongside global

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(anthropic/messages): preserve drop_params for downstream and respect explicit False

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix: lazy debug logging in clear_tool_uses; remove unused context_management constants

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic/messages): guard context_management polyfill with try/except

Wrap apply_context_management() in a try/except so any failure (e.g.
litellm.token_counter raising on an unknown tokenizer or unexpected
message format) is logged but does not crash the underlying LLM
request. The polyfill is a best-effort additive feature; on failure we
forward the original messages without applied edits.
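
The best-effort guard can be sketched roughly as follows. `run_best_effort`, `failing_editor`, and the return shape are hypothetical stand-ins for illustration; only the log-and-fall-back pattern comes from the description above.

```python
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple

logger = logging.getLogger("polyfill_sketch")

Messages = List[Dict[str, Any]]
Edits = List[Dict[str, Any]]


def run_best_effort(
    editor: Callable[[Messages], Tuple[Messages, Edits]],
    messages: Messages,
) -> Tuple[Messages, Optional[Edits]]:
    """Apply `editor`; on any failure, log it and return the original messages."""
    try:
        return editor(messages)
    except Exception:
        # The underlying LLM request must still go out, so swallow and log.
        logger.exception("context_management polyfill failed; forwarding original messages")
        return messages, None


def failing_editor(messages: Messages) -> Tuple[Messages, Edits]:
    # Stand-in for a token counter raising on an unknown tokenizer.
    raise ValueError("unknown tokenizer")


original = [{"role": "user", "content": "hi"}]
print(run_best_effort(failing_editor, original) == (original, None))  # True
```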

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(token_counter): guard None input_schema in Anthropic tool fallback

Use `or {}` instead of `.get(..., {})` so explicit null parameters do not
raise AttributeError when formatting function definitions for token counting.
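
The difference between the two forms shows up only when the key is present with an explicit null. A small illustration, using a made-up tool dict:

```python
tool = {"name": "get_weather", "input_schema": None}

# .get() falls back to the default only when the key is absent,
# so an explicit null still comes back as None.
with_default = tool.get("input_schema", {})

# `or {}` also replaces None (and any other falsy value) with an empty dict.
with_or = tool.get("input_schema") or {}

print(with_default)  # None
print(with_or)       # {}
```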

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: minimize context_management polyfill threading

- Use None (not empty list) for polyfill_applied_edits when context
  management isn't requested, so semantics of 'feature not requested'
  vs 'feature requested but no edits applied' are distinct.
- In the streaming iterator, only pass applied_edits to the per-chunk
  translator on the final (finish_reason) chunk; intermediate chunks
  ignore it anyway, and this makes intent explicit on both sync and
  async paths.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(context_management): align tool_use counts and normalize list spec

- _count_tool_uses now requires a string id, matching _collect_tool_use_ids_in_order so the tool_uses trigger can't fire on blocks that aren't clearable.
- apply_context_management dispatcher now accepts the OpenAI list form and normalizes it via AnthropicConfig.map_openai_context_management_to_anthropic, so the polyfill path no longer silently no-ops on list input.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* feat(context_management): add compact_20260112 polyfill for non-Anthropic providers

Implements an in-gateway compaction polyfill that summarizes long conversations
using a configurable model when `compact_20260112` is requested for non-Anthropic
targets (e.g. OpenAI, Gemini), matching Anthropic's context management beta
behaviour for those providers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(compact): skip tool_result-only user turns; bedrock: elif for context_management

- compact_20260112 Phase D: when keeping the last user turn after a full
  summary, skip role=user turns whose content is exclusively tool_result
  blocks. Such turns translate to OpenAI tool-role messages with no
  preceding assistant tool_calls (those got summarized away), which
  non-Anthropic providers reject. Fall back to a synthetic continuation
  prompt if no eligible user question exists, so the downstream call
  always has a non-empty user message.
- bedrock converse: chain the context_management param as elif so it
  follows the same if/elif pattern as the surrounding thinking/
  reasoning_effort checks.
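
A rough sketch of the Phase D selection rule from the first bullet. The function names and the fallback text are assumptions made for illustration; the actual continuation prompt is not shown in this message.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]

# Hypothetical fallback text; the real synthetic prompt may differ.
FALLBACK_PROMPT = "Please continue."


def is_tool_result_only(message: Message) -> bool:
    """True when content is a non-empty list made up solely of tool_result blocks."""
    content = message.get("content")
    if not isinstance(content, list) or not content:
        return False
    return all(
        isinstance(block, dict) and block.get("type") == "tool_result"
        for block in content
    )


def last_user_question(messages: List[Message]) -> Message:
    """Walk backwards to the newest user turn that is not tool_result-only."""
    for message in reversed(messages):
        if message.get("role") == "user" and not is_tool_result_only(message):
            return message
    # No eligible turn: synthesize one so the user message is never empty.
    return {"role": "user", "content": FALLBACK_PROMPT}
```

With a history ending in a tool_result-only user turn, this returns the earlier plain-text question rather than the orphaned tool result.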

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic): post-compaction question selection, system type, sync stream merge

- compact.py: select last user question from effective_messages (post-compaction slice) instead of raw messages, so prior summarized turns aren't reintroduced
- handler.py: widen _prepare_completion_kwargs system parameter type to Union[str, List[Dict]] matching PolyfillResult.system
- streaming_iterator.py: mirror async hold-and-merge logic in sync __next__ so context_management is attached to the final merged message_delta when stop_reason and usage arrive in separate chunks

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic/messages): apply context_management on sync path; clear held stop_reason chunk in async iterator

- Sync `anthropic_messages_handler` was silently dropping the
  `context_management` kwarg via `ANTHROPIC_ONLY_REQUEST_KEYS` after the
  polyfill was moved into the async handler. Bridge to the async
  dispatcher with `run_async_function` so `litellm.messages.create()`
  callers keep working (regressed e.g. `clear_tool_uses_20250919`).
- In the streaming iterator's `__anext__` `StopIteration` handler, clear
  `self.holding_stop_reason_chunk` after capturing it (matches `__next__`)
  so a subsequent call doesn't re-emit the same chunk.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(bugfixes): bedrock None context_mgmt; stream per-instance queue; sync polyfill; trailing-chunk passthrough

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic): silently drop trailing chunks after usage; remove dead _polyfill_result key

- streaming_iterator: in sync __next__, after the usage chunk has been
  merged and emitted, silently consume any trailing provider events
  via 'continue' instead of forwarding them through the queue. Trailing
  chunks would translate to content_block_delta or message_delta and
  violate Anthropic SSE ordering after the final message_delta. The
  async __anext__ already drops these via 'if not self.queued_usage_chunk:'
  gating, so this aligns sync and async behavior.

- handler: drop unused '_polyfill_result' from ANTHROPIC_ONLY_REQUEST_KEYS.
  PolyfillResult is passed as an explicit arg to the adapter methods, never
  through extra_kwargs, so the entry was dead code.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* refactor(anthropic): extract usage-merge helper; guard empty slice-only compaction result

- Extract the duplicated hold-and-merge usage logic from the sync __next__ and
  async __anext__ paths into a shared _merge_usage_into_held_stop_reason_chunk
  helper so the subtle cache-token / context_management attachment lives in
  exactly one place.
- In the compact_20260112 slice-only path, fall back to _select_last_user_question
  when _strip_compaction_blocks produces an empty list (e.g. messages ending on
  an assistant turn whose only content was the compaction block) so the
  downstream API never receives an empty messages array.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* refactor(anthropic/context_management): streaming iterator compaction fixes and compact polyfill improvements

- Extract usage-merge helper; guard empty slice-only compaction result
- Silently drop trailing chunks after usage; remove dead _polyfill_result key
- Fix bedrock None context_mgmt; stream per-instance queue; sync polyfill; trailing-chunk passthrough
- Apply context_management on sync path; clear held stop_reason chunk in async iterator
- Fix post-compaction question selection, system type, sync stream merge
- Skip tool_result-only user turns; bedrock: elif for context_management
- Add streaming iterator compaction test suite

Co-authored-by: Cursor <cursoragent@cursor.com>

* revert(html): restore flat *.html naming in _experimental/out

Reverses the accidental rename from *.html → */index.html introduced in
15ea941fbe. All 35 files moved back to their original flat paths so the
directory structure matches litellm_internal_staging.

Co-authored-by: Cursor <cursoragent@cursor.com>

* revert(config): restore proxy_server_config.yaml to litellm_internal_staging

Co-authored-by: Cursor <cursoragent@cursor.com>

* Fix: skip client compaction pre-processing when compact_20260112 polyfill will run

The _prepare_context_managed_request helper unconditionally applied
apply_client_compaction_block_history before invoking the polyfill. When
the request also configured a compact_20260112 spec, that pre-processing
consumed the client-sent compaction block and collapsed the message history
to just the latest user question, starving the polyfill of conversation
context. The polyfill's own Phase A (_slice_around_compaction_block)
already handles client compaction blocks correctly and inspects the full
post-compaction tail for the token-threshold check, so the pre-processing
is both redundant and destructive in this case.

Now the pre-processing only runs when no compact_20260112 polyfill spec
will execute (no spec, drop_params on, or only non-compact edits like
clear_tool_uses_20250919).

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic): plug compaction-block leak + iteration-usage gaps in streaming adapter

- handler: when polyfill_will_run skipped client-history pre-processing
  and the polyfill ultimately returned None (best-effort swallow on
  unexpected error), apply the slice-only fallback before returning so
  Anthropic-specific 'compaction' content blocks don't leak to non-
  Anthropic backends that would reject them.
- streaming_iterator: precompute will_merge_into_held so we don't pass
  applied_edits into the translator when the resulting processed_chunk
  will be discarded by the held stop-reason merge path.
- streaming_iterator: augment processed_chunk with iterations usage in
  the holding_chunk branch (sync and async) for parity with the other
  emission branches; ensures usage.iterations is attached on the rare
  message_delta-reaches-holding_chunk path.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic): correct streaming usage iteration + translate tools for token counting

- streaming_iterator: skip the trailing "message" iteration entry in the
  final message_delta when the held stop_reason chunk carries placeholder
  zero usage (no separate usage chunk arrived). Reporting zero tokens was
  misleading and inconsistent with the non-streaming path which always
  has real usage data.
- streaming_iterator: drop two redundant type checks inside branches
  that are already guarded by an outer message_delta type check.
- compact._count_effective_tokens: translate Anthropic-shaped tools
  (input_schema) to OpenAI shape before passing to litellm.token_counter
  so threshold checks aren't skewed by tokenizer paths that expect the
  OpenAI tool wrapper.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* Fix lint

* fix(anthropic): plug content drop, compaction SSE shape, and compaction leak

- Sync streaming __next__ no longer drops a buffered holding_chunk when
  the usage-merge path has already fired. Restoring the prior unconditional
  flush behavior preserves provider-emitted content (the SSE-ordering nit
  of a trailing content delta is preferable to silent content loss).
- compaction content_block_start now carries the full block shape
  ({"type": "compaction", "content": ""}) to match the text-block
  pattern and Anthropic's native streaming shape, so clients that key off
  content_block_start see the field.
- apply_compact_20260112 now slices around / strips compaction blocks
  before the opt-in gate check. Previously, when summary_model was not
  configured the editor returned the raw messages, leaking Anthropic-only
  compaction content blocks to non-Anthropic providers that reject them.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic): resolve mypy types in context management polyfill

Use AppliedEdit and CompactionBlock consistently in the dispatcher and streaming adapter.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(anthropic): flush held content chunk in async streaming path

Mirror the sync __next__ behavior: always flush a buffered
holding_chunk after the stream ends, even when usage was already
merged + emitted. Previously the async __anext__ kept the flush
inside the 'if not self.queued_usage_chunk:' guard, silently
dropping the last content delta on the proxy's primary path.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic adapter): correct sync streaming, surface polyfill failures, decouple sync path from proxy router

- translate_completion_output_params_streaming: add is_async flag so the
  sync handler returns Iterator[bytes] instead of an unusable
  AsyncIterator. Async callers keep the existing behavior via the
  default is_async=True.
- _run_polyfill_if_enabled: when the polyfill crashes and the spec
  requested non-compact edits (e.g. clear_tool_uses_20250919), raise an
  AnthropicContextManagementError instead of silently returning None so
  those edits are not dropped without an error surface. The
  compaction-block-slicing safety net remains for compact-only specs.
- anthropic_messages_handler (sync): stop auto-attaching the proxy
  llm_router. run_async_function bridges to a new thread's event loop;
  reusing the proxy's loop-bound httpx clients there causes
  'Event loop is closed' errors. The summary editor falls back to
  litellm.acompletion when llm_router is None.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix: address bug detection findings in token counter and streaming iterator

- token_counter: guard against non-dict 'function' field in tool dicts
  and skip tools missing a name to avoid emitting 'type None = ...' which
  would produce inaccurate token counts.
- streaming_iterator: change sync __next__ generic-error path to raise
  StopIteration (was StopAsyncIteration), so sync iteration cleanly stops.
- streaming_iterator: centralize context_management attachment so the
  held-stop_reason direct-flush path defensively re-attaches applied_edits
  to match the merge path's guarantee.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* Fix lint

* fix: correct COMPACT_MIN_TRIGGER_TOKENS to 50_000

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* Fix lint

* Fix lint

* Fix lint

* fix(compact): reduce to last user question when summary_model not configured but prior compaction block exists

Aligns the summary_model_not_configured path with the under-threshold and
client-compaction-block paths, which both reduce post-compaction messages
to just the latest user question so the downstream provider doesn't get
the summary on system prefix AND the full post-compaction history.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(compact): forward caller system prompt to summary model call

The default summarization instructions reference "the initial task above"
and "the raw history above", but the system prompt that holds that task
was not being forwarded to the summary model. The summary call now
prepends an OpenAI-shaped system message translated from the original
Anthropic-shaped system (str or content-block list) so the summarizer
has the agent role and initial task in scope.

* fix(compact_20260112): set default max_tokens and merge prompt when last turn is user

- Set COMPACT_SUMMARY_MAX_TOKENS default for the summary call so providers
  like Anthropic (which require max_tokens) don't silently fail and degrade
  to summary_call_failed.
- When the trailing translated message is already a user turn, merge the
  summarization prompt into it instead of appending a second user turn.
  Avoids consecutive role=user messages that strict providers reject.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic adapter): move current_content_block_start to __init__

Move the default TextBlock dict from a class-level attribute to __init__ so
concurrent stream instances don't share the same mutable dict. The class-level
default could be mutated in-place via tool_block['name'] = original_name in
_should_start_new_content_block, leaking state across streams. This mirrors
the existing fix already applied to chunk_queue.
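
The shared-state hazard is general Python behaviour and can be reproduced with two toy classes (the class names are illustrative, not the adapter's):

```python
class SharedDefault:
    # Class-level dict: one object shared by every instance.
    current_block = {"type": "text", "text": ""}


class PerInstanceDefault:
    def __init__(self) -> None:
        # Fresh dict per instance, so in-place mutation stays local.
        self.current_block = {"type": "text", "text": ""}


a, b = SharedDefault(), SharedDefault()
a.current_block["name"] = "leaked"   # in-place mutation through instance a
print("name" in b.current_block)     # True: b sees a's mutation

c, d = PerInstanceDefault(), PerInstanceDefault()
c.current_block["name"] = "local"
print("name" in d.current_block)     # False
```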

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(compact_20260112): surface error states + strip tool_result blocks in last user question

applied_edits_for_response() now includes compact_20260112 edits that
carry an error field (summary_model_not_configured, summary_call_failed,
summary_extraction_failed) so clients and operators can see why
compaction was requested but not applied.

_select_last_user_question() now strips tool_result blocks from mixed
[tool_result, text] turns rather than passing them through as-is. After
compaction the paired tool_use assistant turn no longer exists, so
forwarding tool_result blocks translates to orphaned role=tool messages
on non-Anthropic providers and produces a 400.

* fix(compact_20260112): carry prior compaction summary into Phase C summary call

When a request already contains a compaction block, Phase A slices
`effective_messages` to the turns since that block. Previously Phase C
passed the original `system` to the summary model, so multi-round
compaction silently dropped accumulated history each time the polyfill
fired. Pass `augmented_system` (original system + prior summary
prefix) so the summary model can produce a comprehensive summary that
incorporates both the prior round's context and the current slice.
`summarized_system` for the downstream call stays built from the
original `system` + new `summary_text`.

* refactor: delegate handler spec normalization to dispatcher

_normalize_spec_edits in adapters/handler.py duplicated the spec-shape
normalization already implemented by _normalize_spec in
context_management/dispatcher.py. The two could drift: a change in one
(e.g. supporting a new spec shape) without the other would cause the
handler's polyfill_will_run prediction to disagree with the
dispatcher's actual behavior, breaking the client-history pre-processing
skip.

Have the handler delegate to the dispatcher's _normalize_spec while
keeping handler-specific concerns (drop_params short-circuit, swallow
mapping exceptions) at the wrapper level.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(compact_20260112): surface warning-only applied edits in response

`applied_edits_for_response()` previously hid `compact_20260112` edits when
they had only warnings (no compaction block, no error). This dropped
diagnostically important warnings such as
`unsupported_trigger_type_X_using_input_tokens` and
`pause_after_compaction_ignored` whenever the conversation was under the
trigger threshold. Operators now see these warnings in the response.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix: address two low-severity context_management edge cases

- streaming_iterator: keep `sent_content_block_finish` in sync with the
  compaction block's emitted start/delta/stop lifecycle and reset it when
  the next text block's start is queued.
- bedrock _map_context_management_param: match dispatcher `_normalize_spec`
  behavior — only run the OpenAI→Anthropic mapper on list inputs; pass
  dict inputs through unchanged so already-Anthropic-format values aren't
  silently dropped.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(compact_20260112): use beta-header constant; require type discriminator; skip sync bridge when idle

- bedrock: replace hardcoded "compact-2026-01-12" beta string with
  ANTHROPIC_BETA_HEADER_VALUES.COMPACT_2026_01_12.value in both
  Converse (_filter_context_management_for_bedrock_converse) and Invoke
  (anthropic_claude3) compact-edit handlers.
- types: mark the "type" discriminator as Required[...] on the new
  CompactionBlock and UsageIteration TypedDicts so the discriminator
  is not silently optional under total=False.
- adapters/handler: short-circuit the sync /v1/messages adapter path
  before spawning the run_async_function worker-thread event loop when
  the request has no context_management spec and no client-sent
  compaction block in the message history.
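
For the types bullet, a sketch of how Required[...] behaves under total=False. The class below is a stand-in with assumed fields, not the real CompactionBlock definition.

```python
try:
    from typing import Literal, Required, TypedDict  # Python 3.11+
except ImportError:  # older interpreters
    from typing_extensions import Literal, Required, TypedDict


class CompactionBlockSketch(TypedDict, total=False):
    # total=False makes every key optional unless it is marked Required[...].
    type: Required[Literal["compaction"]]
    content: str


print(CompactionBlockSketch.__required_keys__)  # frozenset({'type'})
print(CompactionBlockSketch.__optional_keys__)  # frozenset({'content'})
```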

Test plan:
- uv run pytest tests/test_litellm/llms/anthropic/experimental_pass_through/ tests/test_litellm/llms/bedrock/test_converse_context_management.py -q
  (370 + 10 = 380 passed)
- uv run pytest tests/test_litellm/llms/azure_ai/claude/test_azure_anthropic_transformation.py tests/test_litellm/llms/vertex_ai/vertex_ai_partner_models/anthropic/test_vertex_ai_partner_models_anthropic_transformation.py -k compact (3 passed)

* fix(compact_20260112): include system prompt tokens in threshold check

The threshold check in Phase B previously counted only message tokens and
the compaction-block content, omitting the system prompt entirely. When
the system carried a prior compaction summary (via _augment_system_with_summary)
or was otherwise large, the threshold could fire later than intended,
allowing the conversation to exceed the model's context window before
compaction activated.

_count_effective_tokens now also counts the (augmented) system prompt
text. The caller passes compaction_block=None when augmented_system
already includes the prior summary, to avoid double-counting.
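
A simplified sketch of the counting rule, assuming a whitespace tokenizer in place of litellm.token_counter and a plain-string system prompt. The function names are hypothetical.

```python
from typing import Any, Dict, List, Optional


def count_tokens(text: str) -> int:
    """Stand-in tokenizer: whitespace split, not a real model tokenizer."""
    return len(text.split())


def effective_tokens(
    messages: List[Dict[str, Any]],
    system: Optional[str] = None,
    compaction_block: Optional[str] = None,
) -> int:
    total = sum(count_tokens(str(m.get("content", ""))) for m in messages)
    if system:
        # The system prompt now counts toward the threshold.
        total += count_tokens(system)
    if compaction_block:
        # Pass None here when `system` already embeds the prior summary.
        total += count_tokens(compaction_block)
    return total


msgs = [{"role": "user", "content": "a b c"}]
print(effective_tokens(msgs))                                      # 3
print(effective_tokens(msgs, system="x y s1 s2"))                  # 7
print(effective_tokens(msgs, "x y s1 s2", compaction_block="s1 s2"))  # 9 (double-counted)
```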

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* Fix SSE ordering and compaction state machine bugs in AnthropicStreamWrapper

- Suppress holding_chunk flush after final message_delta has been emitted
  (queued_usage_chunk == True) so a trailing content_block_delta cannot
  follow message_delta, which strict Anthropic SDK clients may reject.
  When usage has not yet been merged, flush the holding_chunk *before*
  the held stop_reason chunk so SSE ordering remains correct.

- Replace _queue_compaction_block_events with _next_compaction_event,
  emitting the compaction start/delta/stop events one at a time. The
  state machine flags (sent_content_block_finish) and content block
  index now advance atomically with the terminal stop event actually
  being returned to the caller, eliminating the transient inconsistent
  state where flags say the block is finished while its stop event is
  still buffered.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(compact_20260112): enforce parent key/team allowlist on summary model

The compact_20260112 polyfill summary subrequest used llm_router.acompletion
directly, bypassing the proxy auth checks that gate model access for the
parent key/team. A caller whose key/team was not authorized for the
configured context_management_summary_model could still cause the proxy to
invoke that model and return its output as a compaction block.

Pull the parent's UserAPIKeyAuth out of litellm_metadata in the handler,
thread it through the dispatcher into apply_compact_20260112, and gate the
summary call on _can_object_call_model for both key-level and team-level
allowlists. Failures land as applied_edits[0].error =
summary_model_access_denied without raising. SDK callers (no UserAPIKeyAuth)
remain unaffected.

* fix(compact_20260112): distinguish access-denied from transient errors; greedy summary regex

- _check_summary_model_access now catches ProxyException explicitly for access
  denials and logs unexpected exceptions separately. Both still fail closed,
  but operators can now tell a denied key/team apart from a router internal
  raising during the check.
- _SUMMARY_TAG_RE switches from non-greedy to greedy so a stray </summary>
  inside the model's summary content no longer silently truncates the
  captured text.
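
The greedy versus non-greedy difference can be seen with two stand-in patterns. The exact _SUMMARY_TAG_RE pattern is not shown in this message, so the regexes below are assumptions.

```python
import re

NON_GREEDY = re.compile(r"<summary>(.*?)</summary>", re.DOTALL)
GREEDY = re.compile(r"<summary>(.*)</summary>", re.DOTALL)

# Summary content that itself mentions a closing tag.
text = "<summary>uses </summary> as a literal tag</summary>"

print(repr(NON_GREEDY.search(text).group(1)))  # 'uses '
print(repr(GREEDY.search(text).group(1)))      # 'uses </summary> as a literal tag'
```

The trade-off is that a greedy match spans from the first opening tag to the last closing tag, so a response containing several separate <summary> blocks would be captured as one.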

* fix(compact_20260112): type object_type as Literal for mypy

* fix(compact_20260112): attribute summary subcall spend to parent key/team

The compact_20260112 polyfill summary subrequest propagated metadata via
the Anthropic-shape `metadata` parameter, which only carries `user_id`.
The proxy auth fields used for spend attribution (`user_api_key`,
`user_api_key_team_id`, `litellm_call_id`, ...) live in
`data["litellm_metadata"]`. As a result, summary subcalls landed on the
router with an empty propagated metadata and the resulting tokens were
not attributed to the caller's key/team budget.

Rename the polyfill chain's spend-propagation parameter to
`litellm_metadata` and pull it from `kwargs["litellm_metadata"]` in
both the async and sync handlers, so the post-call hooks see the parent
key/team and bill the summary tokens accordingly. Add an
`_extract_proxy_litellm_metadata` helper and refactor
`_extract_user_api_key_auth` to use it.

* chore(anthropic adapters): remove unused _extract_user_api_key_auth helper

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* chore(compact_20260112): non-greedy summary regex; use COMPACT_EDIT_TYPE in bedrock filter

- Make _SUMMARY_TAG_RE non-greedy so a response with multiple <summary>
  blocks captures only the first complete block.
- Replace the hardcoded 'compact_20260112' literal in
  _filter_context_management_for_bedrock_converse with the shared
  COMPACT_EDIT_TYPE constant.

* fix: bug fixes from PR review

- streaming_iterator: don't set sent_content_block_finish during the compaction
  block lifecycle. That flag tracks the regular text/tool_use/thinking block
  state machine, and conflating the two leaks bad state to introspection paths.
- compact._call_summary_model: send propagated proxy auth/spend-attribution
  fields as 'litellm_metadata' instead of 'metadata' so the router's post-call
  hooks attribute summary tokens to the caller's key/team budget.

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(anthropic-streaming): insert content_block_stop between held delta and final message_delta

When the stream exhausts with both `holding_chunk` (a content_block_delta)
and `holding_stop_reason_chunk` (a message_delta) buffered, the after-loop
cleanup previously emitted them back-to-back, producing the invalid
Anthropic SSE sequence `content_block_delta -> message_delta`. Insert a
`content_block_stop` between them in both the sync `__next__` and async
`__anext__` paths so the emitted ordering remains
`content_block_delta -> content_block_stop -> message_delta`.
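
A sketch of the drain ordering, under the assumption that events are plain dicts and that the stop event carries the current block index. `drain_held_events` is a hypothetical helper, not the iterator's real cleanup code.

```python
from typing import Any, Dict, List, Optional

Event = Dict[str, Any]


def drain_held_events(
    holding_chunk: Optional[Event],
    holding_stop_reason_chunk: Optional[Event],
    block_index: int,
) -> List[Event]:
    """Order the buffered events so a delta is never followed directly by message_delta."""
    events: List[Event] = []
    if holding_chunk is not None:
        events.append(holding_chunk)
        if holding_stop_reason_chunk is not None:
            # Close the open content block before the final message_delta.
            events.append({"type": "content_block_stop", "index": block_index})
    if holding_stop_reason_chunk is not None:
        events.append(holding_stop_reason_chunk)
    return events


held_delta = {"type": "content_block_delta", "index": 0}
held_stop = {"type": "message_delta", "delta": {"stop_reason": "end_turn"}}
print([e["type"] for e in drain_held_events(held_delta, held_stop, 0)])
# ['content_block_delta', 'content_block_stop', 'message_delta']
```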

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* fix(compact_20260112): propagate allowed_model_region to summary subrequest

The router enforces region restrictions by reading allowed_model_region
from top-level request kwargs (Router._common_checks_available_deployment),
but the compact_20260112 summary subrequest only forwarded litellm_metadata.
A region-restricted caller could trigger compaction and have their conversation
summarized by a deployment outside the permitted region.

Extract allowed_model_region from user_api_key_auth and pass it through
_call_summary_model as a top-level kwarg so the router applies the same
region constraints the parent request would.

* fix(anthropic adapter): emit content_block_stop before held message_delta in drain paths

Co-authored-by: Yassin Kortam <yassin@berri.ai>

* feat(context_management): configurable summary max_tokens; surface ignored knobs

- compact_20260112: read summary max_tokens from general_settings
  (context_management_summary_max_tokens) so operators can fit the
  chosen summary model's output budget; falls back to the compiled
  default for missing or invalid values.

- clear_tool_uses_20250919: log unsupported knobs at warning level
  (was debug, which silently dropped misconfiguration) and surface
  them as warnings on the AppliedEdit so clients see what was ignored.

* fix(compact_20260112): bound _call_summary_model with timeout

A slow or unresponsive summary model previously hung the parent
/v1/messages request with no escape hatch. Pass a 60s timeout on the
litellm.acompletion / llm_router.acompletion subrequest; on timeout the
existing summary_call_failed path forwards the request without
compaction rather than blocking indefinitely.
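
The fail-open-on-timeout behaviour can be sketched with asyncio.wait_for. Note this swaps in a different mechanism: the commit passes a timeout to the acompletion call itself, and the coroutine below is a stand-in for that subrequest.

```python
import asyncio
from typing import Optional


async def slow_summary_model() -> str:
    """Stand-in for the summary subrequest; sleeps longer than the timeout."""
    await asyncio.sleep(0.5)
    return "<summary>...</summary>"


async def summarize_with_timeout(timeout_s: float) -> Optional[str]:
    try:
        return await asyncio.wait_for(slow_summary_model(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Caller treats None as a failed summary call and skips compaction.
        return None


print(asyncio.run(summarize_with_timeout(0.05)))  # None
```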

* fix(compact_20260112): preserve post-compaction tail on slice-only path

When a prior compaction block is present and the request is under threshold,
the polyfill was reducing downstream messages to just the latest user
question. The prior summary only covers turns before the compaction block,
so dropping the post-compaction tail silently lost recent context — a
multi-turn conversation that stayed below the threshold would arrive at the
model with no memory of any turn after the prior compaction.

Forward the already-stripped post-compaction tail unchanged on both the
under-threshold path and apply_client_compaction_block_history. Fall
back to _select_last_user_question only when the strip leaves nothing
for the downstream call to answer.
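
A simplified sketch of forwarding the post-compaction tail. It assumes the compaction block lives in its own message, which is dropped whole; the real strip logic may keep other content from that message. All names here are hypothetical.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def has_compaction_block(message: Message) -> bool:
    content = message.get("content")
    return isinstance(content, list) and any(
        isinstance(block, dict) and block.get("type") == "compaction"
        for block in content
    )


def post_compaction_tail(messages: List[Message]) -> List[Message]:
    """Return the turns after the last compaction block, or all turns if none."""
    for i in range(len(messages) - 1, -1, -1):
        if has_compaction_block(messages[i]):
            return messages[i + 1 :]
    return list(messages)


def messages_to_forward(messages: List[Message], fallback: Message) -> List[Message]:
    tail = post_compaction_tail(messages)
    # Only fall back when stripping leaves nothing to answer.
    return tail if tail else [fallback]
```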

* fix(compact_20260112): enforce user/project/team-member model scopes on summary subrequest

The local gate previously only checked the parent key's and team's
allowed-model lists. A caller restricted by a personal user, project,
or per-team-member allowed_models scope could still trigger the
configured summary model and receive its <summary> output as a
compaction block, because llm_router.acompletion bypasses the proxy
common_checks path. Extend _check_summary_model_access to also load
the user_object, project_object, and team_membership and run the
matching allowlist check at each scope before invoking the summary
model.

* fix(compact_20260112): enforce summary model per-model budget and propagate budget metadata

* fix(compact_20260112): forward post-compaction tail when summary model unconfigured

* fix(anthropic endpoints): run failure hook on 500-level context management errors

* fix(compact_20260112): enforce summary model rate limit before summary call

* fix(compact_20260112): propagate end-user/project budget scope to summary call

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Yassin Kortam <yassin@berri.ai>
Co-authored-by: mateo-berri <277851410+mateo-berri@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Sameer Kankute 2026-05-30 21:50:05 +05:30 committed by GitHub
parent 3be3c1dea1
commit 4cc3dd7aad
GPG Key ID: B5690EEEBB952194
28 changed files with 6201 additions and 101 deletions

@@ -10,7 +10,7 @@ This is an __init__.py file to allow the following interface
 """
-from typing import Any, AsyncIterator, Coroutine, Dict, List, Optional, Union
+from typing import Any, AsyncIterator, Coroutine, Dict, Iterator, List, Optional, Union
 from litellm.llms.anthropic.experimental_pass_through.messages.handler import (
     anthropic_messages as _async_anthropic_messages,
@@ -100,8 +100,11 @@ def create(
     **kwargs,
 ) -> Union[
     AnthropicMessagesResponse,
+    Iterator[bytes],
     AsyncIterator[Any],
-    Coroutine[Any, Any, Union[AnthropicMessagesResponse, AsyncIterator[Any]]],
+    Coroutine[
+        Any, Any, Union[AnthropicMessagesResponse, AsyncIterator[Any], Iterator[bytes]]
+    ],
 ]:
     """
     Async wrapper for Anthropic's messages API

@ -772,11 +772,29 @@ def _format_function_definitions(tools):
lines.append("namespace functions {")
lines.append("")
for tool in tools:
if not isinstance(tool, dict):
continue
function = tool.get("function")
if not isinstance(function, dict):
# Anthropic tool shape → OpenAI function dict for token counting.
params = tool.get("input_schema") or tool.get("parameters") or {}
if not isinstance(params, dict):
params = {}
function = {
"name": tool.get("name"),
"description": tool.get("description"),
"parameters": params,
}
function_name = function.get("name")
if not function_name:
# Skip malformed tools missing a name to avoid emitting
# ``type None = ...`` which would produce inaccurate token counts.
continue
if function_description := function.get("description"):
lines.append(f"// {function_description}")
function_name = function.get("name")
parameters = function.get("parameters", {})
parameters = function.get("parameters") or {}
if not isinstance(parameters, dict):
parameters = {}
properties = parameters.get("properties")
if properties and properties.keys():
lines.append(f"type {function_name} = (_: {{")
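
The token-counting hunk above accepts both OpenAI-shape tools (with a `function` key) and Anthropic-shape tools (with `name` and `input_schema` at the top level). A minimal standalone sketch of that normalization follows. The helper name `normalize_tool_for_counting` and the sample tools are hypothetical and not part of the commit; the branching mirrors the diff.

```python
from typing import Any, Dict, Optional


def normalize_tool_for_counting(tool: Any) -> Optional[Dict[str, Any]]:
    """Hypothetical helper: return an OpenAI-style function dict, or None to skip."""
    if not isinstance(tool, dict):
        return None
    function = tool.get("function")
    if not isinstance(function, dict):
        # Anthropic shape: name/description/input_schema sit at the top level.
        params = tool.get("input_schema") or tool.get("parameters") or {}
        if not isinstance(params, dict):
            params = {}
        function = {
            "name": tool.get("name"),
            "description": tool.get("description"),
            "parameters": params,
        }
    if not function.get("name"):
        # A nameless tool would render as "type None = ...", so skip it.
        return None
    parameters = function.get("parameters") or {}
    if not isinstance(parameters, dict):
        parameters = {}
    return {**function, "parameters": parameters}


# Illustrative inputs only.
anthropic_tool = {
    "name": "get_weather",
    "description": "Look up the weather",
    "input_schema": {"type": "object", "properties": {"city": {"type": "string"}}},
}
openai_tool = {"type": "function", "function": {"name": "get_time", "parameters": None}}
```

Returning `None` for non-dict or nameless tools corresponds to the `continue` statements in the diff: such entries simply contribute nothing to the token count.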


@ -4,6 +4,7 @@ from typing import (
AsyncIterator,
Coroutine,
Dict,
Iterator,
List,
Optional,
Tuple,
@ -12,9 +13,16 @@ from typing import (
)
import litellm
from litellm._logging import verbose_logger
from litellm.litellm_core_utils.asyncify import run_async_function
from litellm.llms.anthropic.experimental_pass_through.adapters.transformation import (
AnthropicAdapter,
)
from litellm.llms.anthropic.experimental_pass_through.context_management import (
AnthropicContextManagementError,
PolyfillResult,
apply_context_management,
)
from litellm.llms.anthropic.experimental_pass_through.utils import (
is_reasoning_auto_summary_enabled,
)
@ -28,15 +36,266 @@ if TYPE_CHECKING:
pass
# Anthropic-only fields that the translator above already maps into the
# OpenAI-format completion_kwargs (output_config → reasoning_effort /
# response_format, etc.). They must be filtered out of the raw
# extra_kwargs re-merge below or non-Anthropic backends reject the call
# with 400 "Extra inputs are not permitted". Add new entries here when
# extending AnthropicMessagesRequestOptionalParams with another Anthropic-
# specific key.
# Anthropic-only keys already mapped by the translator; strip on extra_kwargs re-merge.
ANTHROPIC_ONLY_REQUEST_KEYS: frozenset[str] = frozenset({"output_config"})
def _messages_have_compaction_block(messages: List[Dict]) -> bool:
"""Return True when any message carries a ``compaction`` content block."""
for msg in messages:
content = msg.get("content")
if not isinstance(content, list):
continue
for block in content:
if isinstance(block, dict) and block.get("type") == "compaction":
return True
return False
def _extract_proxy_litellm_metadata(kwargs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Return ``kwargs["litellm_metadata"]`` when it's a dict; ``None`` otherwise.
The proxy attaches its auth/spend-attribution fields (``user_api_key``,
``user_api_key_team_id``, ``litellm_call_id``, the full ``UserAPIKeyAuth``
object under ``user_api_key_auth``, ...) to ``data["litellm_metadata"]``
for ``/v1/messages`` (see
``LiteLLMProxyRequestSetup.add_user_api_key_auth_to_request_metadata`` and
``LITELLM_METADATA_ROUTES``). The Anthropic-shape ``metadata`` arg only
carries ``user_id`` and must not be conflated. Returns ``None`` for SDK
callers that bypass the proxy entirely.
"""
litellm_metadata = kwargs.get("litellm_metadata")
if not isinstance(litellm_metadata, dict):
return None
return litellm_metadata
async def _prepare_context_managed_request(
*,
model: str,
messages: List[Dict],
tools: Optional[List[Dict]],
system: Optional[Any],
context_management_spec: Any,
litellm_metadata: Optional[Dict],
drop_params: Optional[bool],
llm_router: Any,
user_api_key_auth: Any = None,
) -> Optional[PolyfillResult]:
"""Apply client compaction history, then optional context_management polyfill."""
from litellm.llms.anthropic.experimental_pass_through.context_management.editors.compact import (
apply_client_compaction_block_history,
)
# Skip the client-history pre-processing when a ``compact_20260112``
# polyfill spec will run: that editor already slices around any client-sent
# compaction block in its Phase A (and uses the full post-compaction tail
# for its token-threshold check). Pre-collapsing to just the latest user
# question here would starve the polyfill of conversation context and
# silently drop intermediate turns.
polyfill_will_run = _polyfill_will_run(
context_management_spec=context_management_spec,
drop_params=drop_params,
)
if polyfill_will_run:
history_result: Optional[PolyfillResult] = None
working_messages: List[Dict] = messages
working_system: Optional[Any] = system
else:
history_result = apply_client_compaction_block_history(
messages=cast(List[Dict[str, Any]], messages),
system=system,
)
working_messages = (
history_result.messages if history_result is not None else messages
)
working_system = history_result.system if history_result is not None else system
polyfill_result = await _run_polyfill_if_enabled(
model=model,
messages=working_messages,
tools=tools,
system=working_system,
context_management_spec=context_management_spec,
litellm_metadata=litellm_metadata,
drop_params=drop_params,
llm_router=llm_router,
user_api_key_auth=user_api_key_auth,
)
if polyfill_result is not None:
return polyfill_result
# Safety net: if we skipped client-history pre-processing because a
# ``compact_20260112`` polyfill was expected to handle the compaction
# block itself but the polyfill ultimately did not produce a result
# (e.g. it crashed and was best-effort swallowed in
# ``_run_polyfill_if_enabled``), apply the slice-only fallback now so
# Anthropic-specific ``compaction`` content blocks don't leak through
# to non-Anthropic backends that would reject them.
if polyfill_will_run and history_result is None:
history_result = apply_client_compaction_block_history(
messages=cast(List[Dict[str, Any]], messages),
system=system,
)
return history_result
def _polyfill_will_run(
*,
context_management_spec: Any,
drop_params: Optional[bool],
) -> bool:
"""Return True when ``compact_20260112`` will run via the polyfill dispatcher.
Mirrors the gating in ``_run_polyfill_if_enabled``: an empty spec or
effective ``drop_params`` short-circuits the polyfill. The pre-processing
skip only applies when the dispatcher will actually invoke
``apply_compact_20260112`` (which has its own compaction-block slicing).
"""
edits = _normalize_spec_edits(
context_management_spec=context_management_spec,
drop_params=drop_params,
)
if edits is None:
return False
from litellm.llms.anthropic.experimental_pass_through.context_management.constants import (
COMPACT_EDIT_TYPE,
)
return any(
isinstance(edit, dict) and edit.get("type") == COMPACT_EDIT_TYPE
for edit in edits
)
def _spec_has_non_compact_edits(
*,
context_management_spec: Any,
drop_params: Optional[bool],
) -> bool:
"""Return True when the spec includes edits other than ``compact_20260112``.
Used to decide whether a polyfill failure can be silently swallowed
(compact-only specs have a safe compaction-block slicing fallback) or
must be surfaced (other editors like ``clear_tool_uses_20250919`` have
no slice-only fallback and would otherwise be dropped without notice).
"""
edits = _normalize_spec_edits(
context_management_spec=context_management_spec,
drop_params=drop_params,
)
if edits is None:
return False
from litellm.llms.anthropic.experimental_pass_through.context_management.constants import (
COMPACT_EDIT_TYPE,
)
return any(
isinstance(edit, dict)
and isinstance(edit.get("type"), str)
and edit.get("type") != COMPACT_EDIT_TYPE
for edit in edits
)
def _normalize_spec_edits(
*,
context_management_spec: Any,
drop_params: Optional[bool],
) -> Optional[List[Dict[str, Any]]]:
"""Return the normalized ``edits`` list, or ``None`` if the polyfill won't run.
Delegates spec-shape normalization to the dispatcher's ``_normalize_spec``
so the prediction here can't drift from what the dispatcher actually does.
"""
if not context_management_spec:
return None
effective_drop_params = (
drop_params if drop_params is not None else litellm.drop_params
)
if effective_drop_params:
return None
from litellm.llms.anthropic.experimental_pass_through.context_management.dispatcher import (
_normalize_spec,
)
try:
return _normalize_spec(context_management_spec)
except Exception:
return None
async def _run_polyfill_if_enabled(
*,
model: str,
messages: List[Dict],
tools: Optional[List[Dict]],
system: Optional[Any],
context_management_spec: Any,
litellm_metadata: Optional[Dict],
drop_params: Optional[bool],
llm_router: Any,
user_api_key_auth: Any = None,
) -> Optional[PolyfillResult]:
"""Run the async context_management polyfill if a spec is present.
Returns ``None`` when the spec is empty or drop_params is on. Raises
``AnthropicContextManagementError`` so the /v1/messages endpoint can
emit an Anthropic-format 400. All other exceptions are best-effort
swallowed (matches v0 behavior).
"""
if not context_management_spec:
return None
effective_drop_params = (
drop_params if drop_params is not None else litellm.drop_params
)
if effective_drop_params:
return None
try:
return await apply_context_management(
model=model,
messages=messages,
tools=tools,
system=system,
context_management_spec=context_management_spec,
litellm_metadata=litellm_metadata,
llm_router=llm_router,
user_api_key_auth=user_api_key_auth,
)
except AnthropicContextManagementError:
# Surface validation errors so the endpoint can emit an Anthropic-format
# 400. Other exception types fall into the best-effort branch below.
raise
except Exception as e:
verbose_logger.exception(
"context_management polyfill: skipping edits due to error: %s", e
)
# Best-effort swallow is only safe for compact-only specs, where the
# caller's compaction-block-slicing safety net produces a correct
# (if degraded) result. When the spec also requested non-compact
# edits (e.g. ``clear_tool_uses_20250919``), the safety net does
# NOT re-run those editors, so silently returning ``None`` would
# drop them with no error surface. Raise instead so the endpoint
# emits an Anthropic-format error.
if _spec_has_non_compact_edits(
context_management_spec=context_management_spec,
drop_params=drop_params,
):
raise AnthropicContextManagementError(
status_code=500,
message=f"context_management polyfill failed: {e}",
) from e
return None
########################################################
# init adapter
ANTHROPIC_ADAPTER = AnthropicAdapter()
@ -163,7 +422,7 @@ class LiteLLMMessagesToCompletionTransformationHandler:
metadata: Optional[Dict] = None,
stop_sequences: Optional[List[str]] = None,
stream: Optional[bool] = False,
system: Optional[str] = None,
system: Optional[Union[str, List[Dict[str, Any]]]] = None,
temperature: Optional[float] = None,
thinking: Optional[Dict] = None,
tool_choice: Optional[Dict] = None,
@ -307,19 +566,56 @@ class LiteLLMMessagesToCompletionTransformationHandler:
top_p: Optional[float] = None,
output_format: Optional[Dict] = None,
**kwargs,
) -> Union[AnthropicMessagesResponse, AsyncIterator]:
) -> Union[AnthropicMessagesResponse, AsyncIterator[Any], Iterator[bytes]]:
"""Handle non-Anthropic models asynchronously using the adapter"""
context_management = kwargs.pop("context_management", None)
drop_params: Optional[bool] = kwargs.get("drop_params", None)
litellm_router = kwargs.pop("litellm_router", None)
if litellm_router is None:
try:
from litellm.proxy.proxy_server import llm_router as _proxy_router
litellm_router = _proxy_router
except Exception:
pass
proxy_litellm_metadata = _extract_proxy_litellm_metadata(kwargs)
user_api_key_auth = (
proxy_litellm_metadata.get("user_api_key_auth")
if proxy_litellm_metadata is not None
else None
)
polyfill_result = await _prepare_context_managed_request(
model=model,
messages=messages,
tools=tools,
system=system,
context_management_spec=context_management,
litellm_metadata=proxy_litellm_metadata,
drop_params=drop_params,
llm_router=litellm_router,
user_api_key_auth=user_api_key_auth,
)
effective_messages = (
polyfill_result.messages if polyfill_result is not None else messages
)
effective_system = (
polyfill_result.system if polyfill_result is not None else system
)
(
completion_kwargs,
tool_name_mapping,
) = LiteLLMMessagesToCompletionTransformationHandler._prepare_completion_kwargs(
max_tokens=max_tokens,
messages=messages,
messages=effective_messages,
model=model,
metadata=metadata,
stop_sequences=stop_sequences,
stream=stream,
system=system,
system=effective_system,
temperature=temperature,
thinking=thinking,
tool_choice=tool_choice,
@ -338,6 +634,8 @@ class LiteLLMMessagesToCompletionTransformationHandler:
completion_response,
model=model,
tool_name_mapping=tool_name_mapping,
polyfill_result=polyfill_result,
is_async=True,
)
)
if transformed_stream is not None:
@ -347,6 +645,7 @@ class LiteLLMMessagesToCompletionTransformationHandler:
anthropic_response = ANTHROPIC_ADAPTER.translate_completion_output_params(
cast(ModelResponse, completion_response),
tool_name_mapping=tool_name_mapping,
polyfill_result=polyfill_result,
)
if anthropic_response is not None:
return anthropic_response
@ -372,8 +671,13 @@ class LiteLLMMessagesToCompletionTransformationHandler:
**kwargs,
) -> Union[
AnthropicMessagesResponse,
Iterator[bytes],
AsyncIterator[Any],
Coroutine[Any, Any, Union[AnthropicMessagesResponse, AsyncIterator[Any]]],
Coroutine[
Any,
Any,
Union[AnthropicMessagesResponse, AsyncIterator[Any], Iterator[bytes]],
],
]:
"""Handle non-Anthropic models using the adapter."""
if _is_async is True:
@ -395,17 +699,72 @@ class LiteLLMMessagesToCompletionTransformationHandler:
**kwargs,
)
# Run the context_management polyfill on the sync path too so that
# ``litellm.messages.create()`` callers don't silently lose edits like
# ``clear_tool_uses_20250919``. The dispatcher is async (so the
# ``compact_20260112`` editor can ``await`` the summarization model);
# bridge to it via ``run_async_function``.
context_management = kwargs.pop("context_management", None)
drop_params: Optional[bool] = kwargs.get("drop_params", None)
# Deliberately do NOT auto-attach the proxy ``llm_router`` here:
# ``run_async_function`` spawns a new event loop in a worker thread
# to bridge to the async dispatcher, but the proxy router's httpx
# ``AsyncClient`` instances are bound to the proxy's main event loop.
# Reusing them from the new thread's loop violates httpx's single-loop
# invariant and can raise ``RuntimeError: Event loop is closed`` or
# produce stalled connections. The summary editor falls back to
# ``litellm.acompletion`` (which creates a fresh client per call) when
# ``llm_router`` is ``None``, which is safe to call from the bridged
# loop. The async ``async_anthropic_messages_handler`` path is
# unaffected because it ``await``s within the original event loop.
litellm_router = kwargs.pop("litellm_router", None)
# Skip the async bridge entirely when there is nothing for either the
# polyfill or the client-history slice-only fallback to do. The vast
# majority of sync ``litellm.messages.create()`` requests carry no
# ``context_management`` spec and no client-sent ``compaction`` block,
# and bridging through a worker-thread event loop just to discover
# there is no work is pure overhead.
if context_management is None and not _messages_have_compaction_block(messages):
polyfill_result: Optional[PolyfillResult] = None
else:
proxy_litellm_metadata = _extract_proxy_litellm_metadata(kwargs)
user_api_key_auth = (
proxy_litellm_metadata.get("user_api_key_auth")
if proxy_litellm_metadata is not None
else None
)
polyfill_result = run_async_function(
_prepare_context_managed_request,
model=model,
messages=messages,
tools=tools,
system=system,
context_management_spec=context_management,
litellm_metadata=proxy_litellm_metadata,
drop_params=drop_params,
llm_router=litellm_router,
user_api_key_auth=user_api_key_auth,
)
effective_messages = (
polyfill_result.messages if polyfill_result is not None else messages
)
effective_system = (
polyfill_result.system if polyfill_result is not None else system
)
(
completion_kwargs,
tool_name_mapping,
) = LiteLLMMessagesToCompletionTransformationHandler._prepare_completion_kwargs(
max_tokens=max_tokens,
messages=messages,
messages=effective_messages,
model=model,
metadata=metadata,
stop_sequences=stop_sequences,
stream=stream,
system=system,
system=effective_system,
temperature=temperature,
thinking=thinking,
tool_choice=tool_choice,
@ -424,6 +783,8 @@ class LiteLLMMessagesToCompletionTransformationHandler:
completion_response,
model=model,
tool_name_mapping=tool_name_mapping,
polyfill_result=polyfill_result,
is_async=False,
)
)
if transformed_stream is not None:
@ -433,6 +794,7 @@ class LiteLLMMessagesToCompletionTransformationHandler:
anthropic_response = ANTHROPIC_ADAPTER.translate_completion_output_params(
cast(ModelResponse, completion_response),
tool_name_mapping=tool_name_mapping,
polyfill_result=polyfill_result,
)
if anthropic_response is not None:
return anthropic_response
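
The gating used throughout this file can be sketched without LiteLLM: a per-request `drop_params` (including an explicit `False`) overrides the global, compact-only specs fail soft, and specs with other edits fail loudly. This is a simplified illustration under stated assumptions. `GLOBAL_DROP_PARAMS` and `plan_polyfill` are hypothetical stand-ins, and the spec is assumed to already be a dict with an `edits` list, whereas the real code delegates normalization to the dispatcher's `_normalize_spec`.

```python
from typing import Any, Dict, List, Optional

GLOBAL_DROP_PARAMS = False  # stand-in for litellm.drop_params
COMPACT_EDIT_TYPE = "compact_20260112"


def effective_drop_params(per_request: Optional[bool]) -> bool:
    # An explicit per-request value (including False) wins over the global.
    return per_request if per_request is not None else GLOBAL_DROP_PARAMS


def plan_polyfill(spec: Any, drop_params: Optional[bool]) -> Dict[str, bool]:
    """Hypothetical summary of the decisions made around the polyfill call."""
    plan = {"run": False, "has_compact": False, "raise_on_failure": False}
    if not spec or effective_drop_params(drop_params):
        return plan
    edits: List[Any] = spec.get("edits", []) if isinstance(spec, dict) else []
    types = [e.get("type") for e in edits if isinstance(e, dict)]
    plan["run"] = True
    plan["has_compact"] = COMPACT_EDIT_TYPE in types
    # Non-compact edits have no slice-only fallback, so failures must surface.
    plan["raise_on_failure"] = any(
        isinstance(t, str) and t != COMPACT_EDIT_TYPE for t in types
    )
    return plan


# Illustrative specs only.
compact_only = {"edits": [{"type": "compact_20260112"}]}
mixed = {"edits": [{"type": "compact_20260112"}, {"type": "clear_tool_uses_20250919"}]}
```

In this sketch `has_compact` plays the role of `_polyfill_will_run` (skip the client-history pre-processing) and `raise_on_failure` plays the role of `_spec_has_non_compact_edits` (convert an unexpected exception into an `AnthropicContextManagementError` instead of swallowing it).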


@ -3,11 +3,26 @@
import json
import traceback
from collections import deque
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, Iterator, Literal, Optional
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Dict,
Iterator,
List,
Literal,
Optional,
)
from litellm import verbose_logger
from litellm._logging import verbose_logger
from litellm._uuid import uuid
from litellm.types.llms.anthropic import UsageDelta
from litellm.types.llms.anthropic import (
AppliedEdit,
CompactionBlock,
ContextManagementResponse,
UsageDelta,
UsageIteration,
)
from litellm.types.utils import AdapterCompletionStreamWrapper
if TYPE_CHECKING:
@ -37,22 +52,208 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
holding_stop_reason_chunk: Optional[Any] = None
queued_usage_chunk: bool = False
current_content_block_index: int = 0
current_content_block_start: ContentBlockContentBlockDict = TextBlock(
type="text",
text="",
)
chunk_queue: deque = deque() # Queue for buffering multiple chunks
def __init__(
self,
completion_stream: Any,
model: str,
tool_name_mapping: Optional[Dict[str, str]] = None,
applied_edits: Optional[List[AppliedEdit]] = None,
compaction_block: Optional[CompactionBlock] = None,
iterations_usage: Optional[List[UsageIteration]] = None,
):
super().__init__(completion_stream)
self.model = model
# Mapping of truncated tool names to original names (for OpenAI's 64-char limit)
self.tool_name_mapping = tool_name_mapping or {}
# Polyfill applied_edits on final message_delta.
self.applied_edits: List[AppliedEdit] = list(applied_edits or [])
# Synthesized compaction block from compact_20260112 polyfill (streaming).
self.compaction_block = compaction_block
self.iterations_usage = iterations_usage
self.sent_compaction_block: bool = False
# Per-phase flags so the compaction block's start/delta/stop events
# are emitted (and the public state machine is advanced) in
# lock-step with the caller actually consuming each event. Pre-
# queuing all three would set ``sent_content_block_finish=True``
# before the client received ``content_block_stop``, leaving the
# observable state inconsistent during the drain window.
self.sent_compaction_block_start: bool = False
self.sent_compaction_block_delta: bool = False
# Per-instance queue for buffering multiple chunks. Must be initialized
# here (not at class level) so concurrent streams don't share the same
# deque and corrupt each other's SSE event order.
self.chunk_queue: deque = deque()
# Per-instance default content block. Must be initialized here (not at
# class level) so concurrent streams don't share the same mutable dict
# — `_should_start_new_content_block` mutates `tool_block["name"]` in
# place, which would otherwise leak across streams.
self.current_content_block_start: (
"AnthropicStreamWrapper.ContentBlockContentBlockDict"
) = self.TextBlock(
type="text",
text="",
)
def _merge_usage_into_held_stop_reason_chunk(self, chunk: Any) -> Dict[str, Any]:
"""Merge usage data from ``chunk`` into the held ``message_delta`` chunk.
Shared by both the sync ``__next__`` and async ``__anext__`` paths so
the subtle hold-and-merge logic (cache tokens, ``context_management``
attachment, ``UsageDelta`` shape) lives in exactly one place.
Caller is responsible for managing ``self.holding_stop_reason_chunk``
and ``self.queued_usage_chunk`` state and for queuing the returned
merged chunk.
"""
assert self.holding_stop_reason_chunk is not None
merged_chunk = self.holding_stop_reason_chunk.copy()
if "delta" not in merged_chunk:
merged_chunk["delta"] = {}
uncached_input_tokens = chunk.usage.prompt_tokens or 0
if (
hasattr(chunk.usage, "prompt_tokens_details")
and chunk.usage.prompt_tokens_details
):
cached_tokens = (
getattr(chunk.usage.prompt_tokens_details, "cached_tokens", 0) or 0
)
uncached_input_tokens -= cached_tokens
usage_dict: UsageDelta = {
"input_tokens": uncached_input_tokens,
"output_tokens": chunk.usage.completion_tokens or 0,
}
if (
hasattr(chunk.usage, "_cache_creation_input_tokens")
and chunk.usage._cache_creation_input_tokens > 0
):
usage_dict["cache_creation_input_tokens"] = (
chunk.usage._cache_creation_input_tokens
)
if (
hasattr(chunk.usage, "_cache_read_input_tokens")
and chunk.usage._cache_read_input_tokens > 0
):
usage_dict["cache_read_input_tokens"] = chunk.usage._cache_read_input_tokens
merged_chunk["usage"] = usage_dict
if self.applied_edits and "context_management" not in merged_chunk:
merged_chunk["context_management"] = ContextManagementResponse(
applied_edits=list(self.applied_edits)
)
return self._augment_message_delta_usage(merged_chunk)
def _ensure_context_management_attached(
self, message_delta_chunk: Dict[str, Any]
) -> Dict[str, Any]:
"""Attach ``context_management`` to a ``message_delta`` chunk if
``self.applied_edits`` is non-empty and the chunk does not already
carry it. Returns the (possibly new) chunk dict.
Centralizing this guard ensures every ``message_delta`` emission
path (merge-with-usage and direct-flush-of-held) consistently
surfaces ``applied_edits`` to the client.
"""
if not self.applied_edits or "context_management" in message_delta_chunk:
return message_delta_chunk
augmented = message_delta_chunk.copy()
augmented["context_management"] = ContextManagementResponse(
applied_edits=list(self.applied_edits)
)
return augmented
def _augment_message_delta_usage(
self, message_delta_chunk: Dict[str, Any]
) -> Dict[str, Any]:
"""Attach polyfill compaction iteration usage to the final message_delta.
Also defensively re-attaches ``context_management`` so the direct
held-chunk flush path stays in sync with the merge path's guarantee
when ``self.applied_edits`` is non-empty.
"""
message_delta_chunk = self._ensure_context_management_attached(
message_delta_chunk
)
if self.iterations_usage is None:
return message_delta_chunk
usage = message_delta_chunk.get("usage")
if not isinstance(usage, dict) or "iterations" in usage:
return message_delta_chunk
input_tokens = usage.get("input_tokens", 0) or 0
output_tokens = usage.get("output_tokens", 0) or 0
augmented = message_delta_chunk.copy()
augmented_usage = dict(usage)
iterations: List[UsageIteration] = list(self.iterations_usage)
# Only emit a ``message`` iteration when we have real token data.
# Without a separate usage chunk (e.g. provider sent finish_reason
# alone), the held ``message_delta`` carries placeholder zeros from
# the translate step; reporting a zero-token iteration would be
# misleading and inconsistent with the non-streaming path.
if input_tokens > 0 or output_tokens > 0:
message_iteration: UsageIteration = {
"type": "message",
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
iterations.append(message_iteration)
augmented_usage["iterations"] = iterations # type: ignore[typeddict-unknown-key]
augmented["usage"] = augmented_usage
return augmented
def _next_compaction_event(self) -> Optional[Dict[str, Any]]:
"""Return the next compaction content-block SSE event, or ``None``.
Anthropic delivers compaction as a single delta (no token-by-token
streaming), but we still surface it as a proper
start/delta/stop trio. Each call returns exactly one event so
the state machine (``sent_content_block_finish``,
``current_content_block_index``) is advanced *only* when the
terminal stop event is actually handed back to the caller. This
prevents an observable window where the flags claim the block is
finished while the stop event is still buffered.
"""
if self.compaction_block is None or self.sent_compaction_block:
return None
compaction_index = self.current_content_block_index
if not self.sent_compaction_block_start:
self.sent_compaction_block_start = True
return {
"type": "content_block_start",
"index": compaction_index,
# Mirror the text-block shape ({"type": "text", "text": ""}):
# send an empty ``content`` field so clients that introspect
# ``content_block_start`` see the full block schema. The
# actual summary text arrives via the ``content_block_delta``
# below.
"content_block": {"type": "compaction", "content": ""},
}
if not self.sent_compaction_block_delta:
self.sent_compaction_block_delta = True
summary_content = self.compaction_block.get("content") or ""
return {
"type": "content_block_delta",
"index": compaction_index,
"delta": {"type": "compaction_delta", "content": summary_content},
}
stop_event = {
"type": "content_block_stop",
"index": compaction_index,
}
# Don't touch ``sent_content_block_finish`` here: that flag is the
# state machine for the regular text/tool_use/thinking block and is
# independent of the synthetic compaction block lifecycle. Conflating
# them would let outside observers (subclass overrides, introspection
# hooks, exception paths) see ``sent_content_block_finish=True``
# without any regular content block ever having started.
self._increment_content_block_index()
self.sent_compaction_block = True
return stop_event
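
The one-event-per-call behavior of `_next_compaction_event` can be shown in isolation. The sketch below is a hypothetical stand-in, not the wrapper itself: the class name `CompactionEmitter` and its attribute names are invented for illustration, and only the event shapes are taken from the diff above.

```python
from typing import Any, Dict, Optional


class CompactionEmitter:
    """Hypothetical stand-in for the wrapper's compaction state machine."""

    def __init__(self, summary: Optional[str], index: int = 0) -> None:
        self.summary = summary
        self.index = index
        self.sent_start = False
        self.sent_delta = False
        self.done = False

    def next_event(self) -> Optional[Dict[str, Any]]:
        if self.summary is None or self.done:
            return None
        if not self.sent_start:
            self.sent_start = True
            return {
                "type": "content_block_start",
                "index": self.index,
                "content_block": {"type": "compaction", "content": ""},
            }
        if not self.sent_delta:
            self.sent_delta = True
            return {
                "type": "content_block_delta",
                "index": self.index,
                "delta": {"type": "compaction_delta", "content": self.summary},
            }
        # Only mark the block finished when the stop event is handed out.
        stop = {"type": "content_block_stop", "index": self.index}
        self.index += 1
        self.done = True
        return stop


# Drain the emitter the way an iterator would: one event per call.
emitter = CompactionEmitter("Summary of earlier turns")
events = []
while (event := emitter.next_event()) is not None:
    events.append(event)
```

Because the index is advanced and the done flag set in the same call that returns the stop event, no caller can observe a "finished" state while the stop event is still pending.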
def _create_initial_usage_delta(self) -> UsageDelta:
"""
@ -75,7 +276,7 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
cache_read_input_tokens=0,
)
def __next__(self):
def __next__(self): # noqa: PLR0915
from .transformation import LiteLLMAnthropicMessagesAdapter
try:
@ -103,8 +304,17 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
)
return self.chunk_queue.popleft()
if (
self.sent_compaction_block is False
and self.compaction_block is not None
):
compaction_event = self._next_compaction_event()
if compaction_event is not None:
return compaction_event
if self.sent_content_block_start is False:
self.sent_content_block_start = True
self.sent_content_block_finish = False
self.chunk_queue.append(
{
"type": "content_block_start",
@ -122,11 +332,45 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
if should_start_new_block:
self._increment_content_block_index()
# applied_edits only needs to flow to the final message_delta
# (when finish_reason is set); skip threading it through every
# intermediate chunk. For the hold-and-merge path below,
# context_management is attached directly to the merged chunk,
# so the translated ``processed_chunk`` would be discarded —
# skip the applied_edits attachment in that case to avoid
# allocating a throwaway ``MessageBlockDelta``.
will_merge_into_held = (
self.holding_stop_reason_chunk is not None
and getattr(chunk, "usage", None) is not None
)
is_final_chunk = chunk.choices[0].finish_reason is not None
processed_chunk = LiteLLMAnthropicMessagesAdapter().translate_streaming_openai_response_to_anthropic(
response=chunk,
current_content_block_index=self.current_content_block_index,
applied_edits=(
self.applied_edits
if is_final_chunk and not will_merge_into_held
else None
),
)
# Check if this is a usage chunk and we have a held stop_reason chunk
if will_merge_into_held:
merged_chunk = self._merge_usage_into_held_stop_reason_chunk(chunk)
self.chunk_queue.append(merged_chunk)
self.queued_usage_chunk = True
self.holding_stop_reason_chunk = None
return self.chunk_queue.popleft()
if self.queued_usage_chunk:
# Usage has already been merged + emitted. Any trailing
# provider events would violate Anthropic SSE ordering
# (no chunks may follow the final ``message_delta``), so
# silently drop them — matches the async ``__anext__``
# behavior where the block-handling logic is gated on
# ``not self.queued_usage_chunk``.
continue
if should_start_new_block and not self.sent_content_block_finish:
# Queue the sequence: content_block_stop -> content_block_start
# For text blocks the trigger chunk is not emitted as a separate
@ -178,20 +422,64 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
}
)
self.sent_content_block_finish = True
self.chunk_queue.append(processed_chunk)
if processed_chunk.get("delta", {}).get("stop_reason") is not None:
self.holding_stop_reason_chunk = processed_chunk
else:
processed_chunk = self._augment_message_delta_usage(
processed_chunk
)
self.chunk_queue.append(processed_chunk)
return self.chunk_queue.popleft()
elif self.holding_chunk is not None:
self.chunk_queue.append(self.holding_chunk)
if processed_chunk.get("type") == "message_delta":
processed_chunk = self._augment_message_delta_usage(
processed_chunk
)
self.chunk_queue.append(processed_chunk)
self.holding_chunk = None
return self.chunk_queue.popleft()
else:
if processed_chunk.get("type") == "message_delta":
processed_chunk = self._augment_message_delta_usage(
processed_chunk
)
self.chunk_queue.append(processed_chunk)
return self.chunk_queue.popleft()
# Handle any remaining held chunks after stream ends
if self.holding_chunk is not None:
self.chunk_queue.append(self.holding_chunk)
# Handle any remaining held chunks after stream ends. The
# buffered ``holding_chunk`` (a ``content_block_delta``) must
# precede the final ``message_delta`` so Anthropic SSE event
# ordering is preserved. When ``queued_usage_chunk`` is True,
# the final ``message_delta`` has already been emitted; any
# buffered content delta is dropped rather than emitted after
# ``message_delta`` (which would violate SSE ordering and may
# confuse strict Anthropic SDK clients).
if not self.queued_usage_chunk:
if self.holding_chunk is not None:
self.chunk_queue.append(self.holding_chunk)
self.holding_chunk = None
if self.holding_stop_reason_chunk is not None:
# A final ``message_delta`` must be preceded by
# ``content_block_stop`` so the emitted SSE stays in
# valid Anthropic order (... -> content_block_stop ->
# message_delta). Emit ``content_block_stop`` here if
# the active content block was not already closed.
if not self.sent_content_block_finish:
self.chunk_queue.append(
{
"type": "content_block_stop",
"index": self.current_content_block_index,
}
)
self.sent_content_block_finish = True
self.chunk_queue.append(
self._augment_message_delta_usage(
self.holding_stop_reason_chunk
)
)
self.holding_stop_reason_chunk = None
else:
self.holding_chunk = None
if not self.sent_last_message:
@ -205,6 +493,26 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
except StopIteration:
if self.chunk_queue:
return self.chunk_queue.popleft()
# Handle any held stop_reason chunk. Emit ``content_block_stop``
# first if the active content block was not already closed, so
# Anthropic SSE ordering is preserved (content_block_stop ->
# message_delta).
if self.holding_stop_reason_chunk is not None:
if not self.sent_content_block_finish:
self.sent_content_block_finish = True
self.chunk_queue.append(
self._augment_message_delta_usage(
self.holding_stop_reason_chunk
)
)
self.holding_stop_reason_chunk = None
return {
"type": "content_block_stop",
"index": self.current_content_block_index,
}
held = self._augment_message_delta_usage(self.holding_stop_reason_chunk)
self.holding_stop_reason_chunk = None
return held
if self.sent_last_message is False:
self.sent_last_message = True
return {"type": "message_stop"}
@ -213,7 +521,7 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
verbose_logger.error(
"Anthropic Adapter - {}\n{}".format(e, traceback.format_exc())
)
raise StopAsyncIteration
raise StopIteration
async def __anext__(self): # noqa: PLR0915
from .transformation import LiteLLMAnthropicMessagesAdapter
@ -243,8 +551,17 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
)
return self.chunk_queue.popleft()
if (
self.sent_compaction_block is False
and self.compaction_block is not None
):
compaction_event = self._next_compaction_event()
if compaction_event is not None:
return compaction_event
if self.sent_content_block_start is False:
self.sent_content_block_start = True
self.sent_content_block_finish = False
self.chunk_queue.append(
{
"type": "content_block_start",
@ -263,57 +580,31 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
if should_start_new_block:
self._increment_content_block_index()
# applied_edits only needs to flow to the final message_delta
# (when finish_reason is set); skip threading it through every
# intermediate chunk. For the hold-and-merge path below,
# context_management is attached directly to the merged chunk,
# so the translated ``processed_chunk`` would be discarded —
# skip the applied_edits attachment in that case to avoid
# allocating a throwaway ``MessageBlockDelta``.
will_merge_into_held = (
self.holding_stop_reason_chunk is not None
and getattr(chunk, "usage", None) is not None
)
is_final_chunk = chunk.choices[0].finish_reason is not None
processed_chunk = LiteLLMAnthropicMessagesAdapter().translate_streaming_openai_response_to_anthropic(
response=chunk,
current_content_block_index=self.current_content_block_index,
applied_edits=(
self.applied_edits
if is_final_chunk and not will_merge_into_held
else None
),
)
# Check if this is a usage chunk and we have a held stop_reason chunk
if (
self.holding_stop_reason_chunk is not None
and getattr(chunk, "usage", None) is not None
):
# Merge usage into the held stop_reason chunk
merged_chunk = self.holding_stop_reason_chunk.copy()
if "delta" not in merged_chunk:
merged_chunk["delta"] = {}
# Add usage to the held chunk
uncached_input_tokens = chunk.usage.prompt_tokens or 0
if (
hasattr(chunk.usage, "prompt_tokens_details")
and chunk.usage.prompt_tokens_details
):
cached_tokens = (
getattr(
chunk.usage.prompt_tokens_details, "cached_tokens", 0
)
or 0
)
uncached_input_tokens -= cached_tokens
usage_dict: UsageDelta = {
"input_tokens": uncached_input_tokens,
"output_tokens": chunk.usage.completion_tokens or 0,
}
# Add cache tokens if available (for prompt caching support)
if (
hasattr(chunk.usage, "_cache_creation_input_tokens")
and chunk.usage._cache_creation_input_tokens > 0
):
usage_dict["cache_creation_input_tokens"] = (
chunk.usage._cache_creation_input_tokens
)
if (
hasattr(chunk.usage, "_cache_read_input_tokens")
and chunk.usage._cache_read_input_tokens > 0
):
usage_dict["cache_read_input_tokens"] = (
chunk.usage._cache_read_input_tokens
)
merged_chunk["usage"] = usage_dict
# Queue the merged chunk and reset
if will_merge_into_held:
merged_chunk = self._merge_usage_into_held_stop_reason_chunk(chunk)
self.chunk_queue.append(merged_chunk)
self.queued_usage_chunk = True
self.holding_stop_reason_chunk = None
@ -379,28 +670,63 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
):
self.holding_stop_reason_chunk = processed_chunk
else:
processed_chunk = self._augment_message_delta_usage(
processed_chunk
)
self.chunk_queue.append(processed_chunk)
return self.chunk_queue.popleft()
elif self.holding_chunk is not None:
# Queue both chunks
self.chunk_queue.append(self.holding_chunk)
if processed_chunk.get("type") == "message_delta":
processed_chunk = self._augment_message_delta_usage(
processed_chunk
)
self.chunk_queue.append(processed_chunk)
self.holding_chunk = None
return self.chunk_queue.popleft()
else:
# Queue the current chunk
if processed_chunk.get("type") == "message_delta":
processed_chunk = self._augment_message_delta_usage(
processed_chunk
)
self.chunk_queue.append(processed_chunk)
return self.chunk_queue.popleft()
# Handle any remaining held chunks after stream ends
# Handle any remaining held chunks after stream ends. The
# buffered ``holding_chunk`` (a ``content_block_delta``) must
# precede the final ``message_delta`` so Anthropic SSE event
# ordering is preserved. When ``queued_usage_chunk`` is True,
# the final ``message_delta`` has already been emitted; any
# buffered content delta is dropped rather than emitted after
# ``message_delta`` (which would violate SSE ordering and may
# confuse strict Anthropic SDK clients).
if not self.queued_usage_chunk:
if self.holding_stop_reason_chunk is not None:
self.chunk_queue.append(self.holding_stop_reason_chunk)
self.holding_stop_reason_chunk = None
if self.holding_chunk is not None:
self.chunk_queue.append(self.holding_chunk)
self.holding_chunk = None
if self.holding_stop_reason_chunk is not None:
# A final ``message_delta`` must be preceded by
# ``content_block_stop`` so the emitted SSE stays in
# valid Anthropic order (... -> content_block_stop ->
# message_delta). Emit ``content_block_stop`` here if
# the active content block was not already closed.
if not self.sent_content_block_finish:
self.chunk_queue.append(
{
"type": "content_block_stop",
"index": self.current_content_block_index,
}
)
self.sent_content_block_finish = True
self.chunk_queue.append(
self._augment_message_delta_usage(
self.holding_stop_reason_chunk
)
)
self.holding_stop_reason_chunk = None
else:
self.holding_chunk = None
if not self.sent_last_message:
self.sent_last_message = True
@ -416,9 +742,28 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
# Handle any remaining queued chunks before stopping
if self.chunk_queue:
return self.chunk_queue.popleft()
# Handle any held stop_reason chunk
# Handle any held stop_reason chunk — clear after capturing so a
# subsequent ``__anext__`` call doesn't re-emit the same chunk
# (matches the sync ``__next__`` path). Emit ``content_block_stop``
# first if the active content block was not already closed, so
# Anthropic SSE ordering is preserved (content_block_stop ->
# message_delta).
if self.holding_stop_reason_chunk is not None:
return self.holding_stop_reason_chunk
if not self.sent_content_block_finish:
self.sent_content_block_finish = True
self.chunk_queue.append(
self._augment_message_delta_usage(
self.holding_stop_reason_chunk
)
)
self.holding_stop_reason_chunk = None
return {
"type": "content_block_stop",
"index": self.current_content_block_index,
}
held = self._augment_message_delta_usage(self.holding_stop_reason_chunk)
self.holding_stop_reason_chunk = None
return held
if not self.sent_last_message:
self.sent_last_message = True
return {"type": "message_stop"}
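The end-of-stream ordering that both `__next__` and `__anext__` enforce above can be condensed into a small standalone sketch. `flush_tail` is a hypothetical helper written for illustration only; it is not part of `AnthropicStreamWrapper`, and it returns the whole tail at once rather than one event per call as the wrapper does.

```python
from typing import Any, Dict, List, Optional


def flush_tail(
    held_message_delta: Optional[Dict[str, Any]],
    block_closed: bool,
    block_index: int,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: events to emit once the upstream stream is exhausted."""
    events: List[Dict[str, Any]] = []
    if held_message_delta is not None:
        # A final message_delta must be preceded by content_block_stop.
        if not block_closed:
            events.append({"type": "content_block_stop", "index": block_index})
        events.append(held_message_delta)
    # message_stop always terminates the stream.
    events.append({"type": "message_stop"})
    return events


held = {"type": "message_delta", "delta": {"stop_reason": "end_turn"}}
open_block_tail = [e["type"] for e in flush_tail(held, block_closed=False, block_index=0)]
closed_block_tail = [e["type"] for e in flush_tail(held, block_closed=True, block_index=0)]
```

With an open content block the tail is `content_block_stop`, `message_delta`, `message_stop`; when the block was already closed, only the last two are emitted.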


@ -6,6 +6,7 @@ from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Literal,
Optional,
@ -75,6 +76,9 @@ from litellm.litellm_core_utils.prompt_templates.common_utils import (
from litellm.litellm_core_utils.prompt_templates.factory import (
THOUGHT_SIGNATURE_SEPARATOR,
)
from litellm.llms.anthropic.experimental_pass_through.context_management import (
PolyfillResult,
)
from litellm.types.llms.anthropic import (
ANTHROPIC_HOSTED_TOOLS,
AllAnthropicToolsValues,
@ -87,14 +91,17 @@ from litellm.types.llms.anthropic import (
AnthropicResponseContentBlockText,
AnthropicResponseContentBlockThinking,
AnthropicResponseContentBlockToolUse,
AppliedEdit,
ContentBlockDelta,
ContentJsonBlockDelta,
ContentTextBlockDelta,
ContentThinkingBlockDelta,
ContentThinkingSignatureBlockDelta,
ContextManagementResponse,
MessageBlockDelta,
MessageDelta,
UsageDelta,
UsageIteration,
)
from litellm.types.llms.anthropic_messages.anthropic_response import (
AnthropicMessagesResponse,
@ -195,6 +202,7 @@ class AnthropicAdapter:
self,
response: ModelResponse,
tool_name_mapping: Optional[Dict[str, str]] = None,
polyfill_result: Optional[PolyfillResult] = None,
) -> Optional[AnthropicMessagesResponse]:
"""
Translate OpenAI response to Anthropic format.
@ -204,10 +212,12 @@ class AnthropicAdapter:
tool_name_mapping: Optional mapping of truncated tool names to original names.
Used to restore original names for tools that exceeded
OpenAI's 64-char limit.
polyfill_result: PolyfillResult from context_management polyfill.
"""
return LiteLLMAnthropicMessagesAdapter().translate_openai_response_to_anthropic(
response=response,
tool_name_mapping=tool_name_mapping,
polyfill_result=polyfill_result,
)
def translate_completion_output_params_streaming(
@ -215,7 +225,9 @@ class AnthropicAdapter:
completion_stream: Any,
model: str,
tool_name_mapping: Optional[Dict[str, str]] = None,
) -> Union[AsyncIterator[bytes], None]:
polyfill_result: Optional[PolyfillResult] = None,
is_async: bool = True,
) -> Union[AsyncIterator[bytes], Iterator[bytes], None]:
"""
Translate OpenAI streaming response to Anthropic format.
@ -223,14 +235,35 @@ class AnthropicAdapter:
completion_stream: The OpenAI streaming response
model: The model name
tool_name_mapping: Optional mapping of truncated tool names to original names.
polyfill_result: PolyfillResult from context_management polyfill.
is_async: When ``True`` (default, for back-compat with existing
async callers) returns an ``AsyncIterator[bytes]``. When
``False`` returns a sync ``Iterator[bytes]`` so sync callers
(e.g. ``litellm.anthropic.messages.create(stream=True)`` via
the sync handler) don't get back an async iterator they
can't iterate without an event loop.
"""
applied_edits = (
polyfill_result.applied_edits_for_response() if polyfill_result else None
)
compaction_block = (
polyfill_result.compaction_block if polyfill_result is not None else None
)
iterations_usage = (
polyfill_result.iterations_usage if polyfill_result is not None else None
)
anthropic_wrapper = AnthropicStreamWrapper(
completion_stream=completion_stream,
model=model,
tool_name_mapping=tool_name_mapping,
applied_edits=applied_edits,
compaction_block=compaction_block,
iterations_usage=iterations_usage,
)
# Return the SSE-wrapped version for proper event formatting
return anthropic_wrapper.async_anthropic_sse_wrapper()
# Return the SSE-wrapped version for proper event formatting.
if is_async:
return anthropic_wrapper.async_anthropic_sse_wrapper()
return anthropic_wrapper.anthropic_sse_wrapper()
class LiteLLMAnthropicMessagesAdapter:
@ -1342,6 +1375,7 @@ class LiteLLMAnthropicMessagesAdapter:
self,
response: ModelResponse,
tool_name_mapping: Optional[Dict[str, str]] = None,
polyfill_result: Optional[PolyfillResult] = None,
) -> AnthropicMessagesResponse:
"""
Translate OpenAI response to Anthropic format.
@ -1351,12 +1385,17 @@ class LiteLLMAnthropicMessagesAdapter:
tool_name_mapping: Optional mapping of truncated tool names to original names.
Used to restore original names for tools that exceeded
OpenAI's 64-char limit.
polyfill_result: PolyfillResult from context_management polyfill.
"""
## translate content block
anthropic_content = self._translate_openai_content_to_anthropic(
choices=response.choices, # type: ignore
tool_name_mapping=tool_name_mapping,
)
if polyfill_result is not None and polyfill_result.compaction_block is not None:
anthropic_content.insert(0, polyfill_result.compaction_block) # type: ignore[arg-type]
## extract finish reason
anthropic_finish_reason = self._translate_openai_finish_reason_to_anthropic(
openai_finish_reason=response.choices[0].finish_reason # type: ignore
@ -1385,6 +1424,14 @@ class LiteLLMAnthropicMessagesAdapter:
if cached_tokens > 0:
anthropic_usage["cache_read_input_tokens"] = cached_tokens
if polyfill_result is not None and polyfill_result.iterations_usage is not None:
message_iteration: UsageIteration = {
"type": "message",
"input_tokens": uncached_input_tokens,
"output_tokens": usage.completion_tokens or 0,
}
anthropic_usage["iterations"] = list(polyfill_result.iterations_usage) + [message_iteration] # type: ignore[typeddict-unknown-key]
translated_obj = AnthropicMessagesResponse(
id=response.id,
type="message",
@ -1396,6 +1443,14 @@ class LiteLLMAnthropicMessagesAdapter:
stop_reason=anthropic_finish_reason,
)
applied_edits = (
polyfill_result.applied_edits_for_response() if polyfill_result else None
)
if applied_edits:
translated_obj["context_management"] = ContextManagementResponse(
applied_edits=list(applied_edits)
)
return translated_obj
def _translate_streaming_openai_chunk_to_anthropic_content_block(
@ -1528,7 +1583,10 @@ class LiteLLMAnthropicMessagesAdapter:
return "text_delta", ContentTextBlockDelta(type="text_delta", text=text)
def translate_streaming_openai_response_to_anthropic(
self, response: ModelResponse, current_content_block_index: int
self,
response: ModelResponse,
current_content_block_index: int,
applied_edits: Optional[List[AppliedEdit]] = None,
) -> Union[ContentBlockDelta, MessageBlockDelta]:
## base case - final chunk w/ finish reason
if response.choices[0].finish_reason is not None:
@ -1578,9 +1636,14 @@ class LiteLLMAnthropicMessagesAdapter:
usage_delta["cache_read_input_tokens"] = cached_tokens
else:
usage_delta = UsageDelta(input_tokens=0, output_tokens=0)
return MessageBlockDelta(
message_block = MessageBlockDelta(
type="message_delta", delta=delta, usage=usage_delta # type: ignore
)
if applied_edits:
message_block["context_management"] = ContextManagementResponse(
applied_edits=list(applied_edits)
)
return message_block
(
type_of_content,
content_block_delta,


@ -0,0 +1,11 @@
from .constants import CLEARED_TOOL_RESULT_PLACEHOLDER
from .dispatcher import apply_context_management
from .errors import AnthropicContextManagementError
from .result import PolyfillResult
__all__ = [
"apply_context_management",
"AnthropicContextManagementError",
"CLEARED_TOOL_RESULT_PLACEHOLDER",
"PolyfillResult",
]


@ -0,0 +1,45 @@
"""Constants for the in-gateway context-management polyfill."""
CLEAR_TOOL_USES_EDIT_TYPE = "clear_tool_uses_20250919"
DEFAULT_INPUT_TOKENS_TRIGGER = 100_000
DEFAULT_KEEP_TOOL_USES = 3
CLEARED_TOOL_RESULT_PLACEHOLDER = "[Cleared by context management]"
# compact_20260112
COMPACT_EDIT_TYPE = "compact_20260112"
COMPACT_DEFAULT_TRIGGER_TOKENS = 150_000
COMPACT_MIN_TRIGGER_TOKENS = 50_000
# Default ``max_tokens`` for the summary call. Required by providers like
# Anthropic that reject requests without it; safely accepted by providers that
# don't strictly require it. Chosen to comfortably fit a long structured
# summary. Operators can override via
# ``general_settings.context_management_summary_max_tokens``.
COMPACT_SUMMARY_MAX_TOKENS = 4096
COMPACT_SUMMARY_MAX_TOKENS_SETTING_KEY = "context_management_summary_max_tokens"
# Wall-clock bound for the summary sub-call. Without this a slow or
# unresponsive summary model would hang the parent ``/v1/messages`` request
# with no escape hatch; on timeout the editor falls into the standard
# ``summary_call_failed`` path and forwards the request without compaction.
COMPACT_SUMMARY_TIMEOUT_SECONDS = 60.0
COMPACT_SUMMARY_MODEL_SETTING_KEY = "context_management_summary_model"
COMPACT_SUMMARY_SYSTEM_PREFIX = "Previous conversation summary: "
# Default summarization prompt from the Anthropic spec.
COMPACT_DEFAULT_INSTRUCTIONS = (
"You have written a partial transcript for the initial task above. Please "
"write a summary of the transcript. The purpose of this summary is to "
"provide continuity so you can continue to make progress towards solving "
"the task in a future context, where the raw history above may not be "
"accessible and will be replaced with this summary. Write down anything "
"that would be helpful, including the state, next steps, learnings etc. "
"You must wrap your summary in a <summary></summary> block."
)
# Appended to the default prompt when ``tools`` are present and the caller
# did not supply custom ``instructions``. Matches the guidance in the
# Anthropic docs under "Compaction might fail when tools are defined".
COMPACT_NO_TOOL_CALLS_SUFFIX = (
" Do not call any tools while writing this summary; respond with text only."
)
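The comment on `COMPACT_SUMMARY_MAX_TOKENS` says operators can override the default through `general_settings`. The lookup itself is not shown in this part of the diff, so the following is only a sketch of one plausible resolution rule. `resolve_summary_max_tokens` and its validation (positive integers only) are assumptions, and the two constants are re-declared locally so the snippet runs on its own.

```python
from typing import Any, Dict, Optional

# Local copies of the constants defined above.
COMPACT_SUMMARY_MAX_TOKENS = 4096
COMPACT_SUMMARY_MAX_TOKENS_SETTING_KEY = "context_management_summary_max_tokens"


def resolve_summary_max_tokens(general_settings: Optional[Dict[str, Any]]) -> int:
    """Hypothetical lookup: use the operator override when it is a positive int."""
    override = (general_settings or {}).get(COMPACT_SUMMARY_MAX_TOKENS_SETTING_KEY)
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(override, int) and not isinstance(override, bool) and override > 0:
        return override
    return COMPACT_SUMMARY_MAX_TOKENS


default_value = resolve_summary_max_tokens(None)
overridden = resolve_summary_max_tokens({"context_management_summary_max_tokens": 8192})
```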


@ -0,0 +1,127 @@
"""Dispatch ``context_management`` edits to registered polyfill editors."""
import inspect
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union, cast
from litellm._logging import verbose_logger
from litellm.types.llms.anthropic import AppliedEdit
from .constants import CLEAR_TOOL_USES_EDIT_TYPE, COMPACT_EDIT_TYPE
from .editors import apply_clear_tool_uses_20250919, apply_compact_20260112
from .result import PolyfillResult
EditorFn = Callable[..., Any]
_EDITOR_REGISTRY: Dict[str, EditorFn] = {
CLEAR_TOOL_USES_EDIT_TYPE: apply_clear_tool_uses_20250919,
COMPACT_EDIT_TYPE: apply_compact_20260112,
}
def _normalize_spec(
spec: Union[Dict[str, Any], List[Dict[str, Any]], None],
) -> Optional[List[Dict[str, Any]]]:
"""Accept Anthropic-native dict form or OpenAI list form; return edits list."""
if isinstance(spec, list):
# Local import to avoid an import cycle at module load.
from litellm.llms.anthropic.chat.transformation import AnthropicConfig
spec = AnthropicConfig.map_openai_context_management_to_anthropic(spec)
edits = spec.get("edits") if isinstance(spec, dict) else None
if not edits or not isinstance(edits, list):
return None
return [edit for edit in edits if isinstance(edit, dict)]
def _wrap_editor_return(raw: Any, *, fallback_system: Any) -> PolyfillResult:
"""Coerce an editor's native return shape into a ``PolyfillResult``.
v0 sync editors (e.g. ``clear_tool_uses_20250919``) return a 2-tuple
``(messages, Optional[AppliedEdit])``. The new async ``compact_20260112``
editor returns a ``PolyfillResult`` directly.
"""
if isinstance(raw, PolyfillResult):
return raw
# Legacy 2-tuple return — sync editors don't mutate ``system``, so
# carry the caller's value forward.
messages, applied = cast(Tuple[List[Dict[str, Any]], Any], raw)
return PolyfillResult(
messages=messages,
system=fallback_system,
applied_edits=[applied] if applied is not None else [],
)
async def apply_context_management(
*,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]],
system: Any,
context_management_spec: Union[Dict[str, Any], List[Dict[str, Any]], None],
litellm_metadata: Optional[Dict[str, Any]] = None,
llm_router: Any = None,
user_api_key_auth: Any = None,
) -> PolyfillResult:
"""Run edits in order; return a single ``PolyfillResult``.
The dispatcher is async so async editors (``compact_20260112``) can
``await`` the configured summarization model. Sync editors are called
inline; ``inspect.iscoroutinefunction`` decides how each editor is
invoked.
"""
edits = _normalize_spec(context_management_spec)
if not edits:
return PolyfillResult(messages=messages, system=system, applied_edits=[])
current_messages = messages
current_system = system
aggregated_applied: List[AppliedEdit] = []
aggregated_compaction_block = None
aggregated_iterations_usage = None
for edit_spec in edits:
edit_type = edit_spec.get("type")
editor = _EDITOR_REGISTRY.get(edit_type) if isinstance(edit_type, str) else None
if editor is None:
verbose_logger.debug(
"context_management polyfill: unknown edit type '%s' — skipping",
edit_type,
)
continue
kwargs: Dict[str, Any] = {
"model": model,
"messages": current_messages,
"tools": tools,
"system": current_system,
"edit_spec": edit_spec,
}
# Only async editors accept these — passing them to sync v0 editors
# would break their signature.
if inspect.iscoroutinefunction(editor):
kwargs["litellm_metadata"] = litellm_metadata
kwargs["llm_router"] = llm_router
kwargs["user_api_key_auth"] = user_api_key_auth
raw_result = await cast(Callable[..., Awaitable[Any]], editor)(**kwargs)
else:
raw_result = editor(**kwargs)
result = _wrap_editor_return(raw_result, fallback_system=current_system)
current_messages = result.messages
current_system = result.system
aggregated_applied.extend(result.applied_edits)
if result.compaction_block is not None:
aggregated_compaction_block = result.compaction_block
if result.iterations_usage is not None:
aggregated_iterations_usage = result.iterations_usage
return PolyfillResult(
messages=current_messages,
system=current_system,
applied_edits=aggregated_applied,
compaction_block=aggregated_compaction_block,
iterations_usage=aggregated_iterations_usage,
)
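The mixed sync/async dispatch used by `apply_context_management` can be illustrated in isolation. The sketch below uses toy editors and a toy registry (`sync_editor`, `async_editor`, `REGISTRY`, and `run_edits` are all hypothetical names); only the `inspect.iscoroutinefunction` branching mirrors the dispatcher above.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, List, cast


def sync_editor(*, messages: List[str]) -> List[str]:
    # Stands in for a v0 editor that accepts no router/metadata kwargs.
    return messages + ["sync"]


async def async_editor(*, messages: List[str], llm_router: Any = None) -> List[str]:
    # Stands in for an editor that awaits a summarization call.
    await asyncio.sleep(0)
    return messages + ["async:" + str(llm_router)]


REGISTRY: Dict[str, Callable[..., Any]] = {"a": sync_editor, "b": async_editor}


async def run_edits(edit_types: List[str], messages: List[str], llm_router: Any) -> List[str]:
    current = messages
    for edit_type in edit_types:
        editor = REGISTRY.get(edit_type)
        if editor is None:
            continue  # unknown edit types are skipped, not rejected
        kwargs: Dict[str, Any] = {"messages": current}
        if inspect.iscoroutinefunction(editor):
            # Extra kwargs go only to async editors; the sync signature lacks them.
            kwargs["llm_router"] = llm_router
            current = await cast(Callable[..., Awaitable[Any]], editor)(**kwargs)
        else:
            current = editor(**kwargs)
    return current


result = asyncio.run(run_edits(["a", "unknown", "b"], [], llm_router="r1"))
```

Each editor receives the output of the previous one, so edits compose in the order the caller listed them.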


@ -0,0 +1,4 @@
from .clear_tool_uses import apply_clear_tool_uses_20250919
from .compact import apply_compact_20260112
__all__ = ["apply_clear_tool_uses_20250919", "apply_compact_20260112"]


@ -0,0 +1,210 @@
"""``clear_tool_uses_20250919`` polyfill (v0: ``trigger`` and ``keep`` only)."""
from typing import Any, Dict, List, Optional, Tuple, cast
import litellm
from litellm._logging import verbose_logger
from litellm.types.llms.anthropic import AppliedEdit
from ..constants import (
CLEAR_TOOL_USES_EDIT_TYPE,
DEFAULT_INPUT_TOKENS_TRIGGER,
DEFAULT_KEEP_TOOL_USES,
)
from ..placeholders import build_cleared_tool_result_content
def _count_tool_uses(messages: List[Dict[str, Any]]) -> int:
"""Return the number of tool_use content blocks across all messages.
Only counts blocks with a string ``id`` to stay consistent with
:func:`_collect_tool_use_ids_in_order`, which is the source of truth for
which blocks are clearable.
"""
count = 0
for msg in messages:
content = msg.get("content")
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
if isinstance(block.get("id"), str):
count += 1
return count
def _collect_tool_use_ids_in_order(messages: List[Dict[str, Any]]) -> List[str]:
"""Return tool_use ids in the chronological order they appear in messages."""
ids: List[str] = []
for msg in messages:
content = msg.get("content")
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
block_id = block.get("id")
if isinstance(block_id, str):
ids.append(block_id)
return ids
def _trigger_met(
trigger: Dict[str, Any],
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]],
) -> Tuple[bool, Optional[int]]:
"""Return (trigger_met, input_tokens if counted for reuse)."""
trigger_type = trigger.get("type", "input_tokens")
threshold = trigger.get("value")
if trigger_type == "tool_uses":
if not isinstance(threshold, int):
return False, None
return _count_tool_uses(messages) > threshold, None
if not isinstance(threshold, int):
threshold = DEFAULT_INPUT_TOKENS_TRIGGER
current_tokens = litellm.token_counter(
model=model,
messages=messages,
tools=cast(Any, tools),
)
verbose_logger.debug(
"context_management polyfill: current_tokens: %s", current_tokens
)
verbose_logger.debug("context_management polyfill: threshold: %s", threshold)
return current_tokens > threshold, current_tokens
def _resolve_keep_count(keep: Dict[str, Any]) -> int:
"""Return the ``keep`` count; fall back to the default for unsupported types or invalid values."""
keep_type = keep.get("type", "tool_uses")
if keep_type != "tool_uses":
return DEFAULT_KEEP_TOOL_USES
value = keep.get("value")
if not isinstance(value, int) or value < 0:
return DEFAULT_KEEP_TOOL_USES
return value
def _last_completed_tool_use_id(
messages: List[Dict[str, Any]],
) -> Optional[str]:
"""Latest completed tool_result id; never cleared."""
last_id: Optional[str] = None
for msg in messages:
content = msg.get("content")
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result":
block_id = block.get("tool_use_id")
if isinstance(block_id, str):
last_id = block_id
return last_id
def _clear_tool_results(
messages: List[Dict[str, Any]], ids_to_clear: set
) -> Tuple[List[Dict[str, Any]], int]:
"""Clear matching tool_result content; return (messages, cleared_count)."""
cleared = 0
new_messages: List[Dict[str, Any]] = []
for msg in messages:
content = msg.get("content")
if not isinstance(content, list):
new_messages.append(msg)
continue
new_blocks: List[Any] = []
mutated = False
for block in content:
if (
isinstance(block, dict)
and block.get("type") == "tool_result"
and block.get("tool_use_id") in ids_to_clear
):
new_block = {
**block,
"content": build_cleared_tool_result_content(block.get("content")),
}
new_blocks.append(new_block)
mutated = True
cleared += 1
else:
new_blocks.append(block)
if mutated:
new_messages.append({**msg, "content": new_blocks})
else:
new_messages.append(msg)
return new_messages, cleared
def apply_clear_tool_uses_20250919(
*,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]],
system: Any,
edit_spec: Dict[str, Any],
) -> Tuple[List[Dict[str, Any]], Optional[AppliedEdit]]:
"""Apply clear_tool_uses; return (messages, AppliedEdit or None)."""
ignored_knobs = [
knob
for knob in ("clear_at_least", "exclude_tools", "clear_tool_inputs")
if knob in edit_spec
]
for ignored_knob in ignored_knobs:
verbose_logger.warning(
"context_management polyfill: ignoring '%s' on %s "
"(supported only on Anthropic-family forwarding path in v0)",
ignored_knob,
CLEAR_TOOL_USES_EDIT_TYPE,
)
trigger = edit_spec.get("trigger") or {
"type": "input_tokens",
"value": DEFAULT_INPUT_TOKENS_TRIGGER,
}
keep = edit_spec.get("keep") or {
"type": "tool_uses",
"value": DEFAULT_KEEP_TOOL_USES,
}
met, tokens_before = _trigger_met(trigger, model, messages, tools)
if not met:
return messages, None
keep_count = _resolve_keep_count(keep)
tool_use_ids = _collect_tool_use_ids_in_order(messages)
if len(tool_use_ids) <= keep_count:
return messages, None
ids_to_clear = set(tool_use_ids[: len(tool_use_ids) - keep_count])
# Never clear the latest completed tool_result (reply context).
last_completed_id = _last_completed_tool_use_id(messages)
if last_completed_id is not None:
ids_to_clear.discard(last_completed_id)
edited, cleared_count = _clear_tool_results(messages, ids_to_clear)
verbose_logger.debug("context_management polyfill: edited: %s", edited)
if cleared_count == 0:
return messages, None
if tokens_before is None:
tokens_before = litellm.token_counter(
model=model, messages=messages, tools=cast(Any, tools)
)
tokens_after = litellm.token_counter(
model=model, messages=edited, tools=cast(Any, tools)
)
cleared_input_tokens = max(tokens_before - tokens_after, 0)
applied: AppliedEdit = {
"type": CLEAR_TOOL_USES_EDIT_TYPE,
"cleared_tool_uses": cleared_count,
"cleared_input_tokens": cleared_input_tokens,
}
if ignored_knobs:
applied["warnings"] = [f"{knob}_ignored" for knob in ignored_knobs]
return edited, applied
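The selection step in `apply_clear_tool_uses_20250919` (keep the newest `keep_count` tool uses, then always protect the most recently completed `tool_result`) can be sketched as a single standalone function. `select_ids_to_clear` and `_round` are hypothetical helpers that condense the logic above for illustration; they do not exist in the module.

```python
from typing import Any, Dict, List, Optional, Set


def select_ids_to_clear(messages: List[Dict[str, Any]], keep_count: int) -> Set[str]:
    """Hypothetical helper: ids whose tool_result content would be cleared."""
    tool_use_ids: List[str] = []
    last_completed: Optional[str] = None
    for msg in messages:
        content = msg.get("content")
        if not isinstance(content, list):
            continue
        for block in content:
            if not isinstance(block, dict):
                continue
            if block.get("type") == "tool_use" and isinstance(block.get("id"), str):
                tool_use_ids.append(block["id"])
            elif block.get("type") == "tool_result" and isinstance(
                block.get("tool_use_id"), str
            ):
                last_completed = block["tool_use_id"]
    if len(tool_use_ids) <= keep_count:
        return set()
    ids = set(tool_use_ids[: len(tool_use_ids) - keep_count])
    # The latest completed tool_result is never cleared.
    ids.discard(last_completed)
    return ids


def _round(tool_id: str) -> List[Dict[str, Any]]:
    """Build one assistant tool_use / user tool_result exchange."""
    return [
        {"role": "assistant", "content": [{"type": "tool_use", "id": tool_id, "name": "search", "input": {}}]},
        {"role": "user", "content": [{"type": "tool_result", "tool_use_id": tool_id, "content": "ok"}]},
    ]


messages = _round("t1") + _round("t2") + _round("t3")
keep_one = select_ids_to_clear(messages, keep_count=1)
keep_none = select_ids_to_clear(messages, keep_count=0)
```

Note that `keep_count=0` still leaves `t3` untouched, because the hard protection applies after the `keep` slice.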


@ -0,0 +1,14 @@
"""Exceptions raised by the context_management polyfill."""
class AnthropicContextManagementError(Exception):
"""Validation error from the polyfill, surfaced as an Anthropic-format 4xx.
The `/v1/messages` endpoint catches this in its exception handler and
emits an Anthropic-shaped error body instead of the default OpenAI shape.
"""
def __init__(self, *, status_code: int, message: str) -> None:
super().__init__(message)
self.status_code = status_code
self.message = message
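The docstring says the `/v1/messages` endpoint turns this exception into an Anthropic-shaped error body, but the handler is not part of this section. The sketch below shows what such a conversion might look like. `to_anthropic_error_body` is a hypothetical function, and mapping every case to `invalid_request_error` is an assumption; the envelope shape (`type: "error"` wrapping an `error` object) follows the public Anthropic error format.

```python
from typing import Any, Dict


class AnthropicContextManagementError(Exception):
    """Local copy of the exception above so the sketch is self-contained."""

    def __init__(self, *, status_code: int, message: str) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.message = message


def to_anthropic_error_body(exc: AnthropicContextManagementError) -> Dict[str, Any]:
    """Hypothetical conversion into an Anthropic-format error envelope."""
    return {
        "type": "error",
        "error": {"type": "invalid_request_error", "message": exc.message},
    }


try:
    raise AnthropicContextManagementError(
        status_code=400, message="trigger.value must be an integer"
    )
except AnthropicContextManagementError as exc:
    status = exc.status_code
    body = to_anthropic_error_body(exc)
```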


@ -0,0 +1,14 @@
"""Placeholder content for cleared ``tool_result`` blocks (string or block list)."""
from typing import Any, List, Union
from .constants import CLEARED_TOOL_RESULT_PLACEHOLDER
def build_cleared_tool_result_content(
original_content: Any,
) -> Union[str, List[dict]]:
"""Return a string or single text block list, matching ``original_content`` shape."""
if isinstance(original_content, list):
return [{"type": "text", "text": CLEARED_TOOL_RESULT_PLACEHOLDER}]
return CLEARED_TOOL_RESULT_PLACEHOLDER
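A short usage sketch of the shape-matching rule: string content stays a string, while block-list content collapses to a single text block. The constant and helper are re-declared locally (copied from this diff rather than imported from litellm) so the snippet runs on its own.

```python
from typing import Any, List, Union

# Local copy of the constant from constants.py in this diff.
CLEARED_TOOL_RESULT_PLACEHOLDER = "[Cleared by context management]"


def build_cleared_tool_result_content(original_content: Any) -> Union[str, List[dict]]:
    """Return a placeholder whose shape matches the original content."""
    if isinstance(original_content, list):
        return [{"type": "text", "text": CLEARED_TOOL_RESULT_PLACEHOLDER}]
    return CLEARED_TOOL_RESULT_PLACEHOLDER


string_result = build_cleared_tool_result_content("42 rows returned")
block_result = build_cleared_tool_result_content(
    [{"type": "text", "text": "row 1"}, {"type": "text", "text": "row 2"}]
)
```

Any non-list input, including `None`, takes the string branch.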


@ -0,0 +1,53 @@
"""``PolyfillResult`` — the shape returned by the context-management dispatcher.
Threaded from the dispatcher through ``async_anthropic_messages_handler`` into
the adapter so it can prepend the ``compaction`` block to the response and
attach ``iterations`` to ``usage``.
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from litellm.types.llms.anthropic import (
AppliedEdit,
CompactionBlock,
UsageIteration,
)
from .constants import COMPACT_EDIT_TYPE
@dataclass
class PolyfillResult:
messages: List[Dict[str, Any]]
system: Optional[Union[str, List[Dict[str, Any]]]]
applied_edits: List[AppliedEdit] = field(default_factory=list)
compaction_block: Optional[CompactionBlock] = None
iterations_usage: Optional[List[UsageIteration]] = None
def applied_edits_for_response(self) -> Optional[List[AppliedEdit]]:
"""``applied_edits`` to attach on the client-visible response.
``compact_20260112`` is included when a new compaction block was
synthesized (success), when the edit carries an ``error`` field
(``summary_model_not_configured``, ``summary_call_failed``,
``summary_extraction_failed``), or when the edit carries
``warnings`` (e.g. ``unsupported_trigger_type_X_using_input_tokens``,
``pause_after_compaction_ignored``), because operators and clients need to
see why compaction was requested but not applied as expected.
Slice-only / under-threshold paths that produced no edit at all
(no block, no error, no warnings) are omitted. Other edit types are
included when the editor returned an ``AppliedEdit``.
"""
visible: List[AppliedEdit] = []
for edit in self.applied_edits:
if edit.get("type") == COMPACT_EDIT_TYPE:
if (
self.compaction_block is not None
or edit.get("error")
or edit.get("warnings")
):
visible.append(edit)
else:
visible.append(edit)
return visible or None
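The visibility rule in `applied_edits_for_response` is easiest to see with a few concrete inputs. The sketch below uses a trimmed stand-in class (`MiniResult` is hypothetical and drops the `messages`, `system`, and `iterations_usage` fields) with the same filtering logic.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

COMPACT_EDIT_TYPE = "compact_20260112"


@dataclass
class MiniResult:
    """Trimmed stand-in for PolyfillResult, for illustration only."""

    applied_edits: List[Dict[str, Any]] = field(default_factory=list)
    compaction_block: Optional[Dict[str, Any]] = None

    def applied_edits_for_response(self) -> Optional[List[Dict[str, Any]]]:
        visible: List[Dict[str, Any]] = []
        for edit in self.applied_edits:
            if edit.get("type") != COMPACT_EDIT_TYPE:
                visible.append(edit)
            elif (
                self.compaction_block is not None
                or edit.get("error")
                or edit.get("warnings")
            ):
                visible.append(edit)
        return visible or None


clear_edit = {"type": "clear_tool_uses_20250919", "cleared_tool_uses": 2}
silent_compact = {"type": COMPACT_EDIT_TYPE}
failed_compact = {"type": COMPACT_EDIT_TYPE, "error": "summary_call_failed"}

silent = MiniResult(applied_edits=[silent_compact]).applied_edits_for_response()
failed = MiniResult(applied_edits=[failed_compact]).applied_edits_for_response()
mixed = MiniResult(applied_edits=[clear_edit, silent_compact]).applied_edits_for_response()
```

A compaction edit with no block, error, or warnings is hidden, so `silent` collapses to `None` rather than an empty list.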


@ -8,7 +8,17 @@
import asyncio
import contextvars
from functools import partial
from typing import Any, AsyncIterator, Coroutine, Dict, List, Optional, Union, cast
from typing import (
Any,
AsyncIterator,
Coroutine,
Dict,
Iterator,
List,
Optional,
Union,
cast,
)
import litellm
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
@ -189,7 +199,7 @@ async def anthropic_messages(
client: Optional[AsyncHTTPHandler] = None,
custom_llm_provider: Optional[str] = None,
**kwargs,
) -> Union[AnthropicMessagesResponse, AsyncIterator]:
) -> Union[AnthropicMessagesResponse, Iterator[bytes], AsyncIterator[Any]]:
"""
Async: Make llm api request in Anthropic /messages API spec.
@ -346,8 +356,11 @@ def anthropic_messages_handler(
**kwargs,
) -> Union[
AnthropicMessagesResponse,
Iterator[bytes],
AsyncIterator[Any],
Coroutine[Any, Any, Union[AnthropicMessagesResponse, AsyncIterator[Any]]],
Coroutine[
Any, Any, Union[AnthropicMessagesResponse, AsyncIterator[Any], Iterator[bytes]]
],
]:
"""
Makes Anthropic `/v1/messages` API calls in the Anthropic API spec
@ -456,9 +469,14 @@ def anthropic_messages_handler(
return LiteLLMMessagesToResponsesAPIHandler.anthropic_messages_handler(
**_shared_kwargs
)
# The in-gateway context_management polyfill runs inside
# ``async_anthropic_messages_handler`` so it can ``await`` the
# summarization model for ``compact_20260112``. ``context_management``
# is passed through as a regular kwarg.
return (
LiteLLMMessagesToCompletionTransformationHandler.anthropic_messages_handler(
**_shared_kwargs
**_shared_kwargs,
)
)


@ -586,6 +586,9 @@ class AmazonConverseConfig(BaseConfig):
):
supported_params.append("thinking")
supported_params.append("reasoning_effort")
if base_model.startswith("anthropic"):
supported_params.append("context_management")
return supported_params
def map_tool_choice_values(
@ -947,10 +950,10 @@ class AmazonConverseConfig(BaseConfig):
self._handle_reasoning_effort_parameter(
model=model, reasoning_effort=value, optional_params=optional_params
)
elif param == "context_management" and isinstance(value, (dict, list)):
self._map_context_management_param(value, optional_params)
if param == "requestMetadata":
if value is not None and isinstance(value, dict):
self._validate_request_metadata(value) # type: ignore
optional_params["requestMetadata"] = value
self._map_request_metadata_param(value, optional_params)
if param == "service_tier" and isinstance(value, str):
self._map_service_tier_param(value, optional_params)
@ -983,6 +986,32 @@ class AmazonConverseConfig(BaseConfig):
return optional_params
def _map_request_metadata_param(self, value: Any, optional_params: dict) -> None:
if value is not None and isinstance(value, dict):
self._validate_request_metadata(value) # type: ignore
optional_params["requestMetadata"] = value
def _map_context_management_param(
self, value: Union[dict, list], optional_params: dict
) -> None:
# Match the dispatcher's ``_normalize_spec`` behavior: only run the
# OpenAI→Anthropic mapper for list inputs. Dict inputs are already in
# Anthropic-native shape (``{"edits": [...]}``) and should pass
# through unchanged so an Anthropic-format ``context_management``
# value isn't silently dropped when the mapper can't classify it.
if isinstance(value, list):
mapped = AnthropicConfig.map_openai_context_management_to_anthropic(
cast(Union[dict, list], value)
)
else:
mapped = value
# Skip when the mapper returned None for malformed input — leaving the
# key out is safer than passing `context_management: null` downstream,
# which Bedrock would reject and which can confuse intermediate checks
# before the final _filter_context_management_for_bedrock_converse step.
if mapped is not None:
optional_params["context_management"] = mapped
def _map_service_tier_param(self, value: str, optional_params: dict) -> None:
"""Map OpenAI service_tier (string) to Bedrock serviceTier (object).
@ -1488,6 +1517,11 @@ class AmazonConverseConfig(BaseConfig):
if ANTHROPIC_EFFORT_BETA_HEADER not in anthropic_beta_list:
anthropic_beta_list.append(ANTHROPIC_EFFORT_BETA_HEADER)
# Bedrock Converse: compact_20260112 edits only (+ beta header).
AmazonConverseConfig._filter_context_management_for_bedrock_converse(
additional_request_params, anthropic_beta_list
)
# Set anthropic_beta in additional_request_params if we have any beta features
# ONLY apply to Anthropic/Claude models - other models (e.g., Qwen, Llama) don't support this field
if anthropic_beta_list and base_model.startswith("anthropic"):
@ -1495,6 +1529,42 @@ class AmazonConverseConfig(BaseConfig):
return bedrock_tools, anthropic_beta_list
@staticmethod
def _filter_context_management_for_bedrock_converse(
additional_request_params: dict,
anthropic_beta_list: list,
) -> None:
"""Keep only compact_20260112 edits for Bedrock; add beta header or drop field."""
from litellm.llms.anthropic.experimental_pass_through.context_management.constants import (
COMPACT_EDIT_TYPE,
)
from litellm.types.llms.anthropic import ANTHROPIC_BETA_HEADER_VALUES
cm = additional_request_params.get("context_management")
if not isinstance(cm, dict):
additional_request_params.pop("context_management", None)
return
edits = cm.get("edits")
if not isinstance(edits, list):
additional_request_params.pop("context_management", None)
return
compact_edits = [
e
for e in edits
if isinstance(e, dict) and e.get("type") == COMPACT_EDIT_TYPE
]
if compact_edits:
compact_beta = ANTHROPIC_BETA_HEADER_VALUES.COMPACT_2026_01_12.value
if compact_beta not in anthropic_beta_list:
anthropic_beta_list.append(compact_beta)
additional_request_params["context_management"] = {
**cm,
"edits": compact_edits,
}
else:
additional_request_params.pop("context_management", None)
def _transform_request_helper(
self,
model: str,
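The filtering step in `_filter_context_management_for_bedrock_converse` can be sketched as a standalone function. This is a minimal illustration, not the code from the PR: the function name `filter_context_management` is hypothetical, and the two string constants are assumed to match `COMPACT_EDIT_TYPE` and `ANTHROPIC_BETA_HEADER_VALUES.COMPACT_2026_01_12.value`, based on the literals that appear elsewhere in this diff.

```python
from typing import Any, Dict, List

# Assumed literal values; the real code imports these from its constants
# module and from ANTHROPIC_BETA_HEADER_VALUES.
COMPACT_EDIT_TYPE = "compact_20260112"
COMPACT_BETA_HEADER = "compact-2026-01-12"


def filter_context_management(params: Dict[str, Any], betas: List[str]) -> None:
    """Keep only compact edits in params["context_management"], in place.

    Hypothetical helper: drops the key entirely when the value is not a
    dict, has no "edits" list, or contains no compact edits.
    """
    cm = params.get("context_management")
    edits = cm.get("edits") if isinstance(cm, dict) else None
    if not isinstance(edits, list):
        params.pop("context_management", None)
        return

    compact = [
        e for e in edits if isinstance(e, dict) and e.get("type") == COMPACT_EDIT_TYPE
    ]
    if not compact:
        params.pop("context_management", None)
        return

    # Add the beta header once, then keep any sibling keys of "edits".
    if COMPACT_BETA_HEADER not in betas:
        betas.append(COMPACT_BETA_HEADER)
    params["context_management"] = {**cm, "edits": compact}
```

Popping the key in every non-matching branch means a `None` value never reaches the request body, which is the failure mode the second commit in this PR describes.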

@ -41,7 +41,10 @@ from litellm.llms.bedrock.common_utils import (
pop_bedrock_invoke_output_config_format,
remove_custom_field_from_tools,
)
from litellm.types.llms.anthropic import ANTHROPIC_TOOL_SEARCH_BETA_HEADER
from litellm.types.llms.anthropic import (
ANTHROPIC_BETA_HEADER_VALUES,
ANTHROPIC_TOOL_SEARCH_BETA_HEADER,
)
from litellm.types.llms.bedrock import BedrockInvokeAnthropicMessagesRequest
from litellm.types.llms.openai import AllMessageValues
from litellm.types.router import GenericLiteLLMParams
@ -445,7 +448,7 @@ class AmazonAnthropicClaudeMessagesConfig(
if isinstance(e, dict) and e.get("type") == "compact_20260112"
]
if compact_edits:
beta_set.add("compact-2026-01-12")
beta_set.add(ANTHROPIC_BETA_HEADER_VALUES.COMPACT_2026_01_12.value)
anthropic_messages_request["context_management"] = {
**cm,
"edits": compact_edits,

@ -3,10 +3,14 @@ Unified /v1/messages endpoint - (Anthropic Spec)
"""
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse
from litellm._logging import verbose_proxy_logger
from litellm.anthropic_interface.exceptions import AnthropicExceptionMapping
from litellm.integrations.custom_guardrail import ModifyResponseException
from litellm.llms.anthropic.experimental_pass_through.context_management import (
AnthropicContextManagementError,
)
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.common_request_processing import (
@ -114,6 +118,21 @@ async def anthropic_response( # noqa: PLR0915
)
return _anthropic_response
except AnthropicContextManagementError as e:
if e.status_code >= 500:
# Server-side polyfill failures hit the failure hook for spend/alert
# parity with the generic handler; 4xx validation errors do not.
await proxy_logging_obj.post_call_failure_hook(
user_api_key_dict=user_api_key_dict,
original_exception=e,
request_data=data,
)
body = AnthropicExceptionMapping.transform_to_anthropic_error(
status_code=e.status_code,
raw_message=e.message,
request_id=request.headers.get("x-request-id"),
)
return JSONResponse(status_code=e.status_code, content=body)
except Exception as e:
await proxy_logging_obj.post_call_failure_hook(
user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data

@ -2,7 +2,7 @@ from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Union
from pydantic import BaseModel, ConfigDict
from typing_extensions import Literal, Required, TypedDict
from typing_extensions import Literal, NotRequired, Required, TypedDict
from .openai import (
ChatCompletionCachedContent,
@ -515,6 +515,41 @@ class UsageDelta(TypedDict, total=False):
cache_read_input_tokens: int
class AppliedEdit(TypedDict, total=False):
"""One applied context_management edit (Anthropic response shape)."""
type: str
cleared_input_tokens: int
cleared_tool_uses: int
cleared_thinking_turns: int
# compact_20260112 fields
summary_input_tokens: int
summary_output_tokens: int
error: str
warnings: List[str]
class ContextManagementResponse(TypedDict, total=False):
"""Response ``context_management`` with ``applied_edits``."""
applied_edits: List[AppliedEdit]
class CompactionBlock(TypedDict, total=False):
"""Synthesized ``compaction`` content block (compact_20260112)."""
type: Required[Literal["compaction"]]
content: Optional[str]
class UsageIteration(TypedDict, total=False):
"""One sampling iteration's token usage (compact_20260112)."""
type: Required[Literal["compaction", "message"]]
input_tokens: int
output_tokens: int
class MessageBlockDelta(TypedDict):
"""
Anthropic
@ -524,6 +559,7 @@ class MessageBlockDelta(TypedDict):
type: Literal["message_delta"]
delta: MessageDelta
usage: UsageDelta
context_management: NotRequired[ContextManagementResponse]
class MessageChunk(TypedDict, total=False):

@ -1,10 +1,11 @@
from typing import Any, Dict, List, Literal, Optional, Union
from typing_extensions import TypeAlias, TypedDict
from typing_extensions import NotRequired, TypeAlias, TypedDict
from litellm.types.llms.anthropic import (
AnthropicResponseContentBlockText,
AnthropicResponseContentBlockToolUse,
ContextManagementResponse,
)
@ -94,3 +95,4 @@ class AnthropicMessagesResponse(TypedDict, total=False):
stop_sequence: Optional[str]
type: Optional[Literal["message"]]
usage: Optional[AnthropicUsage]
context_management: NotRequired[ContextManagementResponse]

@ -0,0 +1,272 @@
"""Integration tests for context_management polyfill on /v1/messages adapter path."""
import json
from unittest.mock import patch
import pytest
import litellm
from litellm.llms.anthropic.experimental_pass_through.context_management.constants import (
CLEARED_TOOL_RESULT_PLACEHOLDER,
)
from litellm.types.utils import (
Choices,
Message,
ModelResponse,
ModelResponseStream,
StreamingChoices,
Delta,
Usage,
)
MODEL = "xai/grok-4"
def _make_history(n_pairs: int, result_filler: str = "x" * 50):
messages = [{"role": "user", "content": "Compare weather across cities."}]
for i in range(n_pairs):
messages.append(
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": f"toolu_{i:02d}",
"name": "get_weather",
"input": {"location": f"City{i}"},
}
],
}
)
messages.append(
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": f"toolu_{i:02d}",
"content": f"Result {i}: {result_filler}",
}
],
}
)
return messages
def _mock_completion_response() -> ModelResponse:
return ModelResponse(
id="chatcmpl-test",
choices=[
Choices(
finish_reason="stop",
index=0,
message=Message(role="assistant", content="ok"),
)
],
created=0,
model="grok-4",
object="chat.completion",
usage=Usage(prompt_tokens=10, completion_tokens=2, total_tokens=12),
)
async def _mock_streaming_chunks():
yield ModelResponseStream(
id="chatcmpl-test",
created=0,
model="grok-4",
object="chat.completion.chunk",
choices=[
StreamingChoices(
finish_reason=None,
index=0,
delta=Delta(role="assistant", content="ok"),
)
],
)
yield ModelResponseStream(
id="chatcmpl-test",
created=0,
model="grok-4",
object="chat.completion.chunk",
choices=[
StreamingChoices(
finish_reason="stop",
index=0,
delta=Delta(),
)
],
usage=Usage(prompt_tokens=10, completion_tokens=2, total_tokens=12),
)
@pytest.mark.asyncio
async def test_polyfill_round_trip_non_streaming():
captured = {}
async def fake_acompletion(**kwargs):
captured.update(kwargs)
return _mock_completion_response()
with patch("litellm.acompletion", side_effect=fake_acompletion):
response = await litellm.anthropic.messages.acreate(
model=MODEL,
messages=_make_history(n_pairs=5),
max_tokens=128,
api_key="sk-test",
context_management={
"edits": [
{
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 1},
"keep": {"type": "tool_uses", "value": 2},
}
]
},
)
# 1. Downstream got the edited messages — older tool_result.content cleared.
downstream_messages = captured.get("messages")
assert downstream_messages is not None
cleared_ids = {"toolu_00", "toolu_01", "toolu_02"}
kept_ids = {"toolu_03", "toolu_04"}
found_cleared = 0
for msg in downstream_messages:
# The adapter may have translated the messages out of Anthropic shape;
# we accept either Anthropic-shape (tool_result block) or OpenAI-shape
# (tool-role message whose content is the placeholder).
if isinstance(msg, dict) and msg.get("role") == "tool":
if msg.get("tool_call_id") in cleared_ids:
content = msg.get("content")
if isinstance(content, str):
if CLEARED_TOOL_RESULT_PLACEHOLDER in content:
found_cleared += 1
elif isinstance(content, list):
text = "".join(
b.get("text", "") for b in content if isinstance(b, dict)
)
if CLEARED_TOOL_RESULT_PLACEHOLDER in text:
found_cleared += 1
elif msg.get("tool_call_id") in kept_ids:
content = msg.get("content")
if isinstance(content, str):
assert CLEARED_TOOL_RESULT_PLACEHOLDER not in content
assert found_cleared == 3
# 2. context_management must not leak into downstream kwargs.
assert "context_management" not in captured
# 3. Response carries the applied_edits in Anthropic's documented shape.
assert isinstance(response, dict)
cm = response.get("context_management")
assert cm is not None, f"context_management missing from response: {response}"
edits = cm.get("applied_edits")
assert isinstance(edits, list) and len(edits) == 1
edit = edits[0]
assert edit["type"] == "clear_tool_uses_20250919"
assert edit["cleared_tool_uses"] == 3
assert "cleared_input_tokens" in edit
@pytest.mark.asyncio
async def test_polyfill_trigger_not_met_passes_through_unchanged():
captured = {}
async def fake_acompletion(**kwargs):
captured.update(kwargs)
return _mock_completion_response()
with patch("litellm.acompletion", side_effect=fake_acompletion):
response = await litellm.anthropic.messages.acreate(
model=MODEL,
messages=_make_history(n_pairs=2),
max_tokens=128,
api_key="sk-test",
context_management={
"edits": [
{
"type": "clear_tool_uses_20250919",
"trigger": {"type": "input_tokens", "value": 10_000_000},
"keep": {"type": "tool_uses", "value": 1},
}
]
},
)
# Downstream still got the request, but no edits applied.
assert captured.get("messages") is not None
assert "context_management" not in captured
# Response shouldn't carry context_management when nothing fired.
assert isinstance(response, dict)
assert (
response.get("context_management") is None
or response.get("context_management") == {"applied_edits": []}
or "context_management" not in response
)
@pytest.mark.asyncio
async def test_polyfill_streaming_attaches_to_message_delta():
async def fake_acompletion(**kwargs):
return _mock_streaming_chunks()
with patch("litellm.acompletion", side_effect=fake_acompletion):
response = await litellm.anthropic.messages.acreate(
model=MODEL,
messages=_make_history(n_pairs=5),
max_tokens=128,
api_key="sk-test",
stream=True,
context_management={
"edits": [
{
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 1},
"keep": {"type": "tool_uses", "value": 2},
}
]
},
)
# Collect all SSE bytes.
collected = []
async for chunk in response: # type: ignore[union-attr]
if isinstance(chunk, (bytes, bytearray)):
collected.append(chunk.decode("utf-8"))
else:
collected.append(str(chunk))
sse_text = "".join(collected)
# Find the message_delta event payload and check it carries context_management
# as a sibling of `usage` per Anthropic's spec.
found_delta_with_cm = False
for block in sse_text.split("\n\n"):
if "message_delta" not in block:
continue
data_line = next(
(
line[len("data:") :].strip()
for line in block.splitlines()
if line.startswith("data:")
),
None,
)
if data_line is None:
continue
payload = json.loads(data_line)
if payload.get("type") != "message_delta":
continue
cm = payload.get("context_management")
if cm is None:
continue
assert "applied_edits" in cm
assert len(cm["applied_edits"]) == 1
assert cm["applied_edits"][0]["type"] == "clear_tool_uses_20250919"
assert cm["applied_edits"][0]["cleared_tool_uses"] == 3
found_delta_with_cm = True
break
assert found_delta_with_cm, (
"Expected `context_management` on the message_delta SSE event. "
f"SSE text was: {sse_text!r}"
)
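The SSE scan in the streaming test above can be factored into a small helper. This is a sketch under assumptions: `iter_sse_payloads` is a hypothetical name, and it assumes each event's JSON sits on a single `data:` line, as the test does.

```python
import json
from typing import Any, Dict, Iterator


def iter_sse_payloads(sse_text: str, event_type: str) -> Iterator[Dict[str, Any]]:
    """Yield decoded `data:` payloads whose "type" equals event_type."""
    for block in sse_text.split("\n\n"):
        for line in block.splitlines():
            if not line.startswith("data:"):
                continue
            try:
                payload = json.loads(line[len("data:"):].strip())
            except json.JSONDecodeError:
                # Skip non-JSON data lines instead of failing the whole scan.
                continue
            if isinstance(payload, dict) and payload.get("type") == event_type:
                yield payload
```

With a helper like this, the assertion reduces to checking that some `message_delta` payload carries a `context_management` key next to `usage`.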

@ -2472,3 +2472,172 @@ def test_translate_anthropic_tool_choice_none():
result = adapter.translate_anthropic_tool_choice_to_openai({"type": "none"})
assert result == "none"
# ---------------------------------------------------------------------------
# PolyfillResult integration tests
# ---------------------------------------------------------------------------
def _make_simple_openai_response(
text: str = "Hello", prompt_tokens: int = 10, completion_tokens: int = 5
) -> ModelResponse:
return ModelResponse(
id="resp_polyfill_test",
model="gpt-4o",
choices=[
Choices(
finish_reason="stop",
message=Message(role="assistant", content=text),
)
],
usage=Usage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens),
)
def test_translate_openai_response_to_anthropic_with_polyfill_compaction_block():
"""compaction_block from PolyfillResult must be prepended to content at index 0."""
from litellm.llms.anthropic.experimental_pass_through.context_management.result import (
PolyfillResult,
)
compaction_block = {"type": "compaction", "content": "Summary of prior turns."}
polyfill = PolyfillResult(
messages=[],
system=None,
applied_edits=[{"type": "compact_20260112"}],
compaction_block=compaction_block,
iterations_usage=None,
)
response = _make_simple_openai_response(text="Hello after compaction.")
adapter = LiteLLMAnthropicMessagesAdapter()
result = adapter.translate_openai_response_to_anthropic(
response=response, polyfill_result=polyfill
)
content = result.get("content")
assert content is not None
assert content[0]["type"] == "compaction"
assert content[0]["content"] == "Summary of prior turns."
assert content[1]["type"] == "text"
assert content[1]["text"] == "Hello after compaction."
# applied_edits must surface on context_management
cm = result.get("context_management")
assert cm is not None
assert cm["applied_edits"][0]["type"] == "compact_20260112"
def test_translate_openai_response_to_anthropic_with_polyfill_iterations_usage():
"""iterations_usage from PolyfillResult must produce usage['iterations'] with a message entry."""
from litellm.llms.anthropic.experimental_pass_through.context_management.result import (
PolyfillResult,
)
polyfill = PolyfillResult(
messages=[],
system=None,
applied_edits=[{"type": "compact_20260112"}],
compaction_block=None,
iterations_usage=[
{"type": "compaction", "input_tokens": 200, "output_tokens": 50},
],
)
response = _make_simple_openai_response(prompt_tokens=100, completion_tokens=30)
adapter = LiteLLMAnthropicMessagesAdapter()
result = adapter.translate_openai_response_to_anthropic(
response=response, polyfill_result=polyfill
)
usage = result.get("usage")
assert usage is not None
iterations = usage.get("iterations")
assert iterations is not None
assert len(iterations) == 2
assert iterations[0] == {
"type": "compaction",
"input_tokens": 200,
"output_tokens": 50,
}
assert iterations[1]["type"] == "message"
assert iterations[1]["input_tokens"] == 100
assert iterations[1]["output_tokens"] == 30
# Top-level tokens must still reflect the message iteration
assert usage["input_tokens"] == 100
assert usage["output_tokens"] == 30
def test_translate_openai_response_to_anthropic_no_polyfill_no_change():
"""Without a PolyfillResult the response must be unchanged (no compaction, no iterations)."""
response = _make_simple_openai_response()
adapter = LiteLLMAnthropicMessagesAdapter()
result = adapter.translate_openai_response_to_anthropic(response=response)
content = result.get("content")
assert content is not None
assert content[0]["type"] == "text"
usage = result.get("usage")
assert usage is not None
assert "iterations" not in usage
def test_translate_openai_response_to_anthropic_with_polyfill_both_compaction_and_iterations():
"""Full summary path: compaction_block and iterations_usage both present simultaneously."""
from litellm.llms.anthropic.experimental_pass_through.context_management.result import (
PolyfillResult,
)
compaction_block = {
"type": "compaction",
"content": "Summary of a long conversation.",
}
polyfill = PolyfillResult(
messages=[],
system=None,
applied_edits=[{"type": "compact_20260112"}],
compaction_block=compaction_block,
iterations_usage=[
{"type": "compaction", "input_tokens": 300, "output_tokens": 75},
],
)
response = _make_simple_openai_response(
text="After compaction.", prompt_tokens=120, completion_tokens=40
)
adapter = LiteLLMAnthropicMessagesAdapter()
result = adapter.translate_openai_response_to_anthropic(
response=response, polyfill_result=polyfill
)
# compaction block must come first
content = result.get("content")
assert content is not None
assert content[0]["type"] == "compaction"
assert content[0]["content"] == "Summary of a long conversation."
assert content[1]["type"] == "text"
assert content[1]["text"] == "After compaction."
# iterations: compaction entry + message entry
usage = result.get("usage")
assert usage is not None
iterations = usage.get("iterations")
assert iterations is not None
assert len(iterations) == 2
assert iterations[0] == {
"type": "compaction",
"input_tokens": 300,
"output_tokens": 75,
}
assert iterations[1]["type"] == "message"
assert iterations[1]["input_tokens"] == 120
assert iterations[1]["output_tokens"] == 40
# top-level tokens match the message iteration
assert usage["input_tokens"] == 120
assert usage["output_tokens"] == 40
# context_management applied_edits must surface
cm = result.get("context_management")
assert cm is not None
assert cm["applied_edits"][0]["type"] == "compact_20260112"
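The usage shape these tests assert can be summarized in a few lines. This is a sketch of the expected output only, not the adapter's implementation; `build_usage` is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional


def build_usage(
    prompt_tokens: int,
    completion_tokens: int,
    iterations_usage: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Return an Anthropic-style usage dict (hypothetical helper).

    Top-level tokens always describe the final message call. When the
    polyfill recorded compaction iterations, they are listed first and a
    trailing "message" entry repeats the top-level counts.
    """
    usage: Dict[str, Any] = {
        "input_tokens": prompt_tokens,
        "output_tokens": completion_tokens,
    }
    if iterations_usage:
        usage["iterations"] = [
            *iterations_usage,
            {
                "type": "message",
                "input_tokens": prompt_tokens,
                "output_tokens": completion_tokens,
            },
        ]
    return usage
```

Without `iterations_usage` the key is omitted entirely, which matches the `no_polyfill_no_change` test.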

@ -0,0 +1,193 @@
"""Compaction block SSE events from AnthropicStreamWrapper (compact_20260112 polyfill)."""
import os
import sys
from typing import List
from unittest.mock import MagicMock
import pytest
sys.path.insert(0, os.path.abspath("../../../../.."))
from litellm.llms.anthropic.experimental_pass_through.adapters.streaming_iterator import (
AnthropicStreamWrapper,
)
from litellm.types.utils import Delta, StreamingChoices, Usage
def _make_text_chunk(
text: str,
finish_reason: "str | None" = None,
usage: "Usage | None" = None,
) -> MagicMock:
chunk = MagicMock()
chunk.choices = [
StreamingChoices(
finish_reason=finish_reason,
index=0,
delta=Delta(
content=text, role="assistant" if text else None, tool_calls=None
),
logprobs=None,
)
]
chunk.usage = usage
chunk._hidden_params = {}
return chunk
async def _collect_events_async(wrapper: AnthropicStreamWrapper) -> List[dict]:
events = []
async for event in wrapper:
events.append(event)
return events
@pytest.mark.asyncio
async def test_stream_emits_compaction_block_before_text():
"""Polyfill compaction_block must surface as compaction SSE events at index 0."""
async def mock_stream():
yield _make_text_chunk("Hi")
yield _make_text_chunk(
"",
finish_reason="stop",
usage=Usage(prompt_tokens=10, completion_tokens=5, total_tokens=15),
)
compaction_block = {
"type": "compaction",
"content": "Summary of prior conversation turns.",
}
iterations_usage = [
{"type": "compaction", "input_tokens": 100, "output_tokens": 50},
]
wrapper = AnthropicStreamWrapper(
completion_stream=mock_stream(),
model="claude-sonnet-4-6",
compaction_block=compaction_block,
iterations_usage=iterations_usage,
applied_edits=[{"type": "compact_20260112"}],
)
events = await _collect_events_async(wrapper)
compaction_start = next(
e
for e in events
if e.get("type") == "content_block_start"
and e.get("content_block", {}).get("type") == "compaction"
)
assert compaction_start["index"] == 0
compaction_delta = next(
e
for e in events
if e.get("type") == "content_block_delta"
and e.get("delta", {}).get("type") == "compaction_delta"
)
assert compaction_delta["index"] == 0
assert (
compaction_delta["delta"]["content"] == "Summary of prior conversation turns."
)
compaction_stop = next(
e
for e in events
if e.get("type") == "content_block_stop" and e.get("index") == 0
)
assert compaction_stop is not None
text_start = next(
e
for e in events
if e.get("type") == "content_block_start"
and e.get("content_block", {}).get("type") == "text"
)
assert text_start["index"] == 1
message_delta = next(e for e in events if e.get("type") == "message_delta")
iterations = message_delta.get("usage", {}).get("iterations")
assert iterations is not None
assert iterations[0]["type"] == "compaction"
assert iterations[1]["type"] == "message"
assert iterations[1]["input_tokens"] == 10
assert iterations[1]["output_tokens"] == 5
@pytest.mark.asyncio
async def test_stream_omits_message_iteration_when_no_usage_chunk():
"""When the provider sends finish_reason without usage, the held message_delta
carries placeholder zeros; we must not emit a misleading zero-token
``message`` iteration entry."""
async def mock_stream():
yield _make_text_chunk("Hi")
yield _make_text_chunk("", finish_reason="stop")
iterations_usage = [
{"type": "compaction", "input_tokens": 100, "output_tokens": 50},
]
wrapper = AnthropicStreamWrapper(
completion_stream=mock_stream(),
model="claude-sonnet-4-6",
iterations_usage=iterations_usage,
)
events = await _collect_events_async(wrapper)
message_delta = next(e for e in events if e.get("type") == "message_delta")
iterations = message_delta.get("usage", {}).get("iterations")
assert iterations is not None
assert len(iterations) == 1
assert iterations[0]["type"] == "compaction"
@pytest.mark.asyncio
async def test_stream_omits_context_management_when_no_compaction_applied():
"""With no applied_edits, message_delta must not carry context_management."""
async def mock_stream():
yield _make_text_chunk("Hello")
yield _make_text_chunk("", finish_reason="stop")
wrapper = AnthropicStreamWrapper(
completion_stream=mock_stream(),
model="claude-sonnet-4-6",
applied_edits=None,
)
events = await _collect_events_async(wrapper)
message_deltas = [e for e in events if e.get("type") == "message_delta"]
assert message_deltas
assert "context_management" not in message_deltas[-1]
@pytest.mark.asyncio
async def test_stream_without_compaction_block_unchanged():
"""No compaction_block means no compaction SSE events."""
async def mock_stream():
yield _make_text_chunk("Hello")
yield _make_text_chunk("", finish_reason="stop")
wrapper = AnthropicStreamWrapper(
completion_stream=mock_stream(),
model="claude-sonnet-4-6",
)
events = await _collect_events_async(wrapper)
assert not any(
e.get("content_block", {}).get("type") == "compaction"
for e in events
if e.get("type") == "content_block_start"
)
text_start = next(
e
for e in events
if e.get("type") == "content_block_start"
and e.get("content_block", {}).get("type") == "text"
)
assert text_start["index"] == 0
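The event ordering these streaming tests check can be sketched as follows. This is an illustration of the asserted behavior rather than the wrapper's code: `compaction_prelude` is a hypothetical name, and the exact fields of the `content_block_start` payload beyond `type` are an assumption.

```python
from typing import Any, Dict, List, Optional, Tuple


def compaction_prelude(
    compaction_block: Optional[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], int]:
    """Return (events to emit first, index for the first text block)."""
    if not compaction_block:
        # No compaction: nothing to emit, text keeps index 0.
        return [], 0
    events = [
        {
            "type": "content_block_start",
            "index": 0,
            "content_block": {"type": "compaction"},
        },
        {
            "type": "content_block_delta",
            "index": 0,
            "delta": {
                "type": "compaction_delta",
                "content": compaction_block.get("content"),
            },
        },
        {"type": "content_block_stop", "index": 0},
    ]
    # The compaction block occupies index 0, so text starts at index 1.
    return events, 1
```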

@ -0,0 +1,307 @@
"""
Unit tests for the in-gateway `clear_tool_uses_20250919` polyfill editor.
"""
from copy import deepcopy
from litellm.llms.anthropic.experimental_pass_through.context_management.constants import (
CLEARED_TOOL_RESULT_PLACEHOLDER,
)
from litellm.llms.anthropic.experimental_pass_through.context_management.editors.clear_tool_uses import (
apply_clear_tool_uses_20250919,
)
MODEL = "xai/grok-4"
def _make_pair(tool_use_id: str, result_text: str, location: str = "Mumbai"):
"""Return an (assistant, user) message pair with one tool_use + tool_result."""
assistant_msg = {
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": tool_use_id,
"name": "get_weather",
"input": {"location": location},
}
],
}
user_msg = {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": result_text,
}
],
}
return assistant_msg, user_msg
def _make_history(n_pairs: int, result_filler: str = "x" * 200):
messages = [{"role": "user", "content": "Compare weather across cities."}]
for i in range(n_pairs):
assistant_msg, user_msg = _make_pair(
tool_use_id=f"toolu_{i:02d}",
result_text=f"Result {i}: {result_filler}",
location=f"City{i}",
)
messages.append(assistant_msg)
messages.append(user_msg)
return messages
def test_below_trigger_returns_unchanged():
"""If trigger threshold isn't exceeded, editor is a no-op."""
messages = _make_history(n_pairs=2)
original = deepcopy(messages)
new_messages, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "input_tokens", "value": 10_000_000},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is None
assert new_messages == original
def test_keep_preserves_most_recent_pairs():
"""With keep=2 and 5 pairs, the 3 oldest pairs are cleared."""
messages = _make_history(n_pairs=5)
new_messages, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 1},
"keep": {"type": "tool_uses", "value": 2},
},
)
assert applied is not None
assert applied["type"] == "clear_tool_uses_20250919"
assert applied["cleared_tool_uses"] == 3
# Tool results for the first 3 pairs should be the placeholder, last 2 untouched.
cleared_ids = {"toolu_00", "toolu_01", "toolu_02"}
kept_ids = {"toolu_03", "toolu_04"}
for msg in new_messages:
if msg.get("role") != "user":
continue
content = msg.get("content")
if not isinstance(content, list):
continue
for block in content:
if block.get("type") != "tool_result":
continue
if block["tool_use_id"] in cleared_ids:
assert block["content"] == CLEARED_TOOL_RESULT_PLACEHOLDER
elif block["tool_use_id"] in kept_ids:
assert "Result" in block["content"]
def test_tool_use_input_is_not_cleared():
"""clear_tool_inputs defaults to false — tool_use.input must remain intact."""
messages = _make_history(n_pairs=3)
new_messages, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 0},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is not None
# Every tool_use block still has its original `input`.
for msg in new_messages:
if msg.get("role") != "assistant":
continue
for block in msg.get("content", []):
if block.get("type") == "tool_use":
assert block["input"] == {"location": block["input"]["location"]}
assert block["input"]["location"].startswith("City")
def test_message_array_length_and_roles_preserved():
messages = _make_history(n_pairs=4)
original_roles = [m["role"] for m in messages]
new_messages, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 0},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is not None
assert len(new_messages) == len(messages)
assert [m["role"] for m in new_messages] == original_roles
def test_defaults_applied_when_knobs_omitted():
"""No trigger/keep specified — defaults are 100k input_tokens / 3 tool_uses."""
messages = _make_history(n_pairs=2)
# Below 100k tokens; should not fire.
new_messages, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={"type": "clear_tool_uses_20250919"},
)
assert applied is None
assert new_messages == messages
def test_tool_uses_trigger_variant():
"""Trigger by raw count of tool_use blocks, not tokens."""
messages = _make_history(n_pairs=4)
_, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 2},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is not None
# 4 total - 1 kept = 3 cleared
assert applied["cleared_tool_uses"] == 3
def test_cleared_input_tokens_is_nonnegative():
messages = _make_history(n_pairs=4)
_, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 1},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is not None
assert applied["cleared_input_tokens"] >= 0
def test_ignored_knobs_do_not_alter_behavior():
"""clear_at_least / exclude_tools / clear_tool_inputs are accepted but ignored in v0."""
messages = _make_history(n_pairs=3)
_, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 0},
"keep": {"type": "tool_uses", "value": 1},
"clear_at_least": {"type": "input_tokens", "value": 999_999_999},
"exclude_tools": ["get_weather"],
"clear_tool_inputs": True,
},
)
# Despite clear_at_least being huge, polyfill still applies (knob ignored).
# Despite clear_tool_inputs=True, inputs are NOT cleared (knob ignored).
assert applied is not None
assert applied["cleared_tool_uses"] == 2
# Ignored knobs surface as warnings on the AppliedEdit so operators can
# see what was dropped (the v0 polyfill silently dropping them at debug
# log level made misconfiguration invisible from the response).
assert set(applied.get("warnings", [])) == {
"clear_at_least_ignored",
"exclude_tools_ignored",
"clear_tool_inputs_ignored",
}
def test_no_ignored_knobs_omits_warnings_field():
"""When the caller doesn't pass any unsupported knobs, no ``warnings`` are added."""
messages = _make_history(n_pairs=3)
_, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 0},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is not None
assert "warnings" not in applied
def test_tool_result_list_content_shape_preserved():
"""When tool_result.content is a list of blocks, replacement returns a list shape."""
messages = [
{"role": "user", "content": "Hi"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_a", "name": "f", "input": {}}
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "toolu_a",
"content": [{"type": "text", "text": "huge result"}],
}
],
},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_b", "name": "f", "input": {}}
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "toolu_b",
"content": [{"type": "text", "text": "keep me"}],
}
],
},
]
new_messages, applied = apply_clear_tool_uses_20250919(
model=MODEL,
messages=messages,
tools=None,
system=None,
edit_spec={
"type": "clear_tool_uses_20250919",
"trigger": {"type": "tool_uses", "value": 0},
"keep": {"type": "tool_uses", "value": 1},
},
)
assert applied is not None
cleared_block = new_messages[2]["content"][0]
assert isinstance(cleared_block["content"], list)
assert cleared_block["content"][0]["type"] == "text"
assert cleared_block["content"][0]["text"] == CLEARED_TOOL_RESULT_PLACEHOLDER
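The clearing behavior exercised by these tests can be sketched with a count-based trigger. This is a simplified illustration, not `apply_clear_tool_uses_20250919`: the function name, the `PLACEHOLDER` value, and the "fires when the count exceeds the trigger" comparison are all assumptions. It does not model token-based triggers or the hard protection of the most recent tool_result mentioned in the commit message.

```python
from copy import deepcopy
from typing import Any, Dict, List, Tuple

# Hypothetical stand-in for CLEARED_TOOL_RESULT_PLACEHOLDER.
PLACEHOLDER = "[tool result cleared]"


def clear_old_tool_results(
    messages: List[Dict[str, Any]], keep: int, trigger: int = 0
) -> Tuple[List[Dict[str, Any]], int]:
    """Replace all but the `keep` most recent tool_result contents.

    Returns (new messages, number of cleared results). The input list is
    never mutated; message count and roles are preserved.
    """
    new_messages = deepcopy(messages)
    results = [
        block
        for msg in new_messages
        if msg.get("role") == "user" and isinstance(msg.get("content"), list)
        for block in msg["content"]
        if isinstance(block, dict) and block.get("type") == "tool_result"
    ]
    if len(results) <= trigger:
        return new_messages, 0

    to_clear = results[: max(len(results) - keep, 0)]
    for block in to_clear:
        if isinstance(block.get("content"), list):
            # Preserve the list-of-blocks shape of the original content.
            block["content"] = [{"type": "text", "text": PLACEHOLDER}]
        else:
            block["content"] = PLACEHOLDER
    return new_messages, len(to_clear)
```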

@ -0,0 +1,131 @@
"""
Unit tests for the context_management polyfill dispatcher.
"""

from litellm.llms.anthropic.experimental_pass_through.context_management import (
    apply_context_management,
)

MODEL = "xai/grok-4"


def _history_with_two_tool_pairs():
    return [
        {"role": "user", "content": "Hi"},
        {
            "role": "assistant",
            "content": [{"type": "tool_use", "id": "t1", "name": "f", "input": {}}],
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": "t1",
                    "content": "first result",
                }
            ],
        },
        {
            "role": "assistant",
            "content": [{"type": "tool_use", "id": "t2", "name": "f", "input": {}}],
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": "t2",
                    "content": "second result",
                }
            ],
        },
    ]


async def test_unknown_edit_type_is_noop():
    messages = _history_with_two_tool_pairs()
    result = await apply_context_management(
        model=MODEL,
        messages=messages,
        tools=None,
        system=None,
        context_management_spec={
            "edits": [{"type": "totally_not_a_real_edit_20999999"}]
        },
    )
    assert result.applied_edits == []
    assert result.messages == messages


async def test_known_edit_is_applied():
    messages = _history_with_two_tool_pairs()
    result = await apply_context_management(
        model=MODEL,
        messages=messages,
        tools=None,
        system=None,
        context_management_spec={
            "edits": [
                {
                    "type": "clear_tool_uses_20250919",
                    "trigger": {"type": "tool_uses", "value": 1},
                    "keep": {"type": "tool_uses", "value": 1},
                }
            ]
        },
    )
    assert len(result.applied_edits) == 1
    assert result.applied_edits[0]["type"] == "clear_tool_uses_20250919"
    assert result.applied_edits[0]["cleared_tool_uses"] == 1


async def test_mixed_known_unknown_only_known_applied():
    messages = _history_with_two_tool_pairs()
    result = await apply_context_management(
        model=MODEL,
        messages=messages,
        tools=None,
        system=None,
        context_management_spec={
            "edits": [
                {"type": "unknown_foo"},
                {
                    "type": "clear_tool_uses_20250919",
                    "trigger": {"type": "tool_uses", "value": 0},
                    "keep": {"type": "tool_uses", "value": 1},
                },
                {"type": "another_unknown"},
            ]
        },
    )
    assert len(result.applied_edits) == 1
    assert result.applied_edits[0]["type"] == "clear_tool_uses_20250919"


async def test_empty_or_missing_edits_list():
    messages = _history_with_two_tool_pairs()
    for spec in [{}, {"edits": None}, {"edits": []}, None]:
        result = await apply_context_management(
            model=MODEL,
            messages=messages,
            tools=None,
            system=None,
            context_management_spec=spec,  # type: ignore[arg-type]
        )
        assert result.applied_edits == []
        assert result.messages == messages


async def test_malformed_edit_entries_are_skipped():
    """Non-dict entries in the `edits` list should be silently skipped."""
    messages = _history_with_two_tool_pairs()
    result = await apply_context_management(
        model=MODEL,
        messages=messages,
        tools=None,
        system=None,
        context_management_spec={"edits": ["not a dict", 42, None, {"type": None}]},
    )
    assert result.applied_edits == []
    assert result.messages == messages
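
The dispatcher behavior these tests pin down (unknown edit types are no-ops, malformed entries are skipped, a missing or empty `edits` list does nothing) can be sketched as a small synchronous loop. This is an illustrative sketch only: `dispatch_edits`, the `Handler` alias, and the handler-table design are assumptions made for the example, not the signature or structure of `apply_context_management`.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical handler type: takes an edit spec and returns an
# "applied edit" record, or None when the edit did nothing.
Handler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]


def dispatch_edits(
    spec: Optional[Dict[str, Any]], handlers: Dict[str, Handler]
) -> List[Dict[str, Any]]:
    """Run known edits in order and collect their applied-edit records."""
    edits = (spec or {}).get("edits")
    if not isinstance(edits, list):
        return []  # covers None, {}, and {"edits": None}

    applied: List[Dict[str, Any]] = []
    for edit in edits:
        if not isinstance(edit, dict):
            continue  # malformed entry such as "not a dict", 42, or None
        edit_type = edit.get("type")
        if not isinstance(edit_type, str):
            continue  # e.g. {"type": None}
        handler = handlers.get(edit_type)
        if handler is None:
            continue  # unknown edit type is a no-op
        record = handler(edit)
        if record is not None:
            applied.append(record)
    return applied
```

Skipping instead of raising means a request carrying an edit type the gateway does not recognize still goes through unchanged, which matches the no-op expectations in the tests above.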

@ -0,0 +1,114 @@
"""Bedrock Converse context_management forwarding (compact_20260112 only)."""

from litellm.llms.bedrock.chat.converse_transformation import AmazonConverseConfig

CLAUDE_MODEL = "anthropic.claude-opus-4-7-20250115-v1:0"


def test_supported_params_include_context_management_for_anthropic():
    cfg = AmazonConverseConfig()
    params = cfg.get_supported_openai_params(CLAUDE_MODEL)
    assert "context_management" in params


def test_supported_params_exclude_context_management_for_non_anthropic():
    cfg = AmazonConverseConfig()
    params = cfg.get_supported_openai_params("meta.llama3-70b-instruct-v1:0")
    assert "context_management" not in params


def test_map_openai_params_forwards_anthropic_shape():
    cfg = AmazonConverseConfig()
    optional_params: dict = {}
    cfg.map_openai_params(
        non_default_params={
            "context_management": {"edits": [{"type": "compact_20260112"}]}
        },
        optional_params=optional_params,
        model=CLAUDE_MODEL,
        drop_params=False,
    )
    assert optional_params.get("context_management") == {
        "edits": [{"type": "compact_20260112"}]
    }


def test_map_openai_params_normalizes_openai_list_shape():
    """OpenAI Responses-API style list of {type: "compaction"} normalizes to Anthropic dict."""
    cfg = AmazonConverseConfig()
    optional_params: dict = {}
    cfg.map_openai_params(
        non_default_params={"context_management": [{"type": "compaction"}]},
        optional_params=optional_params,
        model=CLAUDE_MODEL,
        drop_params=False,
    )
    forwarded = optional_params.get("context_management")
    assert isinstance(forwarded, dict)
    edits = forwarded.get("edits")
    assert isinstance(edits, list) and len(edits) == 1
    assert edits[0].get("type") == "compact_20260112"


def test_filter_keeps_only_compact_edits_and_adds_beta_header():
    additional = {
        "context_management": {
            "edits": [
                {"type": "clear_tool_uses_20250919"},
                {"type": "compact_20260112"},
                {"type": "clear_thinking_20251015"},
            ]
        }
    }
    betas: list = []
    AmazonConverseConfig._filter_context_management_for_bedrock_converse(
        additional, betas
    )
    assert additional["context_management"]["edits"] == [{"type": "compact_20260112"}]
    assert "compact-2026-01-12" in betas


def test_filter_drops_field_when_no_compact_edit_remains():
    additional = {
        "context_management": {
            "edits": [
                {"type": "clear_tool_uses_20250919"},
                {"type": "clear_thinking_20251015"},
            ]
        }
    }
    betas: list = []
    AmazonConverseConfig._filter_context_management_for_bedrock_converse(
        additional, betas
    )
    assert "context_management" not in additional
    assert betas == []


def test_filter_is_noop_when_field_absent():
    additional: dict = {}
    betas: list = []
    AmazonConverseConfig._filter_context_management_for_bedrock_converse(
        additional, betas
    )
    assert additional == {}
    assert betas == []


def test_filter_drops_malformed_edits_list():
    additional = {"context_management": {"edits": "not a list"}}
    betas: list = []
    AmazonConverseConfig._filter_context_management_for_bedrock_converse(
        additional, betas
    )
    assert "context_management" not in additional
    assert betas == []


def test_filter_does_not_duplicate_beta_header():
    additional = {"context_management": {"edits": [{"type": "compact_20260112"}]}}
    betas: list = ["compact-2026-01-12"]
    AmazonConverseConfig._filter_context_management_for_bedrock_converse(
        additional, betas
    )
    assert betas.count("compact-2026-01-12") == 1
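
Taken together, the filter tests describe an in-place contract: keep only `compact_20260112` edits, drop the field entirely when nothing valid remains (including a `None` value, per the commit message), and add the `compact-2026-01-12` beta header at most once. The sketch below reproduces that contract in isolation. The function name `filter_compact_only` and the constant names are hypothetical; this is not the body of `_filter_context_management_for_bedrock_converse`.

```python
from typing import Any, Dict, List

COMPACT_EDIT_TYPE = "compact_20260112"  # edit type string from the tests
COMPACT_BETA = "compact-2026-01-12"  # beta header string from the tests


def filter_compact_only(additional: Dict[str, Any], betas: List[str]) -> None:
    """Mutate `additional` and `betas` in place, mirroring the tested contract."""
    if "context_management" not in additional:
        return  # field absent: nothing to do

    cm = additional["context_management"]
    edits = cm.get("edits") if isinstance(cm, dict) else None
    kept = (
        [e for e in edits if isinstance(e, dict) and e.get("type") == COMPACT_EDIT_TYPE]
        if isinstance(edits, list)
        else []
    )

    if not kept:
        # Covers None, non-dict values, a malformed edits list, and
        # lists that contain no compact edit.
        additional.pop("context_management", None)
        return

    additional["context_management"] = {**cm, "edits": kept}
    if COMPACT_BETA not in betas:
        betas.append(COMPACT_BETA)
```

Popping the key rather than leaving a null value avoids sending `"context_management": null` upstream, which the commit message identifies as the cause of a validation error.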