diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py index c3e555f6e8..79a9219a39 100644 --- a/litellm/integrations/datadog/datadog.py +++ b/litellm/integrations/datadog/datadog.py @@ -41,6 +41,7 @@ from litellm.integrations.datadog.datadog_handler import ( ) from litellm.litellm_core_utils.dd_tracing import tracer from litellm.llms.custom_httpx.http_handler import ( + MaskedHTTPStatusError, _get_httpx_client, get_async_httpx_client, httpxSpecialProvider, @@ -68,6 +69,22 @@ DD_LOGGED_SUCCESS_SERVICE_TYPES = [ ] +def _resolve_dd_batch_size() -> int: + raw = os.getenv("DD_BATCH_SIZE") + if raw is None: + return DD_MAX_BATCH_SIZE + try: + value = int(raw) + except ValueError: + verbose_logger.warning( + "Datadog: ignoring invalid DD_BATCH_SIZE=%r, using %s", + raw, + DD_MAX_BATCH_SIZE, + ) + return DD_MAX_BATCH_SIZE + return max(1, min(value, DD_MAX_BATCH_SIZE)) + + class DataDogLogger( CustomBatchLogger, AdditionalLoggingUtils, @@ -128,7 +145,9 @@ class DataDogLogger( asyncio.create_task(self.periodic_flush()) self.flush_lock = asyncio.Lock() super().__init__( - **kwargs, flush_lock=self.flush_lock, batch_size=DD_MAX_BATCH_SIZE + **kwargs, + flush_lock=self.flush_lock, + batch_size=_resolve_dd_batch_size(), ) except Exception as e: verbose_logger.exception( @@ -339,28 +358,14 @@ class DataDogLogger( "[DATADOG MOCK] Mock mode enabled - API calls will be intercepted" ) - response = await self.async_send_compressed_data(batch_to_send) - if response.status_code == 413: - verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value) - self.log_queue = batch_to_send + self.log_queue - return - - response.raise_for_status() - if response.status_code != 202: - raise Exception( - f"Response from datadog API status_code: {response.status_code}, text: {response.text}" - ) + undelivered = await self._send_with_413_split(batch_to_send) + if undelivered: + self.log_queue = undelivered + self.log_queue if self.is_mock_mode: verbose_logger.debug( f"[DATADOG MOCK] Batch of {len(batch_to_send)} events successfully mocked" ) - else: - verbose_logger.debug( - "Datadog: Response from datadog API status_code: %s, text: %s", - response.status_code, - response.text, - ) except Exception as e: self.log_queue = batch_to_send + self.log_queue @@ -368,6 +373,62 @@ class DataDogLogger( f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}" ) + async def _send_with_413_split(self, batch: List) -> List: + """ + Send a batch, halving any sub-batch that 413s (payload too large) and retrying the + halves, since Datadog enforces a 5MB uncompressed limit per request. + + A 413 surfaces as a raised MaskedHTTPStatusError (httpx raise_for_status), not a + returned response, so both paths are handled. A lone event that still 413s is + dropped to avoid wedging the queue on an undeliverable payload. Returns the events + that could not be delivered because of a non-413 (transient) error, so the caller + re-queues only those and never the events already accepted by Datadog. + """ + pending: List[List] = [batch] + while pending: + chunk = pending.pop() + if not chunk: + continue + try: + response = await self.async_send_compressed_data(chunk) + except Exception as e: + if isinstance(e, MaskedHTTPStatusError) and e.status_code == 413: + response = e.response + else: + verbose_logger.exception( + f"Datadog Error sending batch API - {str(e)}" + ) + return self._undelivered(chunk, pending) + + if response.status_code == 413: + if len(chunk) == 1: + verbose_logger.error(DD_ERRORS.DATADOG_413_ERROR.value) + continue + mid = len(chunk) // 2 + pending.append(chunk[mid:]) + pending.append(chunk[:mid]) + continue + + if response.status_code != 202: + verbose_logger.error( + "Datadog: unexpected response status_code=%s, text=%s", + response.status_code, + response.text, + ) + return self._undelivered(chunk, pending) + + verbose_logger.debug( + "Datadog: delivered %s events, status_code=%s, text=%s", + len(chunk), + response.status_code, + response.text, + ) + return [] + + @staticmethod + def _undelivered(chunk: List, pending: List[List]) -> List: + return chunk + [event for remaining in reversed(pending) for event in remaining] + async def flush_queue(self): if self.flush_lock is None: return diff --git a/tests/test_litellm/integrations/datadog/test_datadog_logger_batching.py b/tests/test_litellm/integrations/datadog/test_datadog_logger_batching.py index e4d7227cc8..d1c7a4032f 100644 --- a/tests/test_litellm/integrations/datadog/test_datadog_logger_batching.py +++ b/tests/test_litellm/integrations/datadog/test_datadog_logger_batching.py @@ -1,10 +1,49 @@ from unittest.mock import AsyncMock, Mock, patch +import httpx import pytest from httpx import Request, Response from litellm.integrations.datadog.datadog import DataDogLogger -from litellm.types.integrations.datadog import DatadogPayload +from litellm.llms.custom_httpx.http_handler import MaskedHTTPStatusError +from litellm.types.integrations.datadog import DD_MAX_BATCH_SIZE, DatadogPayload + + +def _payloads(n): + return [ + DatadogPayload( + ddsource="litellm", + ddtags="env:test", + hostname="host", + message=f'{{"event": {i}}}', + service="svc", + status="info", + ) + for i in range(n) + ] + + +def _raised_413(): + request = Request("POST", "https://example.com") + response = Response(413, request=request, text="Payload Too Large") + return MaskedHTTPStatusError( + httpx.HTTPStatusError("413", request=request, response=response) + ) + + +def _make_send(max_ok, delivered, *, raise_413=True): + """Datadog double: 413 batches larger than max_ok, 202 (recording delivery) otherwise.""" + + async def _send(data): + request = Request("POST", "https://example.com") + if len(data) > max_ok: + if raise_413: + raise _raised_413() + return Response(413, request=request, text="Payload Too Large") + delivered.extend(event["message"] for event in data) + return Response(202, request=request, text="Accepted") + + return _send @pytest.fixture @@ -75,40 +114,152 @@ async def test_failure_hook_threshold_flush_uses_flush_queue(datadog_env): @pytest.mark.asyncio -async def test_async_send_batch_requeues_events_on_413(datadog_env): +async def test_413_splits_oversized_batch_and_delivers_every_event(datadog_env): + """A raised 413 (the real httpx path) halves the batch until each piece is accepted.""" with patch("asyncio.create_task"): logger = DataDogLogger() - logger.log_queue = [ - DatadogPayload( - ddsource="litellm", - ddtags="env:test", - hostname="host", - message=f'{{"event": {i}}}', - service="svc", - status="info", + logger.log_queue = _payloads(4) + delivered: list = [] + logger.async_send_compressed_data = AsyncMock(side_effect=_make_send(1, delivered)) + + await logger.async_send_batch() + + assert sorted(delivered) == [f'{{"event": {i}}}' for i in range(4)] + assert logger.log_queue == [] + + +@pytest.mark.asyncio +async def test_413_does_not_requeue_oversized_batch(datadog_env): + """Regression for the infinite 413 loop: an undeliverable batch must not be re-queued.""" + with patch("asyncio.create_task"): + logger = DataDogLogger() + + logger.log_queue = _payloads(4) + logger.async_send_compressed_data = AsyncMock(side_effect=_make_send(0, [])) + + await logger.async_send_batch() + await logger.async_send_batch() + + assert logger.log_queue == [] + + +@pytest.mark.asyncio +async def test_413_drops_single_oversized_event(datadog_env): + with patch("asyncio.create_task"): + logger = DataDogLogger() + + logger.log_queue = _payloads(1) + send = AsyncMock(side_effect=_make_send(0, [])) + logger.async_send_compressed_data = send + + await logger.async_send_batch() + + assert send.await_count == 1 + assert logger.log_queue == [] + + +@pytest.mark.asyncio +async def test_413_returned_response_also_splits(datadog_env): + """Defensive path: a 413 returned (not raised) is handled the same way.""" + with patch("asyncio.create_task"): + logger = DataDogLogger() + + logger.log_queue = _payloads(4) + delivered: list = [] + logger.async_send_compressed_data = AsyncMock( + side_effect=_make_send(1, delivered, raise_413=False) + ) + + await logger.async_send_batch() + + assert sorted(delivered) == [f'{{"event": {i}}}' for i in range(4)] + assert logger.log_queue == [] + + +@pytest.mark.asyncio +async def test_partial_delivery_then_transient_error_requeues_only_undelivered( + datadog_env, +): + """A transient error after a partial split delivery must not duplicate delivered events.""" + with patch("asyncio.create_task"): + logger = DataDogLogger() + + logger.log_queue = _payloads(4) + delivered: list = [] + + async def _send(data): + messages = [event["message"] for event in data] + if len(data) > 2: + raise _raised_413() + if messages == ['{"event": 2}', '{"event": 3}']: + raise RuntimeError("transient network error") + delivered.extend(messages) + return Response( + 202, request=Request("POST", "https://example.com"), text="Accepted" ) - for i in range(2) + + logger.async_send_compressed_data = AsyncMock(side_effect=_send) + + await logger.async_send_batch() + + assert delivered == ['{"event": 0}', '{"event": 1}'] + assert [event["message"] for event in logger.log_queue] == [ + '{"event": 2}', + '{"event": 3}', ] + +@pytest.mark.asyncio +async def test_unexpected_non_202_status_requeues(datadog_env): + """A non-413, non-202 response is treated as undelivered and re-queued.""" + with patch("asyncio.create_task"): + logger = DataDogLogger() + + logger.log_queue = _payloads(2) logger.async_send_compressed_data = AsyncMock( return_value=Response( - 413, - request=Request("POST", "https://example.com"), - text="Payload Too Large", + 200, request=Request("POST", "https://example.com"), text="OK" ) ) await logger.async_send_batch() - assert logger.async_send_compressed_data.await_count == 1 - assert len(logger.log_queue) == 2 assert [event["message"] for event in logger.log_queue] == [ '{"event": 0}', '{"event": 1}', ] +@pytest.mark.parametrize( + "value, expected", + [ + ("50", 50), + ("1", 1), + ("0", 1), + ("-5", 1), + (str(DD_MAX_BATCH_SIZE + 100), DD_MAX_BATCH_SIZE), + ("not_an_int", DD_MAX_BATCH_SIZE), + ], +) +def test_dd_batch_size_env_resolution(monkeypatch, value, expected): + monkeypatch.setenv("DD_API_KEY", "test_api_key") + monkeypatch.setenv("DD_SITE", "test.datadoghq.com") + monkeypatch.setenv("DD_BATCH_SIZE", value) + with patch("asyncio.create_task"): + logger = DataDogLogger() + assert logger.batch_size == expected + + +def test_dd_batch_size_defaults_to_max(monkeypatch): + monkeypatch.setenv("DD_API_KEY", "test_api_key") + monkeypatch.setenv("DD_SITE", "test.datadoghq.com") + monkeypatch.delenv("DD_BATCH_SIZE", raising=False) + with patch("asyncio.create_task"): + logger = DataDogLogger() + assert logger.batch_size == DD_MAX_BATCH_SIZE + + @pytest.mark.asyncio async def test_async_send_batch_handles_empty_queue(datadog_env): with patch("asyncio.create_task"):