litellm/enterprise/litellm_enterprise/enterprise_callbacks/pagerduty/pagerduty.py
Krrish Dholakia f42ffed2bd
Litellm oss staging 04 02 2026 p1 (#25055)
* fix(vertex_ai): support pluggable (executable) credential_source for WIF auth (#24700)

The WIF credential dispatch in load_auth() only handled identity_pool and
aws credential types. When credential_source.executable was present (used
for Azure Managed Identity via Workload Identity Federation), it fell
through to identity_pool.Credentials which rejected it with MalformedError.

Add dispatch to google.auth.pluggable.Credentials for executable-type
credential sources, following the same pattern as the existing identity_pool
and aws helpers.

Fixes authentication for Azure Container Apps → GCP Vertex AI via WIF
with executable credential sources.

* feat(logging): add component and logger fields to JSON logs for 3rd p… (#24447)

* feat(logging): add component and logger fields to JSON logs for 3rd party filtering

* Let user-supplied extra fields win over auto-generated component/logger, tighten test assertions

* Feat - Add organization into the metrics metadata for org_id & org_alias (#24440)

* Add org_id and org_alias label names to Prometheus metric definitions

* Add user_api_key_org_alias to StandardLoggingUserAPIKeyMetadata

* Populate user_api_key_org_alias in pre-call metadata

* Pass org_id and org_alias into per-request Prometheus metric labels

* Add test for org labels on per-request Prometheus metrics

* chore: resolve test mockdata

* Address review: populate org_alias from DB view, add feature flag, use .get() for org metadata

* Add org labels to failure path and verify flag behavior in test

* Fix test: build flag-off enum_values without org fields

* Gate org labels behind feature flag in get_labels() instead of static metric lists

* Scope org label injection to metrics that carry team context, remove orphaned budget label defs, add test teardown

* Use explicit metric allowlist for org label injection instead of team heuristic

* Fix duplicate org label guard, move _org_label_metrics to class constant

* Reset custom_prometheus_metadata_labels after duplicate label assertion

* fix: emit org labels by default, remove flag, fix missing org_alias in all metadata paths

* fix: emit org labels by default, no opt-in flag required

* fix: write org_alias to metadata unconditionally in proxy_server.py

* fix: 429s from batch creation being converted to 500 (#24703)

* add us gov models (#24660)

* add us gov models

* added max tokens

* Litellm dev 04 02 2026 p1 (#25052)

* fix: replace hardcoded url

* fix: Anthropic web search cost not tracked for Chat Completions

The ModelResponse branch in response_object_includes_web_search_call()
only checked url_citation annotations and prompt_tokens_details, missing
Anthropic's server_tool_use.web_search_requests field. This caused
_handle_web_search_cost() to never fire for Anthropic Claude models.

Also routes vertex_ai/claude-* models to the Anthropic cost calculator
instead of the Gemini one, since Claude on Vertex uses the same
server_tool_use billing structure as the direct Anthropic API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* fix(anthropic): pass logging_obj to client.post for litellm_overhead_time_ms (#24071)

When LITELLM_DETAILED_TIMING=true, litellm_overhead_time_ms was null for
Anthropic because the handler did not pass logging_obj to client.post(),
so track_llm_api_timing could not set llm_api_duration_ms. Pass
logging_obj=logging_obj at all four post() call sites (make_call,
make_sync_call, acompletion, completion). Add test to ensure make_call
passes logging_obj to client.post.

Made-with: Cursor

* sap - add additional parameters for grounding

- additional parameter for grounding added for the sap provider

* sap - fix models

* (sap) add filtering, masking, translation SAP GEN AI Hub modules

* (sap) add tests and docs for new SAP modules

* (sap) add support of multiple modules config

* (sap) code refactoring

* (sap) rename file

* test(): add safeguard tests

* (sap) update tests

* (sap) update docs, solve merge conflict in transformation.py

* (sap) linter fix

* (sap) Align embedding request transformation with current API

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) mock commit

* (sap) run black formatter

* (sap) add literals to models, add negative tests, fix test for tool transformation

* (sap) fix formatting

* (sap) fix models

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) commit for rerun bot review

* (sap) minor improve

* (sap) fix after bot review

* (sap) lint fix

* docs(sap): update documentation

* fix(sap): change creds priority

* fix(sap): change creds priority

* fix(sap): fix sap creds unit test

* fix(sap): linter fix

* fix(sap): linter fix

* linter fix

* (sap) update logic of fetching creds, add additional tests

* (sap) clean up code

* (sap) fix after review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) add a possibility to put the service key by both variants

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) update test

* (sap) update service key resolve function

* (sap) run black formatter

* (sap) fix validate credentials, add negative tests for credential fetching

* (sap) fix validate credentials, add negative tests for credential fetching

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) fix after bot review

* (sap) lint fix

* (sap) lint fix

* feat: support service_tier in gemini

* chore: add a service_tier field mapping from openai to gemini

* fix: use x-gemini-service-tier header in response

* docs: add service_tier to gemini docs

* chore: add default/standard mapping, and some tests

* chore: tidying up some case insensitivity

* chore: remove unnecessary guard

* fix: remove redundant test file

* fix: handle 'auto' case-insensitively

* fix: return service_tier on final streamed chunk

* chore: black

* feat: enable supports_service_tier to gemini models

* Fix get_standard_logging_metadata tests

* Fix test_get_model_info_bedrock_models

* Fix test_get_model_info_bedrock_models

* Fix remaining tests

* Fix mypy issues

* Fix tests

* Fix merge conflicts

* Fix code qa

* Fix code qa

* Fix code qa

* Fix greptile review

---------

Co-authored-by: michelligabriele <gabriele.michelli@icloud.com>
Co-authored-by: Josh <36064836+J-Byron@users.noreply.github.com>
Co-authored-by: mubashir1osmani <mubashir.osmani777@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: milan-berri <milan@berri.ai>
Co-authored-by: Alperen Kömürcü <alperen.koemuercue@sap.com>
Co-authored-by: Vasilisa Parshikova <vasilisa.parshikova@sap.com>
Co-authored-by: Lin Xu <lin.xu03@sap.com>
Co-authored-by: Mark McDonald <macd@google.com>
Co-authored-by: Sameer Kankute <sameer@berri.ai>
2026-04-08 21:37:10 -07:00

316 lines
13 KiB
Python

"""
PagerDuty Alerting Integration
Handles two types of alerts:
- High LLM API Failure Rate. Configure X fails in Y seconds to trigger an alert.
- High Number of Hanging LLM Requests. Configure X hangs in Y seconds to trigger an alert.
Note: This is a Free feature on the regular litellm docker image.
However, this is under the enterprise license
"""
import asyncio
import os
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union

from litellm._logging import verbose_logger
from litellm.caching import DualCache
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
from litellm.llms.custom_httpx.http_handler import (
    AsyncHTTPHandler,
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.integrations.pagerduty import (
    AlertingConfig,
    PagerDutyInternalEvent,
    PagerDutyPayload,
    PagerDutyRequestBody,
)
from litellm.types.utils import (
    CallTypesLiteral,
    StandardLoggingPayload,
    StandardLoggingPayloadErrorInformation,
)

PAGERDUTY_DEFAULT_FAILURE_THRESHOLD = 60
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS = 60
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS = 60
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS = 600


class PagerDutyAlerting(SlackAlerting):
    """
    Tracks failed requests and hanging requests separately.
    If threshold is crossed for either type, triggers a PagerDuty alert.
    """

    def __init__(
        self, alerting_args: Optional[Union[AlertingConfig, dict]] = None, **kwargs
    ):
        super().__init__()
        _api_key = os.getenv("PAGERDUTY_API_KEY")
        if not _api_key:
            raise ValueError("PAGERDUTY_API_KEY is not set")
        self.api_key: str = _api_key
        alerting_args = alerting_args or {}
        self.pagerduty_alerting_args: AlertingConfig = AlertingConfig(
            failure_threshold=alerting_args.get(
                "failure_threshold", PAGERDUTY_DEFAULT_FAILURE_THRESHOLD
            ),
            failure_threshold_window_seconds=alerting_args.get(
                "failure_threshold_window_seconds",
                PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS,
            ),
            hanging_threshold_seconds=alerting_args.get(
                "hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
            ),
            hanging_threshold_window_seconds=alerting_args.get(
                "hanging_threshold_window_seconds",
                PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
            ),
        )

        # Separate storage for failures vs. hangs
        self._failure_events: List[PagerDutyInternalEvent] = []
        self._hanging_events: List[PagerDutyInternalEvent] = []

    # ------------------ MAIN LOGIC ------------------ #

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """
        Record a failure event. Only send an alert to PagerDuty if the
        configured *failure* threshold is exceeded in the specified window.
        """
        now = datetime.now(timezone.utc)
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object"
        )
        if not standard_logging_payload:
            raise ValueError(
                "standard_logging_object is required for PagerDutyAlerting"
            )

        # Extract error details
        error_info: Optional[StandardLoggingPayloadErrorInformation] = (
            standard_logging_payload.get("error_information") or {}
        )
        _meta = standard_logging_payload.get("metadata") or {}

        self._failure_events.append(
            PagerDutyInternalEvent(
                failure_event_type="failed_response",
                timestamp=now,
                error_class=error_info.get("error_class"),
                error_code=error_info.get("error_code"),
                error_llm_provider=error_info.get("llm_provider"),
                user_api_key_hash=_meta.get("user_api_key_hash"),
                user_api_key_alias=_meta.get("user_api_key_alias"),
                user_api_key_spend=_meta.get("user_api_key_spend"),
                user_api_key_max_budget=_meta.get("user_api_key_max_budget"),
                user_api_key_budget_reset_at=_meta.get("user_api_key_budget_reset_at"),
                user_api_key_org_id=_meta.get("user_api_key_org_id"),
                user_api_key_org_alias=_meta.get("user_api_key_org_alias"),
                user_api_key_team_id=_meta.get("user_api_key_team_id"),
                user_api_key_project_id=_meta.get("user_api_key_project_id"),
                user_api_key_project_alias=_meta.get("user_api_key_project_alias"),
                user_api_key_user_id=_meta.get("user_api_key_user_id"),
                user_api_key_team_alias=_meta.get("user_api_key_team_alias"),
                user_api_key_end_user_id=_meta.get("user_api_key_end_user_id"),
                user_api_key_user_email=_meta.get("user_api_key_user_email"),
                user_api_key_request_route=_meta.get("user_api_key_request_route"),
                user_api_key_auth_metadata=_meta.get("user_api_key_auth_metadata"),
            )
        )

        # Prune + Possibly alert
        window_seconds = self.pagerduty_alerting_args.get(
            "failure_threshold_window_seconds", 60
        )
        threshold = self.pagerduty_alerting_args.get("failure_threshold", 1)

        # If threshold is crossed, send PD alert for failures
        await self._send_alert_if_thresholds_crossed(
            events=self._failure_events,
            window_seconds=window_seconds,
            threshold=threshold,
            alert_prefix="High LLM API Failure Rate",
        )

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: CallTypesLiteral,
    ) -> Optional[Union[Exception, str, dict]]:
        """
        Example of detecting hanging requests by waiting a given threshold.
        If the request didn't finish by then, we treat it as 'hanging'.
        """
        verbose_logger.info("Inside Proxy Logging Pre-call hook!")
        asyncio.create_task(
            self.hanging_response_handler(
                request_data=data, user_api_key_dict=user_api_key_dict
            )
        )
        return None

    async def hanging_response_handler(
        self, request_data: Optional[dict], user_api_key_dict: UserAPIKeyAuth
    ):
        """
        Checks if request completed by the time 'hanging_threshold_seconds' elapses.
        If not, we classify it as a hanging request.
        """
        verbose_logger.debug(
            f"Inside Hanging Response Handler!..sleeping for {self.pagerduty_alerting_args.get('hanging_threshold_seconds', PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS)} seconds"
        )
        await asyncio.sleep(
            self.pagerduty_alerting_args.get(
                "hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
            )
        )

        if await self._request_is_completed(request_data=request_data):
            return  # It's not hanging if completed

        # Otherwise, record it as hanging
        self._hanging_events.append(
            PagerDutyInternalEvent(
                failure_event_type="hanging_response",
                timestamp=datetime.now(timezone.utc),
                error_class="HangingRequest",
                error_code="HangingRequest",
                error_llm_provider="HangingRequest",
                user_api_key_hash=user_api_key_dict.api_key,
                user_api_key_alias=user_api_key_dict.key_alias,
                user_api_key_spend=user_api_key_dict.spend,
                user_api_key_max_budget=user_api_key_dict.max_budget,
                user_api_key_budget_reset_at=(
                    user_api_key_dict.budget_reset_at.isoformat()
                    if user_api_key_dict.budget_reset_at
                    else None
                ),
                user_api_key_org_id=user_api_key_dict.org_id,
                user_api_key_org_alias=user_api_key_dict.organization_alias,
                user_api_key_team_id=user_api_key_dict.team_id,
                user_api_key_project_id=user_api_key_dict.project_id,
                user_api_key_project_alias=user_api_key_dict.project_alias,
                user_api_key_user_id=user_api_key_dict.user_id,
                user_api_key_team_alias=user_api_key_dict.team_alias,
                user_api_key_end_user_id=user_api_key_dict.end_user_id,
                user_api_key_user_email=user_api_key_dict.user_email,
                user_api_key_request_route=user_api_key_dict.request_route,
                user_api_key_auth_metadata=user_api_key_dict.metadata,
            )
        )

        # Prune + Possibly alert
        window_seconds = self.pagerduty_alerting_args.get(
            "hanging_threshold_window_seconds",
            PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
        )
        # NOTE: __init__ does not copy "hanging_threshold_fails" into
        # pagerduty_alerting_args, so this lookup always falls back to the
        # default below (the *seconds* constant, reused here as an event count).
        threshold: int = self.pagerduty_alerting_args.get(
            "hanging_threshold_fails", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
        )

        # If threshold is crossed, send PD alert for hangs
        await self._send_alert_if_thresholds_crossed(
            events=self._hanging_events,
            window_seconds=window_seconds,
            threshold=threshold,
            alert_prefix="High Number of Hanging LLM Requests",
        )

    # ------------------ HELPERS ------------------ #

    async def _send_alert_if_thresholds_crossed(
        self,
        events: List[PagerDutyInternalEvent],
        window_seconds: int,
        threshold: int,
        alert_prefix: str,
    ):
        """
        1. Prune old events
        2. If threshold is reached, build alert, send to PagerDuty
        3. Clear those events
        """
        cutoff = datetime.now(timezone.utc) - timedelta(seconds=window_seconds)
        # Use a timezone-aware fallback: comparing naive datetime.min with the
        # aware cutoff would raise TypeError for an event without a timestamp.
        oldest = datetime.min.replace(tzinfo=timezone.utc)
        pruned = [e for e in events if e.get("timestamp", oldest) > cutoff]

        # Update the reference list
        events.clear()
        events.extend(pruned)

        # Check threshold
        verbose_logger.debug(
            f"Have {len(events)} events in the last {window_seconds} seconds. Threshold is {threshold}"
        )
        if len(events) >= threshold:
            # Build short summary of last N events
            error_summaries = self._build_error_summaries(events, max_errors=5)
            alert_message = (
                f"{alert_prefix}: {len(events)} in the last {window_seconds} seconds."
            )
            custom_details = {"recent_errors": error_summaries}

            await self.send_alert_to_pagerduty(
                alert_message=alert_message,
                custom_details=custom_details,
            )

            # Clear them after sending an alert, so we don't spam
            events.clear()

    def _build_error_summaries(
        self, events: List[PagerDutyInternalEvent], max_errors: int = 5
    ) -> List[PagerDutyInternalEvent]:
        """
        Return the last `max_errors` events with the `timestamp` field removed.
        The events are modified in place.
        """
        recent = events[-max_errors:]
        summaries = []
        for fe in recent:
            # Drop the datetime timestamp; a datetime would not serialize to JSON
            # when the summaries are sent as `custom_details`.
            fe.pop("timestamp")
            summaries.append(fe)
        return summaries

    async def send_alert_to_pagerduty(self, alert_message: str, custom_details: dict):
        """
        Send [critical] Alert to PagerDuty
        https://developer.pagerduty.com/api-reference/YXBpOjI3NDgyNjU-pager-duty-v2-events-api
        """
        try:
            verbose_logger.debug(f"Sending alert to PagerDuty: {alert_message}")
            async_client: AsyncHTTPHandler = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback
            )
            payload: PagerDutyRequestBody = PagerDutyRequestBody(
                payload=PagerDutyPayload(
                    summary=alert_message,
                    severity="critical",
                    source="LiteLLM Alert",
                    component="LiteLLM",
                    custom_details=custom_details,
                ),
                routing_key=self.api_key,
                event_action="trigger",
            )
            return await async_client.post(
                url="https://events.pagerduty.com/v2/enqueue",
                json=dict(payload),
                headers={"Content-Type": "application/json"},
            )
        except Exception as e:
            verbose_logger.exception(f"Error sending alert to PagerDuty: {e}")
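
The prune-then-threshold logic in `_send_alert_if_thresholds_crossed` can be tried in isolation. The following is a minimal sketch, not part of the module: `prune_and_check` is a hypothetical helper name, events are plain dicts rather than `PagerDutyInternalEvent`, and the PagerDuty call is replaced by a boolean return value.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def prune_and_check(
    events: List[Dict], window_seconds: int, threshold: int, now: datetime
) -> bool:
    """Drop events older than the window (in place) and report whether
    the remaining count reaches the threshold."""
    cutoff = now - timedelta(seconds=window_seconds)
    # Timezone-aware fallback so events without a timestamp are pruned
    # instead of raising on a naive/aware comparison.
    oldest = datetime.min.replace(tzinfo=timezone.utc)
    events[:] = [e for e in events if e.get("timestamp", oldest) > cutoff]
    if len(events) >= threshold:
        # Reset after "alerting" so the same burst is not reported twice.
        events.clear()
        return True
    return False


now = datetime(2026, 1, 1, tzinfo=timezone.utc)
events = [
    {"timestamp": now - timedelta(seconds=120)},  # outside a 60 s window
    {"timestamp": now - timedelta(seconds=30)},
    {"timestamp": now - timedelta(seconds=5)},
]
fired = prune_and_check(events, window_seconds=60, threshold=2, now=now)
```

Passing `now` explicitly keeps the sketch deterministic; the class itself reads the clock with `datetime.now(timezone.utc)`.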