chore(callbacks): guard dynamic integration hosts

This commit is contained in:
user 2026-04-30 14:27:19 -07:00
parent 256e05e474
commit 15d4d51453
9 changed files with 399 additions and 51 deletions

View File

@ -90,6 +90,29 @@ def _extract_cache_read_input_tokens(usage_obj) -> int:
return cache_read_input_tokens
def resolve_langfuse_credentials(
langfuse_public_key=None,
langfuse_secret=None,
langfuse_secret_key=None,
langfuse_host=None,
allow_env_credentials: bool = True,
):
if allow_env_credentials is False and langfuse_host is not None:
secret_key = langfuse_secret or langfuse_secret_key
public_key = langfuse_public_key
else:
secret_key = (
langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY")
)
public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
resolved_host = langfuse_host or os.getenv(
"LANGFUSE_HOST", "https://cloud.langfuse.com"
)
return public_key, secret_key, resolved_host
class LangFuseLogger:
# Class variables or attributes
def __init__(
@ -98,6 +121,7 @@ class LangFuseLogger:
langfuse_secret=None,
langfuse_host=None,
flush_interval=1,
allow_env_credentials: bool = True,
):
try:
import langfuse
@ -106,11 +130,13 @@ class LangFuseLogger:
raise Exception(
f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n{traceback.format_exc()}\033[0m"
)
# Instance variables
self.secret_key = langfuse_secret or os.getenv("LANGFUSE_SECRET_KEY")
self.public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
self.langfuse_host = langfuse_host or os.getenv(
"LANGFUSE_HOST", "https://cloud.langfuse.com"
self.public_key, self.secret_key, self.langfuse_host = (
resolve_langfuse_credentials(
langfuse_public_key=langfuse_public_key,
langfuse_secret=langfuse_secret,
langfuse_host=langfuse_host,
allow_env_credentials=allow_env_credentials,
)
)
if not (
self.langfuse_host.startswith("http://")

View File

@ -117,6 +117,7 @@ class LangFuseHandler:
langfuse_public_key=credentials.get("langfuse_public_key"),
langfuse_secret=credentials.get("langfuse_secret"),
langfuse_host=credentials.get("langfuse_host"),
allow_env_credentials=credentials.get("langfuse_host") is None,
)
in_memory_dynamic_logger_cache.set_cache(
credentials=credentials,

View File

@ -20,7 +20,7 @@ from ...litellm_core_utils.specialty_caches.dynamic_logging_cache import (
DynamicLoggingCache,
)
from ..prompt_management_base import PromptManagementBase
from .langfuse import LangFuseLogger
from .langfuse import LangFuseLogger, resolve_langfuse_credentials
from .langfuse_handler import LangFuseHandler
if TYPE_CHECKING:
@ -46,6 +46,7 @@ def langfuse_client_init(
langfuse_secret_key=None,
langfuse_host=None,
flush_interval=1,
allow_env_credentials: bool = True,
) -> LangfuseClass:
"""
Initialize Langfuse client with caching to prevent multiple initializations.
@ -70,14 +71,12 @@ def langfuse_client_init(
f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n\033[0m"
)
# Instance variables
secret_key = (
langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY")
)
public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
langfuse_host = langfuse_host or os.getenv(
"LANGFUSE_HOST", "https://cloud.langfuse.com"
public_key, secret_key, langfuse_host = resolve_langfuse_credentials(
langfuse_public_key=langfuse_public_key,
langfuse_secret=langfuse_secret,
langfuse_secret_key=langfuse_secret_key,
langfuse_host=langfuse_host,
allow_env_credentials=allow_env_credentials,
)
if not (
@ -222,6 +221,7 @@ class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogge
langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
langfuse_host=dynamic_callback_params.get("langfuse_host"),
allow_env_credentials=dynamic_callback_params.get("langfuse_host") is None,
)
langfuse_prompt_client = self._get_prompt_from_id(
langfuse_prompt_id=prompt_id,
@ -246,6 +246,7 @@ class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogge
langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
langfuse_host=dynamic_callback_params.get("langfuse_host"),
allow_env_credentials=dynamic_callback_params.get("langfuse_host") is None,
)
langfuse_prompt_client = self._get_prompt_from_id(
langfuse_prompt_id=prompt_id,

View File

@ -112,17 +112,28 @@ class LangsmithLogger(CustomBatchLogger):
langsmith_project: Optional[str] = None,
langsmith_base_url: Optional[str] = None,
langsmith_tenant_id: Optional[str] = None,
allow_env_credentials: bool = True,
) -> LangsmithCredentialsObject:
_credentials_api_key = langsmith_api_key or os.getenv("LANGSMITH_API_KEY")
_credentials_project = (
langsmith_project or os.getenv("LANGSMITH_PROJECT") or "litellm-completion"
)
_credentials_base_url = (
langsmith_base_url
or os.getenv("LANGSMITH_BASE_URL")
or "https://api.smith.langchain.com"
)
_credentials_tenant_id = langsmith_tenant_id or os.getenv("LANGSMITH_TENANT_ID")
if allow_env_credentials is False and langsmith_base_url is not None:
_credentials_api_key = langsmith_api_key
_credentials_project = langsmith_project or "litellm-completion"
_credentials_base_url = langsmith_base_url
_credentials_tenant_id = langsmith_tenant_id
else:
_credentials_api_key = langsmith_api_key or os.getenv("LANGSMITH_API_KEY")
_credentials_project = (
langsmith_project
or os.getenv("LANGSMITH_PROJECT")
or "litellm-completion"
)
_credentials_base_url = (
langsmith_base_url
or os.getenv("LANGSMITH_BASE_URL")
or "https://api.smith.langchain.com"
)
_credentials_tenant_id = langsmith_tenant_id or os.getenv(
"LANGSMITH_TENANT_ID"
)
return LangsmithCredentialsObject(
LANGSMITH_API_KEY=_credentials_api_key,
@ -540,6 +551,10 @@ class LangsmithLogger(CustomBatchLogger):
langsmith_tenant_id=standard_callback_dynamic_params.get(
"langsmith_tenant_id", None
),
allow_env_credentials=standard_callback_dynamic_params.get(
"langsmith_base_url", None
)
is None,
)
else:
credentials = self.default_credentials

View File

@ -3242,10 +3242,15 @@ class Logging(LiteLLMLoggingBaseClass):
),
langfuse_secret=self.standard_callback_dynamic_params.get(
"langfuse_secret"
),
)
or self.standard_callback_dynamic_params.get("langfuse_secret_key"),
langfuse_host=self.standard_callback_dynamic_params.get(
"langfuse_host"
),
allow_env_credentials=self.standard_callback_dynamic_params.get(
"langfuse_host"
)
is None,
)
return langFuseLogger

View File

@ -12,11 +12,13 @@ import base64
import os
from base64 import b64encode
from typing import Optional
from urllib.parse import unquote
import httpx
from fastapi import APIRouter, Request, Response
from fastapi import APIRouter, HTTPException, Request, Response, status
import litellm
from litellm.litellm_core_utils.url_utils import SSRFError, validate_url
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.common_utils.http_parsing_utils import _safe_get_request_headers
@ -27,6 +29,7 @@ from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
router = APIRouter()
default_vertex_config = None
_DEFAULT_LANGFUSE_HOST = "https://cloud.langfuse.com"
def create_request_copy(request: Request):
@ -39,6 +42,116 @@ def create_request_copy(request: Request):
}
def _decode_to_convergence(value: str) -> str:
previous = value
while True:
decoded = unquote(previous)
if decoded == previous:
return decoded
previous = decoded
def _normalize_langfuse_base_url(base_target_url: str) -> str:
if not (
base_target_url.startswith("http://") or base_target_url.startswith("https://")
):
# Existing behavior allows host-only Langfuse settings.
base_target_url = "http://" + base_target_url
try:
base_url = httpx.URL(base_target_url)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": f"Invalid Langfuse host: {str(e)}"},
)
if base_url.scheme not in ("http", "https") or not base_url.host:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": "Invalid Langfuse host"},
)
if base_url.userinfo:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": "Langfuse host must not include credentials"},
)
return str(base_url)
def _validate_langfuse_proxy_path(endpoint: str) -> str:
decoded_endpoint = _decode_to_convergence(endpoint)
if any(ord(char) < 32 for char in decoded_endpoint):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": "Invalid Langfuse endpoint path"},
)
if "\\" in decoded_endpoint or decoded_endpoint.startswith("//"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": "Invalid Langfuse endpoint path"},
)
endpoint_path = "/" + decoded_endpoint.lstrip("/")
if any(segment in (".", "..") for segment in endpoint_path.split("/")):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": "Invalid Langfuse endpoint path"},
)
return endpoint_path
def _get_langfuse_proxy_credentials(
*,
dynamic_host_supplied: bool,
dynamic_langfuse_public_key: Optional[str],
dynamic_langfuse_secret_key: Optional[str],
):
if dynamic_host_supplied:
if not dynamic_langfuse_public_key or not dynamic_langfuse_secret_key:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"error": "Dynamic Langfuse hosts must include dynamic Langfuse credentials"
},
)
return dynamic_langfuse_public_key, dynamic_langfuse_secret_key
return (
dynamic_langfuse_public_key
or litellm.utils.get_secret(secret_name="LANGFUSE_PUBLIC_KEY"),
dynamic_langfuse_secret_key
or litellm.utils.get_secret(secret_name="LANGFUSE_SECRET_KEY"),
)
def _build_langfuse_proxy_target(
*,
endpoint: str,
base_target_url: str,
dynamic_host_supplied: bool,
):
endpoint_path = _validate_langfuse_proxy_path(endpoint)
base_url = httpx.URL(_normalize_langfuse_base_url(base_target_url))
updated_url = base_url.copy_with(path=endpoint_path)
custom_headers = {}
if dynamic_host_supplied and getattr(litellm, "user_url_validation", True):
try:
target_url, host_header = validate_url(str(updated_url))
except SSRFError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": f"Invalid Langfuse host: {str(e)}"},
)
custom_headers["Host"] = host_header
return target_url, custom_headers
return str(updated_url), custom_headers
@router.api_route(
"/langfuse/{endpoint:path}",
methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
@ -91,44 +204,33 @@ async def langfuse_proxy_route(
elif k == "langfuse_host":
dynamic_langfuse_host = v
dynamic_host_supplied = dynamic_langfuse_host is not None
base_target_url: str = (
dynamic_langfuse_host
or os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
or "https://cloud.langfuse.com"
or os.getenv("LANGFUSE_HOST", _DEFAULT_LANGFUSE_HOST)
or _DEFAULT_LANGFUSE_HOST
)
if not (
base_target_url.startswith("http://") or base_target_url.startswith("https://")
):
# add http:// if unset, assume communicating over private network - e.g. render
base_target_url = "http://" + base_target_url
encoded_endpoint = httpx.URL(endpoint).path
# Ensure endpoint starts with '/' for proper URL construction
if not encoded_endpoint.startswith("/"):
encoded_endpoint = "/" + encoded_endpoint
# Construct the full target URL using httpx
base_url = httpx.URL(base_target_url)
updated_url = base_url.copy_with(path=encoded_endpoint)
# Add or update query parameters
langfuse_public_key = dynamic_langfuse_public_key or litellm.utils.get_secret(
secret_name="LANGFUSE_PUBLIC_KEY"
langfuse_public_key, langfuse_secret_key = _get_langfuse_proxy_credentials(
dynamic_host_supplied=dynamic_host_supplied,
dynamic_langfuse_public_key=dynamic_langfuse_public_key,
dynamic_langfuse_secret_key=dynamic_langfuse_secret_key,
)
langfuse_secret_key = dynamic_langfuse_secret_key or litellm.utils.get_secret(
secret_name="LANGFUSE_SECRET_KEY"
target_url, target_headers = _build_langfuse_proxy_target(
endpoint=endpoint,
base_target_url=base_target_url,
dynamic_host_supplied=dynamic_host_supplied,
)
langfuse_combined_key = "Basic " + b64encode(
f"{langfuse_public_key}:{langfuse_secret_key}".encode("utf-8")
).decode("ascii")
target_headers["Authorization"] = langfuse_combined_key
## CREATE PASS-THROUGH
endpoint_func = create_pass_through_route(
endpoint=endpoint,
target=str(updated_url),
custom_headers={"Authorization": langfuse_combined_key},
target=target_url,
custom_headers=target_headers,
query_params=dict(request.query_params), # type: ignore
) # dynamically construct pass-through endpoint based on incoming path
received_value = await endpoint_func(

View File

@ -0,0 +1,46 @@
from litellm.integrations.langfuse.langfuse import resolve_langfuse_credentials
def test_resolve_langfuse_credentials_does_not_use_env_for_dynamic_host(monkeypatch):
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "global-public")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "global-secret")
public_key, secret_key, host = resolve_langfuse_credentials(
langfuse_host="https://attacker.example",
allow_env_credentials=False,
)
assert public_key is None
assert secret_key is None
assert host == "https://attacker.example"
def test_resolve_langfuse_credentials_accepts_secret_key_alias_for_dynamic_host(
monkeypatch,
):
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "global-secret")
public_key, secret_key, host = resolve_langfuse_credentials(
langfuse_public_key="dynamic-public",
langfuse_secret_key="dynamic-secret",
langfuse_host="https://team-langfuse.example",
allow_env_credentials=False,
)
assert public_key == "dynamic-public"
assert secret_key == "dynamic-secret"
assert host == "https://team-langfuse.example"
def test_resolve_langfuse_credentials_keeps_env_for_global_config(monkeypatch):
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "global-public")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "global-secret")
public_key, secret_key, host = resolve_langfuse_credentials(
langfuse_host="https://admin-configured.example",
allow_env_credentials=True,
)
assert public_key == "global-public"
assert secret_key == "global-secret"
assert host == "https://admin-configured.example"

View File

@ -0,0 +1,50 @@
import pytest
from litellm.integrations.langsmith import LangsmithLogger
@pytest.mark.asyncio
async def test_get_credentials_from_env_does_not_use_env_for_dynamic_base_url(
monkeypatch,
):
monkeypatch.setenv("LANGSMITH_API_KEY", "global-key")
monkeypatch.setenv("LANGSMITH_PROJECT", "global-project")
monkeypatch.setenv("LANGSMITH_TENANT_ID", "global-tenant")
logger = LangsmithLogger(
langsmith_api_key="default-key",
langsmith_project="default-project",
langsmith_base_url="https://default.example",
)
credentials = logger.get_credentials_from_env(
langsmith_base_url="https://attacker.example",
allow_env_credentials=False,
)
assert credentials["LANGSMITH_API_KEY"] is None
assert credentials["LANGSMITH_PROJECT"] == "litellm-completion"
assert credentials["LANGSMITH_BASE_URL"] == "https://attacker.example"
assert credentials["LANGSMITH_TENANT_ID"] is None
@pytest.mark.asyncio
async def test_dynamic_langsmith_base_url_does_not_inherit_default_api_key(
monkeypatch,
):
monkeypatch.setenv("LANGSMITH_API_KEY", "global-key")
logger = LangsmithLogger(
langsmith_api_key="default-key",
langsmith_project="default-project",
langsmith_base_url="https://default.example",
)
credentials = logger._get_credentials_to_use_for_request(
kwargs={
"standard_callback_dynamic_params": {
"langsmith_base_url": "https://attacker.example"
}
}
)
assert credentials["LANGSMITH_API_KEY"] is None
assert credentials["LANGSMITH_BASE_URL"] == "https://attacker.example"

View File

@ -0,0 +1,102 @@
import socket
import pytest
from fastapi import HTTPException
import litellm
from litellm.proxy.vertex_ai_endpoints.langfuse_endpoints import (
_build_langfuse_proxy_target,
_get_langfuse_proxy_credentials,
)
def test_dynamic_langfuse_host_requires_dynamic_credentials(monkeypatch):
monkeypatch.setattr(litellm, "user_url_validation", True, raising=False)
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "global-public")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "global-secret")
with pytest.raises(HTTPException) as exc:
_get_langfuse_proxy_credentials(
dynamic_host_supplied=True,
dynamic_langfuse_public_key=None,
dynamic_langfuse_secret_key=None,
)
assert exc.value.status_code == 400
def test_global_langfuse_host_can_use_env_credentials(monkeypatch):
monkeypatch.setenv("LANGFUSE_PUBLIC_KEY", "global-public")
monkeypatch.setenv("LANGFUSE_SECRET_KEY", "global-secret")
public_key, secret_key = _get_langfuse_proxy_credentials(
dynamic_host_supplied=False,
dynamic_langfuse_public_key=None,
dynamic_langfuse_secret_key=None,
)
assert public_key == "global-public"
assert secret_key == "global-secret"
@pytest.mark.parametrize(
"endpoint",
[
"../api/public/projects",
"%2e%2e/api/public/projects",
"%252e%252e%252fapi/public/projects",
"api\\public\\projects",
"%2f%2fattacker.example/api",
],
)
def test_langfuse_proxy_target_rejects_traversal_paths(endpoint):
with pytest.raises(HTTPException) as exc:
_build_langfuse_proxy_target(
endpoint=endpoint,
base_target_url="https://cloud.langfuse.com",
dynamic_host_supplied=False,
)
assert exc.value.status_code == 400
def test_dynamic_langfuse_proxy_target_rejects_internal_host(monkeypatch):
monkeypatch.setattr(litellm, "user_url_validation", True, raising=False)
with pytest.raises(HTTPException) as exc:
_build_langfuse_proxy_target(
endpoint="api/public/projects",
base_target_url="http://127.0.0.1:3000",
dynamic_host_supplied=True,
)
assert exc.value.status_code == 400
def test_dynamic_langfuse_proxy_target_preserves_host_header_for_http(monkeypatch):
monkeypatch.setattr(litellm, "user_url_validation", True, raising=False)
def fake_getaddrinfo(host, port, proto):
assert host == "langfuse.example"
assert port == 80
assert proto == socket.IPPROTO_TCP
return [
(
socket.AF_INET,
socket.SOCK_STREAM,
socket.IPPROTO_TCP,
"",
("8.8.8.8", 80),
)
]
monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo)
target_url, headers = _build_langfuse_proxy_target(
endpoint="api/public/projects",
base_target_url="http://langfuse.example",
dynamic_host_supplied=True,
)
assert target_url == "http://8.8.8.8/api/public/projects"
assert headers["Host"] == "langfuse.example"