fix(datadog): split oversized batches on 413 instead of re-queueing forever (#29444)

This commit is contained in:
Yassin Kortam 2026-06-01 14:01:31 -07:00 committed by GitHub
parent f7c029d4a0
commit fe108580d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 246 additions and 34 deletions

View File

@ -41,6 +41,7 @@ from litellm.integrations.datadog.datadog_handler import (
)
from litellm.litellm_core_utils.dd_tracing import tracer
from litellm.llms.custom_httpx.http_handler import (
MaskedHTTPStatusError,
_get_httpx_client,
get_async_httpx_client,
httpxSpecialProvider,
@ -68,6 +69,22 @@ DD_LOGGED_SUCCESS_SERVICE_TYPES = [
]
def _resolve_dd_batch_size() -> int:
    """
    Work out the batch size to hand to the batch logger.

    Reads the ``DD_BATCH_SIZE`` environment variable:
    - unset: ``DD_MAX_BATCH_SIZE`` is returned;
    - not parseable as an int: a warning is logged and ``DD_MAX_BATCH_SIZE`` is returned;
    - otherwise: the parsed value, limited to the range 1..``DD_MAX_BATCH_SIZE``.
    """
    configured = os.getenv("DD_BATCH_SIZE")
    if configured is not None:
        try:
            requested = int(configured)
        except ValueError:
            verbose_logger.warning(
                "Datadog: ignoring invalid DD_BATCH_SIZE=%r, using %s",
                configured,
                DD_MAX_BATCH_SIZE,
            )
        else:
            # Never below 1, never above the built-in maximum.
            return max(1, min(requested, DD_MAX_BATCH_SIZE))
    return DD_MAX_BATCH_SIZE
class DataDogLogger(
CustomBatchLogger,
AdditionalLoggingUtils,
@ -128,7 +145,9 @@ class DataDogLogger(
asyncio.create_task(self.periodic_flush())
self.flush_lock = asyncio.Lock()
super().__init__(
**kwargs, flush_lock=self.flush_lock, batch_size=DD_MAX_BATCH_SIZE
**kwargs,
flush_lock=self.flush_lock,
batch_size=_resolve_dd_batch_size(),
)
except Exception as e:
verbose_logger.exception(
@ -339,28 +358,14 @@ class DataDogLogger(
"[DATADOG MOCK] Mock mode enabled - API calls will be intercepted"
)
response = await self.async_send_compressed_data(batch_to_send)
if response.status_code == 413:
verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value)
self.log_queue = batch_to_send + self.log_queue
return
response.raise_for_status()
if response.status_code != 202:
raise Exception(
f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
)
undelivered = await self._send_with_413_split(batch_to_send)
if undelivered:
self.log_queue = undelivered + self.log_queue
if self.is_mock_mode:
verbose_logger.debug(
f"[DATADOG MOCK] Batch of {len(batch_to_send)} events successfully mocked"
)
else:
verbose_logger.debug(
"Datadog: Response from datadog API status_code: %s, text: %s",
response.status_code,
response.text,
)
except Exception as e:
self.log_queue = batch_to_send + self.log_queue
@ -368,6 +373,62 @@ class DataDogLogger(
f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}"
)
async def _send_with_413_split(self, batch: List) -> List:
    """
    Deliver ``batch``, bisecting any piece that Datadog rejects with a 413.

    Datadog enforces a 5MB uncompressed limit per request, so a piece that is
    answered with 413 (payload too large) is cut in half and both halves are
    retried, front half first. The 413 may arrive either as a raised
    MaskedHTTPStatusError (httpx raise_for_status) or as a returned response;
    both are treated the same way. A single event that still gets a 413 is
    logged and dropped so that it cannot block the queue forever.

    Returns:
        The events that were NOT delivered because a non-413 error or an
        unexpected (non-202) status interrupted sending, in their original
        order. Events Datadog already accepted are never included. An empty
        list means nothing needs to be re-queued.
    """
    # LIFO work list: the piece pushed last is sent next.
    stack: List[List] = [batch]
    while stack:
        current = stack.pop()
        if not current:
            continue

        try:
            response = await self.async_send_compressed_data(current)
        except Exception as exc:
            raised_413 = (
                isinstance(exc, MaskedHTTPStatusError) and exc.status_code == 413
            )
            if not raised_413:
                verbose_logger.exception(
                    f"Datadog Error sending batch API - {str(exc)}"
                )
                return self._undelivered(current, stack)
            # Fall through to the shared 413 handling below.
            response = exc.response

        status = response.status_code
        if status == 202:
            verbose_logger.debug(
                "Datadog: delivered %s events, status_code=%s, text=%s",
                len(current),
                status,
                response.text,
            )
            continue

        if status != 413:
            verbose_logger.error(
                "Datadog: unexpected response status_code=%s, text=%s",
                status,
                response.text,
            )
            return self._undelivered(current, stack)

        if len(current) == 1:
            # Cannot be split any further: drop it instead of retrying forever.
            verbose_logger.error(DD_ERRORS.DATADOG_413_ERROR.value)
            continue

        half = len(current) // 2
        # Push the back half first so the front half is popped (sent) first.
        stack.extend((current[half:], current[:half]))
    return []
@staticmethod
def _undelivered(chunk: List, pending: List[List]) -> List:
return chunk + [event for remaining in reversed(pending) for event in remaining]
async def flush_queue(self):
if self.flush_lock is None:
return

View File

@ -1,10 +1,49 @@
from unittest.mock import AsyncMock, Mock, patch
import httpx
import pytest
from httpx import Request, Response
from litellm.integrations.datadog.datadog import DataDogLogger
from litellm.types.integrations.datadog import DatadogPayload
from litellm.llms.custom_httpx.http_handler import MaskedHTTPStatusError
from litellm.types.integrations.datadog import DD_MAX_BATCH_SIZE, DatadogPayload
def _payloads(n):
    """Return ``n`` DatadogPayload entries whose messages are ``{"event": i}`` for i in 0..n-1."""

    def _one(index):
        return DatadogPayload(
            ddsource="litellm",
            ddtags="env:test",
            hostname="host",
            message=f'{{"event": {index}}}',
            service="svc",
            status="info",
        )

    return list(map(_one, range(n)))
def _raised_413():
    """Build (without raising) a MaskedHTTPStatusError that wraps an httpx 413 status error."""
    req = Request("POST", "https://example.com")
    too_large = Response(413, request=req, text="Payload Too Large")
    status_error = httpx.HTTPStatusError("413", request=req, response=too_large)
    return MaskedHTTPStatusError(status_error)
def _make_send(max_ok, delivered, *, raise_413=True):
    """
    Datadog double for ``async_send_compressed_data``.

    The returned coroutine function accepts batches of at most ``max_ok`` events:
    it appends their ``message`` values to ``delivered`` and returns a 202 response.
    Larger batches get a 413, raised as an exception when ``raise_413`` is true and
    returned as a plain response otherwise.
    """

    async def _send(data):
        if len(data) <= max_ok:
            delivered.extend(event["message"] for event in data)
            return Response(
                202, request=Request("POST", "https://example.com"), text="Accepted"
            )
        if raise_413:
            raise _raised_413()
        return Response(
            413,
            request=Request("POST", "https://example.com"),
            text="Payload Too Large",
        )

    return _send
@pytest.fixture
@ -75,40 +114,152 @@ async def test_failure_hook_threshold_flush_uses_flush_queue(datadog_env):
@pytest.mark.asyncio
async def test_async_send_batch_requeues_events_on_413(datadog_env):
async def test_413_splits_oversized_batch_and_delivers_every_event(datadog_env):
"""A raised 413 (the real httpx path) halves the batch until each piece is accepted."""
with patch("asyncio.create_task"):
logger = DataDogLogger()
logger.log_queue = [
DatadogPayload(
ddsource="litellm",
ddtags="env:test",
hostname="host",
message=f'{{"event": {i}}}',
service="svc",
status="info",
logger.log_queue = _payloads(4)
delivered: list = []
logger.async_send_compressed_data = AsyncMock(side_effect=_make_send(1, delivered))
await logger.async_send_batch()
assert sorted(delivered) == [f'{{"event": {i}}}' for i in range(4)]
assert logger.log_queue == []
@pytest.mark.asyncio
async def test_413_does_not_requeue_oversized_batch(datadog_env):
    """Regression for the infinite 413 loop: an undeliverable batch must not be re-queued."""
    with patch("asyncio.create_task"):
        dd_logger = DataDogLogger()

    # max_ok=0 means every non-empty batch is answered with a 413.
    always_too_large = AsyncMock(side_effect=_make_send(0, []))
    dd_logger.log_queue = _payloads(4)
    dd_logger.async_send_compressed_data = always_too_large

    # Flush twice: the second call must find nothing left over from the first.
    for _ in range(2):
        await dd_logger.async_send_batch()

    assert dd_logger.log_queue == []
@pytest.mark.asyncio
async def test_413_drops_single_oversized_event(datadog_env):
    """A lone event that gets a 413 is sent exactly once and then dropped, not re-queued."""
    with patch("asyncio.create_task"):
        dd_logger = DataDogLogger()

    send_stub = AsyncMock(side_effect=_make_send(0, []))
    dd_logger.async_send_compressed_data = send_stub
    dd_logger.log_queue = _payloads(1)

    await dd_logger.async_send_batch()

    assert send_stub.await_count == 1
    assert dd_logger.log_queue == []
@pytest.mark.asyncio
async def test_413_returned_response_also_splits(datadog_env):
    """Defensive path: a 413 returned (not raised) is handled the same way."""
    with patch("asyncio.create_task"):
        dd_logger = DataDogLogger()

    accepted: list = []
    # Only single-event batches are accepted; larger ones come back as a 413 response.
    returning_413 = _make_send(1, accepted, raise_413=False)
    dd_logger.async_send_compressed_data = AsyncMock(side_effect=returning_413)
    dd_logger.log_queue = _payloads(4)

    await dd_logger.async_send_batch()

    expected_messages = [f'{{"event": {i}}}' for i in range(4)]
    assert sorted(accepted) == expected_messages
    assert dd_logger.log_queue == []
@pytest.mark.asyncio
async def test_partial_delivery_then_transient_error_requeues_only_undelivered(
datadog_env,
):
"""A transient error after a partial split delivery must not duplicate delivered events."""
with patch("asyncio.create_task"):
logger = DataDogLogger()
logger.log_queue = _payloads(4)
delivered: list = []
async def _send(data):
messages = [event["message"] for event in data]
if len(data) > 2:
raise _raised_413()
if messages == ['{"event": 2}', '{"event": 3}']:
raise RuntimeError("transient network error")
delivered.extend(messages)
return Response(
202, request=Request("POST", "https://example.com"), text="Accepted"
)
for i in range(2)
logger.async_send_compressed_data = AsyncMock(side_effect=_send)
await logger.async_send_batch()
assert delivered == ['{"event": 0}', '{"event": 1}']
assert [event["message"] for event in logger.log_queue] == [
'{"event": 2}',
'{"event": 3}',
]
@pytest.mark.asyncio
async def test_unexpected_non_202_status_requeues(datadog_env):
"""A non-413, non-202 response is treated as undelivered and re-queued."""
with patch("asyncio.create_task"):
logger = DataDogLogger()
logger.log_queue = _payloads(2)
logger.async_send_compressed_data = AsyncMock(
return_value=Response(
413,
request=Request("POST", "https://example.com"),
text="Payload Too Large",
200, request=Request("POST", "https://example.com"), text="OK"
)
)
await logger.async_send_batch()
assert logger.async_send_compressed_data.await_count == 1
assert len(logger.log_queue) == 2
assert [event["message"] for event in logger.log_queue] == [
'{"event": 0}',
'{"event": 1}',
]
@pytest.mark.parametrize(
    ("value", "expected"),
    [
        ("50", 50),
        ("1", 1),
        ("0", 1),
        ("-5", 1),
        (str(DD_MAX_BATCH_SIZE + 100), DD_MAX_BATCH_SIZE),
        ("not_an_int", DD_MAX_BATCH_SIZE),
    ],
)
def test_dd_batch_size_env_resolution(monkeypatch, value, expected):
    """DD_BATCH_SIZE is clamped to 1..DD_MAX_BATCH_SIZE; a non-integer value falls back to the max."""
    env = {
        "DD_API_KEY": "test_api_key",
        "DD_SITE": "test.datadoghq.com",
        "DD_BATCH_SIZE": value,
    }
    for name, setting in env.items():
        monkeypatch.setenv(name, setting)

    with patch("asyncio.create_task"):
        dd_logger = DataDogLogger()

    assert dd_logger.batch_size == expected
def test_dd_batch_size_defaults_to_max(monkeypatch):
    """With DD_BATCH_SIZE absent from the environment, the logger uses DD_MAX_BATCH_SIZE."""
    monkeypatch.delenv("DD_BATCH_SIZE", raising=False)
    monkeypatch.setenv("DD_API_KEY", "test_api_key")
    monkeypatch.setenv("DD_SITE", "test.datadoghq.com")

    with patch("asyncio.create_task"):
        dd_logger = DataDogLogger()

    assert dd_logger.batch_size == DD_MAX_BATCH_SIZE
@pytest.mark.asyncio
async def test_async_send_batch_handles_empty_queue(datadog_env):
with patch("asyncio.create_task"):