""" Test cases for spend log cleanup functionality """ from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock import pytest from litellm.proxy.db.db_transaction_queue.spend_log_cleanup import SpendLogCleanup def test_spend_log_cleanup_cron_scheduling(): """Test that cron expressions are correctly parsed for spend log cleanup scheduling""" from apscheduler.triggers.cron import CronTrigger # Valid cron expressions cron_expr = "0 4 * * *" # 4:00 AM daily trigger = CronTrigger.from_crontab(cron_expr) assert trigger is not None # Every minute (useful for testing) trigger_minute = CronTrigger.from_crontab("*/1 * * * *") assert trigger_minute is not None # Specific day and hour trigger_weekly = CronTrigger.from_crontab("0 3 * * 0") # 3 AM every Sunday assert trigger_weekly is not None # Invalid cron expression should raise ValueError with pytest.raises(ValueError): CronTrigger.from_crontab("invalid cron") with pytest.raises(ValueError): CronTrigger.from_crontab("60 25 * * *") # Invalid minute and hour def test_spend_log_cleanup_cron_scheduler_integration(): """ Integration test: Verify the proxy_server scheduler logic correctly adds cron-based cleanup job when maximum_spend_logs_cleanup_cron is configured. This tests the logic in proxy_server.py lines 4671-4717 without requiring a real database connection. """ from unittest.mock import MagicMock from apscheduler.triggers.cron import CronTrigger # Mock scheduler mock_scheduler = MagicMock() mock_prisma_client = MagicMock() mock_cleanup_instance = MagicMock() # Test Case 1: Cron-based scheduling general_settings_cron = { "maximum_spend_logs_retention_period": "7d", "maximum_spend_logs_cleanup_cron": "0 4 * * *", # 4 AM daily } cleanup_cron = general_settings_cron.get("maximum_spend_logs_cleanup_cron") assert cleanup_cron is not None # Simulate the scheduler logic from proxy_server.py cron_trigger = CronTrigger.from_crontab(cleanup_cron) mock_scheduler.add_job( mock_cleanup_instance.cleanup_old_spend_logs, cron_trigger, args=[mock_prisma_client], id="spend_log_cleanup_job", replace_existing=True, misfire_grace_time=3600, ) # Verify scheduler was called correctly mock_scheduler.add_job.assert_called_once() call_args = mock_scheduler.add_job.call_args # Verify the trigger is a CronTrigger assert isinstance(call_args[0][1], CronTrigger) # Verify job ID assert call_args[1]["id"] == "spend_log_cleanup_job" assert call_args[1]["replace_existing"] is True # Test Case 2: Interval-based scheduling (fallback) mock_scheduler.reset_mock() general_settings_interval = { "maximum_spend_logs_retention_period": "7d", # No cron, so it should fall back to interval } cleanup_cron_fallback = general_settings_interval.get( "maximum_spend_logs_cleanup_cron" ) assert cleanup_cron_fallback is None # No cron configured # Simulate interval-based scheduling fallback retention_interval = general_settings_interval.get( "maximum_spend_logs_retention_interval", "1d" ) from litellm.litellm_core_utils.duration_parser import duration_in_seconds interval_seconds = duration_in_seconds(retention_interval) mock_scheduler.add_job( mock_cleanup_instance.cleanup_old_spend_logs, "interval", seconds=interval_seconds, args=[mock_prisma_client], id="spend_log_cleanup_job", replace_existing=True, ) # Verify interval scheduling was called mock_scheduler.add_job.assert_called_once() interval_call_args = mock_scheduler.add_job.call_args assert interval_call_args[0][1] == "interval" assert interval_call_args[1]["seconds"] == 86400 # 1 day in seconds @pytest.mark.asyncio async def test_should_delete_spend_logs(): # Test case 1: No retention set cleaner = SpendLogCleanup(general_settings={}) assert cleaner._should_delete_spend_logs() is False # Test case 2: Valid seconds string cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "3600s"} ) assert cleaner._should_delete_spend_logs() is True # Test case 3: Valid days string cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "30d"} ) assert cleaner._should_delete_spend_logs() is True # Test case 4: Valid hours string cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "24h"} ) assert cleaner._should_delete_spend_logs() is True # Test case 5: Invalid format cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "invalid"} ) assert cleaner._should_delete_spend_logs() is False @pytest.mark.asyncio async def test_cleanup_old_spend_logs_batch_deletion(): from unittest.mock import AsyncMock, MagicMock # Setup Prisma client mock_prisma_client = MagicMock() mock_db = MagicMock() # Mock execute_raw to return deleted counts mock_db.execute_raw = AsyncMock(side_effect=[1000, 500, 0]) # Wire up mocks mock_prisma_client.db = mock_db # Mock Redis cache and pod_lock_manager mock_redis_cache = MagicMock() mock_pod_lock_manager = MagicMock() mock_pod_lock_manager.redis_cache = mock_redis_cache mock_pod_lock_manager.acquire_lock = AsyncMock(return_value=True) mock_pod_lock_manager.release_lock = AsyncMock() # Run cleanup with mocked pod_lock_manager test_settings = {"maximum_spend_logs_retention_period": "7d"} cleaner = SpendLogCleanup(general_settings=test_settings) cleaner.pod_lock_manager = mock_pod_lock_manager assert cleaner._should_delete_spend_logs() is True await cleaner.cleanup_old_spend_logs(mock_prisma_client) # Validate batching and deletion via raw SQL assert mock_db.execute_raw.call_count == 3 # Check the first call argument call_args_sql = mock_db.execute_raw.call_args_list[0][0][0] assert 'DELETE FROM "LiteLLM_SpendLogs"' in call_args_sql # must match on the full composite identity: on a partitioned table # request_id alone is not unique, and deleting by it would let a client # reusing x-litellm-call-id take out a fresh row alongside the expired one assert 'WHERE ("request_id", "startTime") IN' in call_args_sql @pytest.mark.asyncio async def test_cleanup_old_spend_logs_retention_period_cutoff(): """ Test that logs are filtered using correct cutoff based on retention """ # Setup Prisma client mock_prisma_client = MagicMock() mock_db = MagicMock() mock_db.execute_raw = AsyncMock(return_value=0) mock_prisma_client.db = mock_db # Mock Redis cache and pod_lock_manager mock_redis_cache = MagicMock() mock_pod_lock_manager = MagicMock() mock_pod_lock_manager.redis_cache = mock_redis_cache mock_pod_lock_manager.acquire_lock = AsyncMock(return_value=True) mock_pod_lock_manager.release_lock = AsyncMock() # Run cleanup with mocked pod_lock_manager test_settings = {"maximum_spend_logs_retention_period": "24h"} cleaner = SpendLogCleanup(general_settings=test_settings) cleaner.pod_lock_manager = mock_pod_lock_manager assert cleaner._should_delete_spend_logs() is True await cleaner.cleanup_old_spend_logs(mock_prisma_client) # Verify the cutoff date is correct cutoff_date = mock_db.execute_raw.call_args[0][1] expected_cutoff = datetime.now(timezone.utc) - timedelta(seconds=86400) assert ( abs((cutoff_date - expected_cutoff).total_seconds()) < 1 ) # Allow 1 second difference for test execution time @pytest.mark.asyncio async def test_cleanup_drops_partitions_when_enabled_and_partitioned(): """ With use_spend_logs_partitioning enabled and a partitioned table, cleanup must reclaim disk by dropping partitions AND still delete expired rows the drops cannot reach (DEFAULT partition, cutoff-spanning partitions), so retention is never bypassed. """ from unittest.mock import AsyncMock, MagicMock mock_prisma_client = MagicMock() mock_prisma_client.db.execute_raw = AsyncMock(return_value=0) partition_manager = MagicMock() partition_manager.is_partitioned = AsyncMock(return_value=True) partition_manager.ensure_partitions = AsyncMock(return_value=["p1"]) partition_manager.drop_partitions_older_than = AsyncMock( return_value=["LiteLLM_SpendLogs_p20260601"] ) cleaner = SpendLogCleanup( general_settings={ "maximum_spend_logs_retention_period": "7d", "use_spend_logs_partitioning": True, }, partition_manager=partition_manager, ) cleaner.pod_lock_manager = MagicMock() cleaner.pod_lock_manager.redis_cache = None await cleaner.cleanup_old_spend_logs(mock_prisma_client) partition_manager.ensure_partitions.assert_awaited_once() partition_manager.drop_partitions_older_than.assert_awaited_once() delete_sql = mock_prisma_client.db.execute_raw.call_args_list[0][0][0] assert 'DELETE FROM "LiteLLM_SpendLogs"' in delete_sql @pytest.mark.asyncio async def test_cleanup_uses_delete_when_partitioning_not_enabled(): """ Even against a partitioned table, the partition path must stay off until use_spend_logs_partitioning is explicitly enabled, so existing deployments see zero behavior change. The catalog must not even be queried. """ from unittest.mock import AsyncMock, MagicMock mock_prisma_client = MagicMock() mock_prisma_client.db.execute_raw = AsyncMock(side_effect=[10, 0]) partition_manager = MagicMock() partition_manager.is_partitioned = AsyncMock(return_value=True) partition_manager.ensure_partitions = AsyncMock() partition_manager.drop_partitions_older_than = AsyncMock() cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"}, partition_manager=partition_manager, ) cleaner.pod_lock_manager = MagicMock() cleaner.pod_lock_manager.redis_cache = None await cleaner.cleanup_old_spend_logs(mock_prisma_client) partition_manager.is_partitioned.assert_not_awaited() partition_manager.drop_partitions_older_than.assert_not_awaited() delete_sql = mock_prisma_client.db.execute_raw.call_args_list[0][0][0] assert 'DELETE FROM "LiteLLM_SpendLogs"' in delete_sql @pytest.mark.asyncio async def test_cleanup_uses_delete_when_not_partitioned(): """ With the feature enabled but the table not actually partitioned (script not run yet), cleanup must keep using the batched DELETE path. """ from unittest.mock import AsyncMock, MagicMock mock_prisma_client = MagicMock() mock_prisma_client.db.execute_raw = AsyncMock(side_effect=[10, 0]) partition_manager = MagicMock() partition_manager.is_partitioned = AsyncMock(return_value=False) partition_manager.drop_partitions_older_than = AsyncMock() cleaner = SpendLogCleanup( general_settings={ "maximum_spend_logs_retention_period": "7d", "use_spend_logs_partitioning": True, }, partition_manager=partition_manager, ) cleaner.pod_lock_manager = MagicMock() cleaner.pod_lock_manager.redis_cache = None await cleaner.cleanup_old_spend_logs(mock_prisma_client) partition_manager.drop_partitions_older_than.assert_not_awaited() assert mock_prisma_client.db.execute_raw.await_count == 2 delete_sql = mock_prisma_client.db.execute_raw.call_args_list[0][0][0] assert 'DELETE FROM "LiteLLM_SpendLogs"' in delete_sql @pytest.mark.asyncio async def test_cleanup_old_spend_logs_no_retention_period(): """ Test that no logs are deleted when no retention period is set """ mock_prisma_client = MagicMock() mock_prisma_client.db.execute_raw = AsyncMock() cleaner = SpendLogCleanup(general_settings={}) # no retention await cleaner.cleanup_old_spend_logs(mock_prisma_client) mock_prisma_client.db.execute_raw.assert_not_called() @pytest.mark.asyncio async def test_lock_not_released_when_not_acquired(): """ Lock release should be skipped when _should_delete_spend_logs returns False before the lock is ever acquired. """ mock_prisma_client = MagicMock() mock_prisma_client.db.execute_raw = AsyncMock() mock_redis_cache = MagicMock() mock_pod_lock_manager = MagicMock() mock_pod_lock_manager.redis_cache = mock_redis_cache mock_pod_lock_manager.acquire_lock = AsyncMock(return_value=True) mock_pod_lock_manager.release_lock = AsyncMock() # No retention setting → _should_delete_spend_logs() returns False before lock is acquired cleaner = SpendLogCleanup(general_settings={}) cleaner.pod_lock_manager = mock_pod_lock_manager await cleaner.cleanup_old_spend_logs(mock_prisma_client) mock_pod_lock_manager.acquire_lock.assert_not_called() mock_pod_lock_manager.release_lock.assert_not_called() @pytest.mark.asyncio async def test_integer_retention_treated_as_days(): """ An integer value for maximum_spend_logs_retention_period should be treated as days (e.g., 3 → '3d' → 259200 seconds). """ cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": 3} ) result = cleaner._should_delete_spend_logs() assert result is True assert cleaner.retention_seconds == 3 * 86400 # 3 days in seconds def test_string_retention_still_works(): """ String values like '3d', '24h', '3600s' should continue to parse correctly. """ cases = [ ("3d", 3 * 86400), ("24h", 24 * 3600), ("3600s", 3600), ("2w", 2 * 604800), ] for setting, expected_seconds in cases: cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": setting} ) assert cleaner._should_delete_spend_logs() is True, f"Failed for {setting}" assert ( cleaner.retention_seconds == expected_seconds ), f"Expected {expected_seconds} for {setting}, got {cleaner.retention_seconds}" @pytest.mark.asyncio async def test_delete_old_logs_aborts_on_non_int_execute_raw_return(): """should abort deletion loop immediately when execute_raw returns a non-int (e.g. None or dict), preventing an infinite loop.""" mock_prisma_client = MagicMock() mock_db = MagicMock() mock_db.execute_raw = AsyncMock(return_value=None) mock_prisma_client.db = mock_db cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cutoff_date = datetime.now(timezone.utc) - timedelta(days=7) total_deleted = await cleaner._delete_old_logs(mock_prisma_client, cutoff_date) assert mock_db.execute_raw.call_count == 1 assert total_deleted == 0 @pytest.mark.asyncio async def test_delete_old_logs_continues_on_valid_int_return(): """should continue deletion loop across batches when execute_raw returns valid int counts.""" mock_prisma_client = MagicMock() mock_db = MagicMock() mock_db.execute_raw = AsyncMock(side_effect=[500, 300, 0]) mock_prisma_client.db = mock_db cleaner = SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cutoff_date = datetime.now(timezone.utc) - timedelta(days=7) total_deleted = await cleaner._delete_old_logs(mock_prisma_client, cutoff_date) assert mock_db.execute_raw.call_count == 3 assert total_deleted == 800 @pytest.mark.asyncio async def test_delete_old_logs_continues_after_single_batch_failure(monkeypatch): """A single batch failure (e.g. DB timeout) must not abort the whole run — subsequent batches should still execute and their counts accumulate.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module # Zero out the failure backoff so the test doesn't take ~0.5s of real sleep. monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) mock_prisma_client = MagicMock() mock_db = MagicMock() # batch 1 succeeds, batch 2 raises (one-off DB timeout), batches 3-4 succeed, # batch 5 returns 0 → loop exits naturally. mock_db.execute_raw = AsyncMock( side_effect=[100, TimeoutError("simulated DB timeout"), 200, 50, 0] ) mock_prisma_client.db = mock_db cleaner = cleanup_module.SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cutoff_date = datetime.now(timezone.utc) - timedelta(days=7) total_deleted = await cleaner._delete_old_logs(mock_prisma_client, cutoff_date) # All 5 batches should have been attempted; 100 + 200 + 50 = 350 deleted. assert mock_db.execute_raw.call_count == 5 assert total_deleted == 350 @pytest.mark.asyncio async def test_delete_old_logs_aborts_after_consecutive_failures(monkeypatch): """If batch failures persist for SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES in a row (e.g. DB is down), the loop must abort instead of hot-looping.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module # Lower the threshold so the test is fast and deterministic. monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 3 ) monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) mock_prisma_client = MagicMock() mock_db = MagicMock() # Every batch raises — must abort after exactly 3 attempts, not loop forever. mock_db.execute_raw = AsyncMock( side_effect=ConnectionError("simulated persistent DB outage") ) mock_prisma_client.db = mock_db cleaner = cleanup_module.SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cutoff_date = datetime.now(timezone.utc) - timedelta(days=7) total_deleted = await cleaner._delete_old_logs(mock_prisma_client, cutoff_date) assert mock_db.execute_raw.call_count == 3 assert total_deleted == 0 @pytest.mark.asyncio async def test_delete_old_logs_resets_consecutive_failures_on_success(monkeypatch): """A success between failures must reset the consecutive-failure counter so intermittent timeouts don't trip the abort threshold.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 3 ) monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) mock_prisma_client = MagicMock() mock_db = MagicMock() # Pattern: fail, fail, success (resets counter), fail, fail, success, done. # Without reset, three of these would trip abort; with reset, they don't. mock_db.execute_raw = AsyncMock( side_effect=[ TimeoutError("t1"), TimeoutError("t2"), 100, TimeoutError("t3"), TimeoutError("t4"), 50, 0, ] ) mock_prisma_client.db = mock_db cleaner = cleanup_module.SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cutoff_date = datetime.now(timezone.utc) - timedelta(days=7) total_deleted = await cleaner._delete_old_logs(mock_prisma_client, cutoff_date) assert mock_db.execute_raw.call_count == 7 assert total_deleted == 150 @pytest.mark.asyncio async def test_cleanup_uses_logger_exception_for_full_traceback(monkeypatch): """The outer error handler must call logger.exception() (not .error(str(e))) so Prisma/DB timeouts surface a full traceback and exception type.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module mock_logger = MagicMock() monkeypatch.setattr(cleanup_module, "verbose_proxy_logger", mock_logger) mock_prisma_client = MagicMock() # Force the outer try/except to fire by making _should_delete_spend_logs raise. cleaner = cleanup_module.SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cleaner.pod_lock_manager = None def boom(): raise RuntimeError("simulated prisma timeout") cleaner._should_delete_spend_logs = boom # type: ignore[assignment] await cleaner.cleanup_old_spend_logs(mock_prisma_client) assert mock_logger.exception.called, "expected logger.exception() to be called" # The exception type name must appear in the formatted args so operators can # tell *what* failed, not just "Error during cleanup:". call_args = mock_logger.exception.call_args formatted = call_args[0][0] % call_args[0][1:] assert "RuntimeError" in formatted assert "simulated prisma timeout" in formatted @pytest.mark.asyncio async def test_cleanup_releases_lock_after_persistent_batch_failures(monkeypatch): """Even when batch deletion aborts due to consecutive failures, the pod lock must still be released so the next scheduled run isn't permanently blocked.""" import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_MAX_CONSECUTIVE_BATCH_FAILURES", 2 ) monkeypatch.setattr( cleanup_module, "SPEND_LOG_CLEANUP_BATCH_FAILURE_BACKOFF_SECONDS", 0.0 ) mock_prisma_client = MagicMock() mock_db = MagicMock() mock_db.execute_raw = AsyncMock(side_effect=TimeoutError("DB down")) mock_prisma_client.db = mock_db mock_pod_lock_manager = MagicMock() mock_pod_lock_manager.redis_cache = MagicMock() mock_pod_lock_manager.acquire_lock = AsyncMock(return_value=True) mock_pod_lock_manager.release_lock = AsyncMock() cleaner = cleanup_module.SpendLogCleanup( general_settings={"maximum_spend_logs_retention_period": "7d"} ) cleaner.pod_lock_manager = mock_pod_lock_manager await cleaner.cleanup_old_spend_logs(mock_prisma_client) # Cleanup didn't crash; the abort-after-failures path returned cleanly. mock_pod_lock_manager.release_lock.assert_awaited_once() def test_cleanup_batch_size_env_var(monkeypatch): """Ensure batch size is configurable via environment variable""" import importlib import litellm.constants as constants_module import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module # Set env var and reload modules to pick up new value monkeypatch.setenv("SPEND_LOG_CLEANUP_BATCH_SIZE", "25") importlib.reload(constants_module) importlib.reload(cleanup_module) cleaner = cleanup_module.SpendLogCleanup(general_settings={}) assert cleaner.batch_size == 25 # Remove env var and reload to restore default for other tests monkeypatch.delenv("SPEND_LOG_CLEANUP_BATCH_SIZE", raising=False) importlib.reload(constants_module) importlib.reload(cleanup_module)