diff --git a/db_scripts/partition_spend_logs.sql b/db_scripts/partition_spend_logs.sql new file mode 100644 index 0000000000..08fcbddb6f --- /dev/null +++ b/db_scripts/partition_spend_logs.sql @@ -0,0 +1,99 @@ +-- Converts an existing LiteLLM_SpendLogs table into a native Postgres +-- range-partitioned table keyed on "startTime". +-- +-- Why: at high request volume, retention via DELETE leaves dead tuples that +-- autovacuum cannot reclaim quickly enough, so the table keeps growing on disk +-- (seen at 450GB+ after ~1 month). With partitioning, retention drops whole +-- partitions, which is instant and returns disk to the OS immediately. +-- +-- This is an opt-in, manual operation. The default LiteLLM schema is NOT +-- partitioned, so existing installs are unaffected until you run this. +-- +-- IMPORTANT +-- * Test on a staging copy first and take a backup. +-- * Postgres cannot convert a populated table to partitioned in place, so this +-- renames the old table aside and creates a fresh partitioned table. +-- * The partition key ("startTime") must be part of the primary key, so the +-- PK becomes composite ("request_id", "startTime"). LiteLLM's write path uses +-- INSERT ... ON CONFLICT DO NOTHING, which is compatible with this. +-- * Choose a partition granularity ("day" is the recommended default for +-- high-volume tables) and keep it consistent with SPEND_LOG_PARTITION_INTERVAL. +-- +-- After running this, enable the feature and set a retention period in +-- proxy_config.yaml: +-- general_settings: +-- use_spend_logs_partitioning: true +-- maximum_spend_logs_retention_period: "30d" +-- The spend-log cleanup job then verifies the table is partitioned and reclaims +-- disk by dropping expired partitions instead of deleting rows. It also +-- pre-creates upcoming partitions on each run. To roll back, see +-- db_scripts/unpartition_spend_logs.sql. 
+ +BEGIN; + +ALTER TABLE "LiteLLM_SpendLogs" RENAME TO "LiteLLM_SpendLogs_legacy"; + +-- Renaming a table does NOT rename its indexes, and index names are unique per +-- schema. Move the legacy table's indexes aside so the CREATE INDEX statements +-- below actually create indexes on the new partitioned table instead of being +-- silently skipped by IF NOT EXISTS, and so the new PK keeps the canonical +-- name instead of getting a "_pkey1" suffix. +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_pkey" + RENAME TO "LiteLLM_SpendLogs_legacy_pkey"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_startTime_idx" + RENAME TO "LiteLLM_SpendLogs_legacy_startTime_idx"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_startTime_request_id_idx" + RENAME TO "LiteLLM_SpendLogs_legacy_startTime_request_id_idx"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_end_user_idx" + RENAME TO "LiteLLM_SpendLogs_legacy_end_user_idx"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_session_id_idx" + RENAME TO "LiteLLM_SpendLogs_legacy_session_id_idx"; + +CREATE TABLE "LiteLLM_SpendLogs" ( + LIKE "LiteLLM_SpendLogs_legacy" INCLUDING DEFAULTS INCLUDING GENERATED +) PARTITION BY RANGE ("startTime"); + +ALTER TABLE "LiteLLM_SpendLogs" + ADD PRIMARY KEY ("request_id", "startTime"); + +-- Recreate every index Prisma defines on the table. LIKE ... INCLUDING DEFAULTS +-- INCLUDING GENERATED copies columns and defaults but NOT indexes, so without +-- these the admin-UI cost-reporting queries that filter by end_user/session_id +-- fall back to sequential scans. On a partitioned parent these propagate to +-- every current and future partition automatically. 
+CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_startTime_idx" + ON "LiteLLM_SpendLogs" ("startTime"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_startTime_request_id_idx" + ON "LiteLLM_SpendLogs" ("startTime", "request_id"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_end_user_idx" + ON "LiteLLM_SpendLogs" ("end_user"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_session_id_idx" + ON "LiteLLM_SpendLogs" ("session_id"); + +-- Safety net: any row whose startTime has no explicit partition lands here so +-- writes never fail. The cleanup job never drops the DEFAULT partition. +CREATE TABLE IF NOT EXISTS "LiteLLM_SpendLogs_pdefault" + PARTITION OF "LiteLLM_SpendLogs" DEFAULT; + +COMMIT; + +-- Backfill (optional). Postgres routes each inserted row to the partition +-- covering its "startTime", or to DEFAULT when no partition covers it. +-- For large legacy tables, copy in time-bounded batches during a low-traffic +-- window instead of one statement, or simply keep "LiteLLM_SpendLogs_legacy" +-- read-only until its data ages past your retention, then DROP it. +-- +-- This script creates only the DEFAULT partition; explicit partitions are +-- created by the cleanup job once use_spend_logs_partitioning is enabled. +-- Until its first run, live writes land in DEFAULT, and so do backfilled rows +-- whose dates no explicit partition covers. Postgres refuses to create a +-- partition whose range overlaps rows already in DEFAULT, so the cleanup job +-- may log a warning when pre-creating today's partition right after this +-- migration or a backfill; it recovers on its own once those dates age out, +-- and future partitions are unaffected because they are always created ahead +-- of writes. 
+-- +-- INSERT INTO "LiteLLM_SpendLogs" +-- SELECT * FROM "LiteLLM_SpendLogs_legacy" +-- WHERE "startTime" >= now() - interval '30 days'; +-- +-- DROP TABLE "LiteLLM_SpendLogs_legacy"; diff --git a/db_scripts/unpartition_spend_logs.sql b/db_scripts/unpartition_spend_logs.sql new file mode 100644 index 0000000000..0bd82513e4 --- /dev/null +++ b/db_scripts/unpartition_spend_logs.sql @@ -0,0 +1,69 @@ +-- Rolls back db_scripts/partition_spend_logs.sql: converts the native +-- range-partitioned "LiteLLM_SpendLogs" table back into a plain, +-- non-partitioned table matching the default LiteLLM schema. +-- +-- When/why: run this if you want to stop using partition-based retention and +-- return to DELETE-based cleanup, or to restore the original single-column +-- primary key ("request_id") that the partitioned layout had to widen to a +-- composite ("request_id", "startTime"). +-- +-- IMPORTANT +-- * Test on a staging copy first and take a backup. +-- * Postgres cannot convert a partitioned table back in place, so this +-- renames the partitioned table aside and creates a fresh plain table. +-- * The composite PK could in principle hold the same "request_id" in more +-- than one partition, so rows are copied with ON CONFLICT DO NOTHING to +-- restore the single-column PK without failing on such duplicates. +-- * For large tables the INSERT ... SELECT copies every surviving row and may +-- run long; do it during a low-traffic window. +-- * Also remove use_spend_logs_partitioning from proxy_config.yaml (or set it +-- to false) so the cleanup job returns to DELETE-based retention. + +BEGIN; + +ALTER TABLE "LiteLLM_SpendLogs" RENAME TO "LiteLLM_SpendLogs_partitioned"; + +-- Renaming a table does NOT rename its indexes, and index names are unique per +-- schema. 
Move the partitioned table's indexes aside so the CREATE INDEX +-- statements below actually create indexes on the new plain table instead of +-- being silently skipped by IF NOT EXISTS, and so the new PK keeps the +-- canonical name. +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_pkey" + RENAME TO "LiteLLM_SpendLogs_partitioned_pkey"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_pkey1" + RENAME TO "LiteLLM_SpendLogs_partitioned_pkey1"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_startTime_idx" + RENAME TO "LiteLLM_SpendLogs_partitioned_startTime_idx"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_startTime_request_id_idx" + RENAME TO "LiteLLM_SpendLogs_partitioned_startTime_request_id_idx"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_end_user_idx" + RENAME TO "LiteLLM_SpendLogs_partitioned_end_user_idx"; +ALTER INDEX IF EXISTS "LiteLLM_SpendLogs_session_id_idx" + RENAME TO "LiteLLM_SpendLogs_partitioned_session_id_idx"; + +CREATE TABLE "LiteLLM_SpendLogs" ( + LIKE "LiteLLM_SpendLogs_partitioned" INCLUDING DEFAULTS INCLUDING GENERATED +); + +ALTER TABLE "LiteLLM_SpendLogs" + ADD PRIMARY KEY ("request_id"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_startTime_idx" + ON "LiteLLM_SpendLogs" ("startTime"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_startTime_request_id_idx" + ON "LiteLLM_SpendLogs" ("startTime", "request_id"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_end_user_idx" + ON "LiteLLM_SpendLogs" ("end_user"); + +CREATE INDEX IF NOT EXISTS "LiteLLM_SpendLogs_session_id_idx" + ON "LiteLLM_SpendLogs" ("session_id"); + +INSERT INTO "LiteLLM_SpendLogs" +SELECT * FROM "LiteLLM_SpendLogs_partitioned" +ON CONFLICT ("request_id") DO NOTHING; + +DROP TABLE "LiteLLM_SpendLogs_partitioned"; + +COMMIT; diff --git a/litellm/constants.py b/litellm/constants.py index ab8e57d735..a5e3926aa8 100644 --- a/litellm/constants.py +++ b/litellm/constants.py @@ -1497,6 +1497,10 @@ SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES = int( 
SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS = float( os.getenv("SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.5) ) +SPEND_LOG_PARTITION_INTERVAL = os.getenv("SPEND_LOG_PARTITION_INTERVAL", "day") +SPEND_LOG_PARTITION_PRECREATE_AHEAD = int( + os.getenv("SPEND_LOG_PARTITION_PRECREATE_AHEAD", 7) +) SPEND_LOG_QUEUE_SIZE_THRESHOLD = int(os.getenv("SPEND_LOG_QUEUE_SIZE_THRESHOLD", 100)) SPEND_LOG_QUEUE_POLL_INTERVAL = float(os.getenv("SPEND_LOG_QUEUE_POLL_INTERVAL", 2.0)) SPEND_COUNTER_RESEED_LOCKS_MAX_SIZE = int( diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index 1b594e20d3..35e9e0cd74 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -2300,6 +2300,10 @@ class ConfigGeneralSettings(LiteLLMPydanticObjectBase): None, description="Maximum retention period for spend logs (e.g., '7d' for 7 days). Logs older than this will be deleted.", ) + use_spend_logs_partitioning: Optional[bool] = Field( + None, + description="If True and LiteLLM_SpendLogs has been converted to a range-partitioned table (db_scripts/partition_spend_logs.sql), retention cleanup drops expired partitions instead of deleting rows, and pre-creates upcoming partitions. Default is False.", + ) mcp_internal_ip_ranges: Optional[List[str]] = Field( None, description="Custom CIDR ranges that define internal/private networks for MCP access control. When set, only these ranges are treated as internal. 
Defaults to RFC 1918 private ranges (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 127.0.0.0/8).", diff --git a/litellm/proxy/db/db_transaction_queue/spend_log_cleanup.py b/litellm/proxy/db/db_transaction_queue/spend_log_cleanup.py index 9475779cfd..a4c23937b9 100644 --- a/litellm/proxy/db/db_transaction_queue/spend_log_cleanup.py +++ b/litellm/proxy/db/db_transaction_queue/spend_log_cleanup.py @@ -12,19 +12,31 @@ from litellm.constants import ( SPEND_LOG_RUN_LOOPS, ) from litellm.litellm_core_utils.duration_parser import duration_in_seconds +from litellm.proxy.db.db_transaction_queue.spend_logs_partition_manager import ( + SpendLogsPartitionManager, +) from litellm.proxy.utils import PrismaClient class SpendLogCleanup: """ Handles cleaning up old spend logs based on maximum retention period. - Deletes logs in batches to prevent timeouts. + + When LiteLLM_SpendLogs is range-partitioned, expired data is reclaimed by + dropping whole partitions (instant, frees disk immediately). Otherwise it + falls back to deleting logs in batches. Uses PodLockManager to ensure only one pod runs cleanup in multi-pod deployments. 
""" - def __init__(self, general_settings=None, redis_cache: Optional[RedisCache] = None): + def __init__( + self, + general_settings=None, + redis_cache: Optional[RedisCache] = None, + partition_manager: Optional[SpendLogsPartitionManager] = None, + ): self.batch_size = SPEND_LOG_CLEANUP_BATCH_SIZE self.retention_seconds: Optional[int] = None + self.partition_manager = partition_manager or SpendLogsPartitionManager() from litellm.proxy.proxy_server import general_settings as default_settings self.general_settings = general_settings or default_settings @@ -89,8 +101,8 @@ class SpendLogCleanup: deleted_result = await prisma_client.db.execute_raw( """ DELETE FROM "LiteLLM_SpendLogs" - WHERE "request_id" IN ( - SELECT "request_id" FROM "LiteLLM_SpendLogs" + WHERE ("request_id", "startTime") IN ( + SELECT "request_id", "startTime" FROM "LiteLLM_SpendLogs" WHERE "startTime" < $1::timestamptz LIMIT $2 ) @@ -195,12 +207,32 @@ class SpendLogCleanup: seconds=float(self.retention_seconds) ) verbose_proxy_logger.info( - f"Deleting logs older than {cutoff_date.isoformat()}" + f"Removing logs older than {cutoff_date.isoformat()}" ) - # Perform the actual deletion - total_deleted = await self._delete_old_logs(prisma_client, cutoff_date) - verbose_proxy_logger.info(f"Deleted {total_deleted} logs") + if self.general_settings.get( + "use_spend_logs_partitioning", False + ) and await self.partition_manager.is_partitioned(prisma_client): + await self.partition_manager.ensure_partitions(prisma_client) + dropped = await self.partition_manager.drop_partitions_older_than( + prisma_client, cutoff_date + ) + verbose_proxy_logger.info( + "Dropped %d expired spend-log partitions: %s", + len(dropped), + dropped, + ) + # DROP only reclaims whole expired partitions. Expired rows can + # still sit in the DEFAULT partition (backfill, coverage gaps) + # or in a partition that spans the cutoff, so retention must + # also delete those stragglers row-wise. 
+ total_deleted = await self._delete_old_logs(prisma_client, cutoff_date) + verbose_proxy_logger.info( + f"Deleted {total_deleted} expired logs not covered by dropped partitions" + ) + else: + total_deleted = await self._delete_old_logs(prisma_client, cutoff_date) + verbose_proxy_logger.info(f"Deleted {total_deleted} logs") except Exception as e: # .exception() captures the traceback; str(e) alone on a Prisma/DB diff --git a/litellm/proxy/db/db_transaction_queue/spend_logs_partition_manager.py b/litellm/proxy/db/db_transaction_queue/spend_logs_partition_manager.py new file mode 100644 index 0000000000..eee0f862b4 --- /dev/null +++ b/litellm/proxy/db/db_transaction_queue/spend_logs_partition_manager.py @@ -0,0 +1,208 @@ +""" +Manages native Postgres range partitions for the LiteLLM_SpendLogs table. + +At high request volume, retention via batched DELETE leaves dead tuples that +autovacuum cannot reclaim fast enough, so the table keeps growing on disk. When +the table is range-partitioned on startTime, dropping old data becomes a +DROP TABLE on a whole partition: an instant metadata operation that returns disk +to the OS immediately, with no tombstones and no vacuum. + +This manager only acts when use_spend_logs_partitioning is enabled in +general_settings AND the table is already partitioned (set up via the +db_scripts/partition_spend_logs.sql runbook). Without both, the cleanup job +keeps the batched-DELETE path, so existing deployments are untouched. 
+"""
+
+import re
+from datetime import date, datetime, timedelta, timezone
+from typing import List, Optional, Tuple
+
+from litellm._logging import verbose_proxy_logger
+from litellm.constants import (
+    SPEND_LOG_PARTITION_INTERVAL,
+    SPEND_LOG_PARTITION_PRECREATE_AHEAD,
+)
+
+SPEND_LOGS_TABLE = "LiteLLM_SpendLogs"
+
+PartitionInterval = str  # "day" | "week" | "month"
+
+VALID_PARTITION_INTERVALS = {"day", "week", "month"}
+
+# Captures the quoted value that follows "TO (" in a partition bound expression.
+_BOUND_UPPER_RE = re.compile(r"TO \('([^']+)'\)")
+
+
+def period_start(day: date, interval: PartitionInterval) -> date:
+    """
+    First day of the partition period that `day` falls into (UTC).
+
+    Weeks start on Monday (`date.weekday() == 0`); months start on the 1st.
+    Raises ValueError for an interval other than "day", "week" or "month".
+    """
+    if interval == "day":
+        return day
+    if interval == "week":
+        return day - timedelta(days=day.weekday())
+    if interval == "month":
+        return day.replace(day=1)
+    raise ValueError(f"Unsupported partition interval: {interval}")
+
+
+def next_period_start(start: date, interval: PartitionInterval) -> date:
+    """
+    First day of the period that follows the one `start` falls into.
+
+    `start` is aligned to its own period first. For an already-aligned date
+    this changes nothing; for a mid-period date it guarantees the result is a
+    real period boundary and that month arithmetic never targets a day that
+    does not exist (Jan 31 -> Feb 1 instead of raising on "Feb 31").
+    Raises ValueError for an unsupported interval (via `period_start`).
+    """
+    aligned = period_start(start, interval)
+    if interval == "day":
+        return aligned + timedelta(days=1)
+    if interval == "week":
+        return aligned + timedelta(days=7)
+    # Only "month" is left: `aligned` is the 1st, so replace() is always valid.
+    if aligned.month == 12:
+        return aligned.replace(year=aligned.year + 1, month=1)
+    return aligned.replace(month=aligned.month + 1)
+
+
+def partition_name(start: date) -> str:
+    """Partition table name for the period beginning on `start` (suffix pYYYYMMDD)."""
+    return f"{SPEND_LOGS_TABLE}_p{start.strftime('%Y%m%d')}"
+
+
+def upcoming_partitions(
+    today: date, interval: PartitionInterval, ahead: int
+) -> List[Tuple[str, date, date]]:
+    """
+    Specs (name, lower_inclusive, upper_exclusive) for the current period plus
+    the next `ahead` periods, so writes always have a partition to land in.
+    """
+    specs: List[Tuple[str, date, date]] = []
+    start = period_start(today, interval)
+    for _ in range(ahead + 1):
+        upper = next_period_start(start, interval)
+        specs.append((partition_name(start), start, upper))
+        start = upper
+    return specs
+
+
+# A time-of-day followed by an hour-only UTC offset, e.g. "00:00:00+00". This is
+# how Postgres prints timestamptz values; datetime.fromisoformat only accepts
+# the "+HH" form from Python 3.11 on, so it is padded to "+HH:MM" before parsing.
+_HOUR_ONLY_OFFSET_RE = re.compile(r"\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}$")
+
+
+def parse_partition_upper_bound(bound_expr: str) -> Optional[datetime]:
+    """
+    Upper bound of a Postgres partition from its `pg_get_expr(relpartbound)`
+    string, e.g. "FOR VALUES FROM ('2026-06-01 00:00:00') TO ('2026-06-02 00:00:00')".
+
+    The result is always UTC-naive, which is what `select_partitions_to_drop`
+    compares against. If the bound carries a UTC offset (partition key stored
+    as timestamptz), it is converted to UTC and the tzinfo is dropped, so the
+    comparison can never raise on mixed naive/aware datetimes.
+
+    Returns None for the DEFAULT partition or anything we cannot parse, so such
+    partitions are never selected for dropping.
+    """
+    if "DEFAULT" in bound_expr.upper():
+        return None
+    match = _BOUND_UPPER_RE.search(bound_expr)
+    if match is None:
+        return None
+    raw = match.group(1)
+    if _HOUR_ONLY_OFFSET_RE.search(raw):
+        raw += ":00"
+    try:
+        upper = datetime.fromisoformat(raw)
+    except ValueError:
+        return None
+    if upper.tzinfo is not None:
+        upper = upper.astimezone(timezone.utc).replace(tzinfo=None)
+    return upper
+
+
+def select_partitions_to_drop(
+    partitions: List[Tuple[str, Optional[datetime]]], cutoff: datetime
+) -> List[str]:
+    """
+    Names of partitions whose entire range is older than `cutoff` (upper bound
+    <= cutoff). `cutoff` and the bounds are UTC-naive. Partitions without a
+    parseable upper bound (e.g. DEFAULT) are kept.
+    """
+    return [name for name, upper in partitions if upper is not None and upper <= cutoff]
+
+
+class SpendLogsPartitionManager:
+    def __init__(
+        self,
+        interval: PartitionInterval = SPEND_LOG_PARTITION_INTERVAL,
+        precreate_ahead: int = SPEND_LOG_PARTITION_PRECREATE_AHEAD,
+    ):
+        # Accept "Week" / " month " as written: without this they would fail
+        # validation and silently switch the deployment to daily partitions.
+        if isinstance(interval, str):
+            interval = interval.strip().lower()
+        if interval not in VALID_PARTITION_INTERVALS:
+            verbose_proxy_logger.warning(
+                "Invalid SPEND_LOG_PARTITION_INTERVAL %r, falling back to 'day'. 
" + "Supported values: %s", + interval, + sorted(VALID_PARTITION_INTERVALS), + ) + interval = "day" + self.interval = interval + self.precreate_ahead = precreate_ahead + + async def is_partitioned(self, prisma_client) -> bool: + try: + rows = await prisma_client.db.query_raw( + """ + SELECT EXISTS ( + SELECT 1 + FROM pg_partitioned_table pt + JOIN pg_class c ON c.oid = pt.partrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname = $1 + AND n.nspname = current_schema() + ) AS partitioned + """, + SPEND_LOGS_TABLE, + ) + except Exception as e: + verbose_proxy_logger.warning( + "Could not determine if %s is partitioned, assuming it is not: %s", + SPEND_LOGS_TABLE, + e, + ) + return False + return bool(rows and rows[0].get("partitioned")) + + async def ensure_partitions(self, prisma_client) -> List[str]: + """ + Ensure the current and upcoming partitions exist, returning the names + now present. CREATE TABLE IF NOT EXISTS is a no-op for partitions that + already exist, so this list is "ensured present", not "newly created". 
+        """
+        ensured: List[str] = []
+        for name, lower, upper in upcoming_partitions(
+            datetime.now(timezone.utc).date(), self.interval, self.precreate_ahead
+        ):
+            try:
+                await prisma_client.db.execute_raw(
+                    f'CREATE TABLE IF NOT EXISTS "{name}" '
+                    f'PARTITION OF "{SPEND_LOGS_TABLE}" '
+                    f"FOR VALUES FROM ('{lower.isoformat()}') TO ('{upper.isoformat()}')"
+                )
+                ensured.append(name)
+            except Exception as e:
+                # Best effort per partition: one failure must not stop the
+                # remaining periods from being created.
+                verbose_proxy_logger.warning(
+                    "Failed to ensure spend-log partition %s: %s", name, e
+                )
+        return ensured
+
+    async def _list_partitions(
+        self, prisma_client
+    ) -> List[Tuple[str, Optional[datetime]]]:
+        """
+        (name, upper_bound) for every child partition of the spend-logs table in
+        the current schema. upper_bound is None when the bound expression is
+        DEFAULT or cannot be parsed.
+        """
+        rows = await prisma_client.db.query_raw(
+            """
+            SELECT c.relname AS name,
+                   pg_get_expr(c.relpartbound, c.oid) AS bound
+            FROM pg_inherits i
+            JOIN pg_class c ON c.oid = i.inhrelid
+            JOIN pg_class p ON p.oid = i.inhparent
+            JOIN pg_namespace n ON n.oid = p.relnamespace
+            WHERE p.relname = $1
+              AND n.nspname = current_schema()
+            """,
+            SPEND_LOGS_TABLE,
+        )
+        return [
+            (row["name"], parse_partition_upper_bound(row.get("bound") or ""))
+            for row in rows
+        ]
+
+    async def drop_partitions_older_than(
+        self, prisma_client, cutoff: datetime
+    ) -> List[str]:
+        """
+        DROP every partition whose whole range is older than `cutoff`.
+
+        `cutoff` may be timezone-aware (converted to UTC) or naive (taken as
+        already UTC). Returns the names that were actually dropped. Failures are
+        logged and skipped rather than raised: if the partitions cannot be
+        listed or compared, nothing is dropped and an empty list is returned.
+        """
+        if cutoff.tzinfo is None:
+            # astimezone() would read a naive value as host-local time and
+            # shift the retention boundary by the host's UTC offset.
+            cutoff_naive = cutoff
+        else:
+            cutoff_naive = cutoff.astimezone(timezone.utc).replace(tzinfo=None)
+        try:
+            partitions = await self._list_partitions(prisma_client)
+            to_drop = select_partitions_to_drop(partitions, cutoff_naive)
+        except Exception as e:
+            verbose_proxy_logger.warning(
+                "Could not determine expired partitions of %s, skipping partition drops: %s",
+                SPEND_LOGS_TABLE,
+                e,
+            )
+            return []
+        dropped: List[str] = []
+        for name in to_drop:
+            try:
+                await prisma_client.db.execute_raw(f'DROP TABLE IF EXISTS "{name}"')
+                dropped.append(name)
+            except Exception as e:
+                verbose_proxy_logger.warning(
+                    "Failed to drop spend-log partition %s: %s", name, e
+                )
+        return dropped
diff --git a/tests/test_litellm/proxy/db/db_transaction_queue/test_spend_logs_partition_manager.py b/tests/test_litellm/proxy/db/db_transaction_queue/test_spend_logs_partition_manager.py
new file mode 100644
index 0000000000..289de70738
--- /dev/null
+++ 
b/tests/test_litellm/proxy/db/db_transaction_queue/test_spend_logs_partition_manager.py @@ -0,0 +1,233 @@ +""" +Tests for SpendLogsPartitionManager: partition naming/bounds math, retention +selection, the non-partitioned no-op safety path, and the drop/ensure SQL flow. +""" + +from datetime import date, datetime, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from litellm.proxy.db.db_transaction_queue.spend_logs_partition_manager import ( + SpendLogsPartitionManager, + next_period_start, + parse_partition_upper_bound, + partition_name, + period_start, + select_partitions_to_drop, + upcoming_partitions, +) + + +def test_period_start_per_interval(): + d = date(2026, 6, 3) # a Wednesday + assert period_start(d, "day") == date(2026, 6, 3) + assert period_start(d, "week") == date(2026, 6, 1) # Monday + assert period_start(d, "month") == date(2026, 6, 1) + + +def test_next_period_start_crosses_year_and_month_boundaries(): + assert next_period_start(date(2026, 6, 3), "day") == date(2026, 6, 4) + assert next_period_start(date(2026, 6, 1), "week") == date(2026, 6, 8) + assert next_period_start(date(2026, 12, 1), "month") == date(2027, 1, 1) + + +def test_partition_name_uses_period_start_date(): + assert partition_name(date(2026, 6, 1)) == "LiteLLM_SpendLogs_p20260601" + + +def test_upcoming_partitions_count_and_contiguous_ranges(): + specs = upcoming_partitions(date(2026, 6, 1), "day", ahead=3) + assert len(specs) == 4 # current + 3 ahead + names = [s[0] for s in specs] + assert names == [ + "LiteLLM_SpendLogs_p20260601", + "LiteLLM_SpendLogs_p20260602", + "LiteLLM_SpendLogs_p20260603", + "LiteLLM_SpendLogs_p20260604", + ] + # ranges must be contiguous and half-open: each upper is the next lower + for (_, _, upper), (_, next_lower, _) in zip(specs, specs[1:]): + assert upper == next_lower + + +def test_parse_partition_upper_bound_extracts_to_value(): + bound = "FOR VALUES FROM ('2026-06-01 00:00:00') TO ('2026-06-02 00:00:00')" + assert 
parse_partition_upper_bound(bound) == datetime(2026, 6, 2, 0, 0, 0) + + +def test_parse_partition_upper_bound_default_is_none(): + assert parse_partition_upper_bound("DEFAULT") is None + assert parse_partition_upper_bound("garbage") is None + + +def test_select_partitions_to_drop_only_fully_expired(): + cutoff = datetime(2026, 6, 10, 0, 0, 0) + partitions = [ + ("p_old", datetime(2026, 6, 9, 0, 0, 0)), # upper < cutoff -> drop + ("p_boundary", datetime(2026, 6, 10, 0, 0, 0)), # upper == cutoff -> drop + ("p_partial", datetime(2026, 6, 11, 0, 0, 0)), # straddles cutoff -> keep + ("p_default", None), # DEFAULT -> keep + ] + assert select_partitions_to_drop(partitions, cutoff) == ["p_old", "p_boundary"] + + +@pytest.mark.asyncio +async def test_is_partitioned_true_and_false(): + mgr = SpendLogsPartitionManager() + + client_true = MagicMock() + client_true.db.query_raw = AsyncMock(return_value=[{"partitioned": True}]) + assert await mgr.is_partitioned(client_true) is True + + client_false = MagicMock() + client_false.db.query_raw = AsyncMock(return_value=[{"partitioned": False}]) + assert await mgr.is_partitioned(client_false) is False + + +@pytest.mark.asyncio +async def test_catalog_queries_are_scoped_to_current_schema(): + """ + Both catalog lookups must filter by current_schema(); otherwise a same-named + table in another schema can flip is_partitioned or return foreign partitions. 
+ """ + mgr = SpendLogsPartitionManager() + client = MagicMock() + client.db.query_raw = AsyncMock(return_value=[]) + + await mgr.is_partitioned(client) + is_partitioned_sql = client.db.query_raw.call_args.args[0] + assert "pg_namespace" in is_partitioned_sql + assert "current_schema()" in is_partitioned_sql + + await mgr._list_partitions(client) + list_sql = client.db.query_raw.call_args.args[0] + assert "pg_namespace" in list_sql + assert "current_schema()" in list_sql + + +@pytest.mark.asyncio +async def test_is_partitioned_swallows_errors_and_returns_false(): + """A catalog query failure must not crash cleanup; fall back to non-partitioned.""" + mgr = SpendLogsPartitionManager() + client = MagicMock() + client.db.query_raw = AsyncMock(side_effect=Exception("db down")) + assert await mgr.is_partitioned(client) is False + + +@pytest.mark.asyncio +async def test_drop_partitions_older_than_drops_expired_only(): + mgr = SpendLogsPartitionManager() + client = MagicMock() + client.db.query_raw = AsyncMock( + return_value=[ + { + "name": "LiteLLM_SpendLogs_p20260601", + "bound": "FOR VALUES FROM ('2026-06-01 00:00:00') TO ('2026-06-02 00:00:00')", + }, + { + "name": "LiteLLM_SpendLogs_p20260609", + "bound": "FOR VALUES FROM ('2026-06-09 00:00:00') TO ('2026-06-10 00:00:00')", + }, + {"name": "LiteLLM_SpendLogs_pdefault", "bound": "DEFAULT"}, + ] + ) + client.db.execute_raw = AsyncMock(return_value=0) + + cutoff = datetime(2026, 6, 5, 0, 0, 0, tzinfo=timezone.utc) + dropped = await mgr.drop_partitions_older_than(client, cutoff) + + assert dropped == ["LiteLLM_SpendLogs_p20260601"] + executed = " ".join(call.args[0] for call in client.db.execute_raw.call_args_list) + assert 'DROP TABLE IF EXISTS "LiteLLM_SpendLogs_p20260601"' in executed + assert "p20260609" not in executed + assert "pdefault" not in executed + + +@pytest.mark.asyncio +async def test_ensure_partitions_issues_create_for_each_period(): + mgr = SpendLogsPartitionManager(interval="day", precreate_ahead=2) + 
client = MagicMock() + client.db.execute_raw = AsyncMock(return_value=0) + + created = await mgr.ensure_partitions(client) + + assert len(created) == 3 # current + 2 ahead + assert client.db.execute_raw.await_count == 3 + first_sql = client.db.execute_raw.call_args_list[0].args[0] + assert 'PARTITION OF "LiteLLM_SpendLogs"' in first_sql + assert "CREATE TABLE IF NOT EXISTS" in first_sql + + +def test_unsupported_interval_raises(): + with pytest.raises(ValueError): + period_start(date(2026, 6, 1), "year") + with pytest.raises(ValueError): + next_period_start(date(2026, 6, 1), "year") + + +def test_parse_partition_upper_bound_unparseable_to_value_is_none(): + """A TO(...) value that is not a valid timestamp must not raise; return None.""" + assert ( + parse_partition_upper_bound("FOR VALUES FROM ('x') TO ('not-a-date')") is None + ) + + +@pytest.mark.asyncio +async def test_ensure_partitions_continues_when_one_create_fails(): + mgr = SpendLogsPartitionManager(interval="day", precreate_ahead=2) + client = MagicMock() + client.db.execute_raw = AsyncMock(side_effect=[0, Exception("overlap"), 0]) + + created = await mgr.ensure_partitions(client) + + # the failed partition is skipped, the others still created + assert len(created) == 2 + assert client.db.execute_raw.await_count == 3 + + +def test_invalid_interval_falls_back_to_day(): + """ + An invalid interval must not be stored as-is. Otherwise ensure_partitions + raises (via period_start) and aborts the cleanup run before retention drops + old partitions, silently skipping retention. 
+ """ + mgr = SpendLogsPartitionManager(interval="year") + assert mgr.interval == "day" + + +@pytest.mark.asyncio +async def test_invalid_interval_does_not_abort_ensure_partitions(): + """With the fallback, ensure_partitions completes instead of raising ValueError.""" + mgr = SpendLogsPartitionManager(interval="fortnight", precreate_ahead=1) + client = MagicMock() + client.db.execute_raw = AsyncMock(return_value=0) + + created = await mgr.ensure_partitions(client) + + assert len(created) == 2 # current + 1 ahead, day-based fallback + + +@pytest.mark.asyncio +async def test_drop_partitions_continues_when_one_drop_fails(): + mgr = SpendLogsPartitionManager() + client = MagicMock() + client.db.query_raw = AsyncMock( + return_value=[ + { + "name": "LiteLLM_SpendLogs_p20260601", + "bound": "FOR VALUES FROM ('2026-06-01 00:00:00') TO ('2026-06-02 00:00:00')", + }, + { + "name": "LiteLLM_SpendLogs_p20260602", + "bound": "FOR VALUES FROM ('2026-06-02 00:00:00') TO ('2026-06-03 00:00:00')", + }, + ] + ) + client.db.execute_raw = AsyncMock(side_effect=[Exception("locked"), 0]) + + cutoff = datetime(2026, 6, 10, 0, 0, 0, tzinfo=timezone.utc) + dropped = await mgr.drop_partitions_older_than(client, cutoff) + + # both were eligible; the first drop failed so only the second is reported + assert dropped == ["LiteLLM_SpendLogs_p20260602"] diff --git a/tests/test_litellm/proxy/test_spend_log_cleanup.py b/tests/test_litellm/proxy/test_spend_log_cleanup.py index 42bb919295..a309dd6401 100644 --- a/tests/test_litellm/proxy/test_spend_log_cleanup.py +++ b/tests/test_litellm/proxy/test_spend_log_cleanup.py @@ -183,7 +183,10 @@ async def test_cleanup_old_spend_logs_batch_deletion(): # Check the first call argument call_args_sql = mock_db.execute_raw.call_args_list[0][0][0] assert 'DELETE FROM "LiteLLM_SpendLogs"' in call_args_sql - assert 'WHERE "request_id" IN' in call_args_sql + # must match on the full composite identity: on a partitioned table + # request_id alone is not unique, and 
deleting by it would let a client + # reusing x-litellm-call-id take out a fresh row alongside the expired one + assert 'WHERE ("request_id", "startTime") IN' in call_args_sql @pytest.mark.asyncio @@ -219,6 +222,109 @@ async def test_cleanup_old_spend_logs_retention_period_cutoff(): ) # Allow 1 second difference for test execution time +@pytest.mark.asyncio +async def test_cleanup_drops_partitions_when_enabled_and_partitioned(): + """ + With use_spend_logs_partitioning enabled and a partitioned table, cleanup + must reclaim disk by dropping partitions AND still delete expired rows the + drops cannot reach (DEFAULT partition, cutoff-spanning partitions), so + retention is never bypassed. Asserts ensure_partitions and drop_partitions_older_than are each awaited exactly once and that the first execute_raw call is the row DELETE. + """ + from unittest.mock import AsyncMock, MagicMock + + mock_prisma_client = MagicMock() + mock_prisma_client.db.execute_raw = AsyncMock(return_value=0) + + partition_manager = MagicMock() + partition_manager.is_partitioned = AsyncMock(return_value=True) + partition_manager.ensure_partitions = AsyncMock(return_value=["p1"]) + partition_manager.drop_partitions_older_than = AsyncMock( + return_value=["LiteLLM_SpendLogs_p20260601"] + ) + + cleaner = SpendLogCleanup( + general_settings={ + "maximum_spend_logs_retention_period": "7d", + "use_spend_logs_partitioning": True, + }, + partition_manager=partition_manager, + ) + cleaner.pod_lock_manager = MagicMock() + cleaner.pod_lock_manager.redis_cache = None + + await cleaner.cleanup_old_spend_logs(mock_prisma_client) + + partition_manager.ensure_partitions.assert_awaited_once() + partition_manager.drop_partitions_older_than.assert_awaited_once() + delete_sql = mock_prisma_client.db.execute_raw.call_args_list[0][0][0] + assert 'DELETE FROM "LiteLLM_SpendLogs"' in delete_sql + + +@pytest.mark.asyncio +async def test_cleanup_uses_delete_when_partitioning_not_enabled(): + """ + Even against a partitioned table, the partition path must stay off until + use_spend_logs_partitioning is explicitly enabled, so existing deployments + see zero 
behavior change. The catalog must not even be queried. Asserts is_partitioned and drop_partitions_older_than are never awaited and that the first execute_raw call is the row DELETE. + """ + from unittest.mock import AsyncMock, MagicMock + + mock_prisma_client = MagicMock() + mock_prisma_client.db.execute_raw = AsyncMock(side_effect=[10, 0]) + + partition_manager = MagicMock() + partition_manager.is_partitioned = AsyncMock(return_value=True) + partition_manager.ensure_partitions = AsyncMock() + partition_manager.drop_partitions_older_than = AsyncMock() + + cleaner = SpendLogCleanup( + general_settings={"maximum_spend_logs_retention_period": "7d"}, + partition_manager=partition_manager, + ) + cleaner.pod_lock_manager = MagicMock() + cleaner.pod_lock_manager.redis_cache = None + + await cleaner.cleanup_old_spend_logs(mock_prisma_client) + + partition_manager.is_partitioned.assert_not_awaited() + partition_manager.drop_partitions_older_than.assert_not_awaited() + delete_sql = mock_prisma_client.db.execute_raw.call_args_list[0][0][0] + assert 'DELETE FROM "LiteLLM_SpendLogs"' in delete_sql + + +@pytest.mark.asyncio +async def test_cleanup_uses_delete_when_not_partitioned(): + """ + With the feature enabled but the table not actually partitioned (script not + run yet), cleanup must keep using the batched DELETE path. Asserts drop_partitions_older_than is never awaited and execute_raw is awaited exactly twice (mocked results 10, then 0), the first call being the row DELETE. 
+ """ + from unittest.mock import AsyncMock, MagicMock + + mock_prisma_client = MagicMock() + mock_prisma_client.db.execute_raw = AsyncMock(side_effect=[10, 0]) + + partition_manager = MagicMock() + partition_manager.is_partitioned = AsyncMock(return_value=False) + partition_manager.drop_partitions_older_than = AsyncMock() + + cleaner = SpendLogCleanup( + general_settings={ + "maximum_spend_logs_retention_period": "7d", + "use_spend_logs_partitioning": True, + }, + partition_manager=partition_manager, + ) + cleaner.pod_lock_manager = MagicMock() + cleaner.pod_lock_manager.redis_cache = None + + await cleaner.cleanup_old_spend_logs(mock_prisma_client) + + partition_manager.drop_partitions_older_than.assert_not_awaited() + assert mock_prisma_client.db.execute_raw.await_count == 2 + delete_sql = mock_prisma_client.db.execute_raw.call_args_list[0][0][0] + assert 'DELETE FROM "LiteLLM_SpendLogs"' in delete_sql + + @pytest.mark.asyncio async def test_cleanup_old_spend_logs_no_retention_period(): """ @@ -370,7 +476,9 @@ async def test_delete_old_logs_aborts_after_consecutive_failures(monkeypatch): import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module # Lower the threshold so the test is fast and deterministic. 
- monkeypatch.setattr(cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 3) + monkeypatch.setattr( + cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 3 + ) monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) @@ -400,7 +508,9 @@ async def test_delete_old_logs_resets_consecutive_failures_on_success(monkeypatc intermittent timeouts don't trip the abort threshold.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module - monkeypatch.setattr(cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 3) + monkeypatch.setattr( + cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 3 + ) monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) @@ -471,7 +581,9 @@ async def test_cleanup_releases_lock_after_persistent_batch_failures(monkeypatch must still be released so the next scheduled run isn't permanently blocked.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module - monkeypatch.setattr(cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 2) + monkeypatch.setattr( + cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 2 + ) monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) diff --git a/ui/litellm-dashboard/src/lib/http/schema.d.ts b/ui/litellm-dashboard/src/lib/http/schema.d.ts index 15123bcdbf..4aeb1cf3e8 100644 --- a/ui/litellm-dashboard/src/lib/http/schema.d.ts +++ b/ui/litellm-dashboard/src/lib/http/schema.d.ts @@ -22175,6 +22175,11 @@ export interface components { * @description decrypt keys with google kms */ use_google_kms?: boolean | null; + /** + * Use Spend Logs Partitioning + * @description If True and LiteLLM_SpendLogs has been converted to a range-partitioned table (db_scripts/partition_spend_logs.sql), retention cleanup drops expired partitions instead of deleting rows, and pre-creates 
upcoming partitions. Default is False. + */ + use_spend_logs_partitioning?: boolean | null; /** User Header Mappings */ user_header_mappings?: components["schemas"]["UserHeaderMapping"][] | null; /**