mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-27 11:43:54 +00:00
(fix) GCS bucket logger - apply truncate_standard_logging_payload_content
to standard_logging_payload
and ensure GCS flushes queue on fails (#7500)
* use truncate_standard_logging_payload_content * update truncate_standard_logging_payload_content * update dd logger * update gcs async_send_batch * fix code check * test_datadog_payload_content_truncation * fix code quality
This commit is contained in:
parent
4e4495ae3d
commit
3317619357
6 changed files with 49 additions and 39 deletions
|
@ -256,10 +256,6 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from litellm.litellm_core_utils.litellm_logging import (
|
|
||||||
truncate_standard_logging_payload_content,
|
|
||||||
)
|
|
||||||
|
|
||||||
standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
|
standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
|
||||||
"standard_logging_object", None
|
"standard_logging_object", None
|
||||||
)
|
)
|
||||||
|
@ -271,7 +267,6 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
status = DataDogStatus.ERROR
|
status = DataDogStatus.ERROR
|
||||||
|
|
||||||
# Build the initial payload
|
# Build the initial payload
|
||||||
truncate_standard_logging_payload_content(standard_logging_object)
|
|
||||||
json_payload = json.dumps(standard_logging_object, default=str)
|
json_payload = json.dumps(standard_logging_object, default=str)
|
||||||
|
|
||||||
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
||||||
|
|
|
@ -138,11 +138,11 @@ class GCSBucketLogger(GCSBucketBase):
|
||||||
logging_payload=logging_payload,
|
logging_payload=logging_payload,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Clear the queue after processing
|
|
||||||
self.log_queue.clear()
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verbose_logger.exception(f"GCS Bucket batch logging error: {str(e)}")
|
verbose_logger.exception(f"GCS Bucket batch logging error: {str(e)}")
|
||||||
|
finally:
|
||||||
|
# Clear the queue after processing
|
||||||
|
self.log_queue.clear()
|
||||||
|
|
||||||
def _get_object_name(
|
def _get_object_name(
|
||||||
self, kwargs: Dict, logging_payload: StandardLoggingPayload, response_obj: Any
|
self, kwargs: Dict, logging_payload: StandardLoggingPayload, response_obj: Any
|
||||||
|
|
|
@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any, List, Optional, Union
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from litellm._logging import verbose_logger
|
from litellm._logging import verbose_logger
|
||||||
from litellm.types.llms.openai import AllMessageValues, ChatCompletionToolParam
|
from litellm.types.llms.openai import AllMessageValues
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from opentelemetry.trace import Span as _Span
|
from opentelemetry.trace import Span as _Span
|
||||||
|
|
|
@ -3026,6 +3026,8 @@ def get_standard_logging_object_payload(
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
truncate_standard_logging_payload_content(payload)
|
||||||
|
|
||||||
return payload
|
return payload
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verbose_logger.exception(
|
verbose_logger.exception(
|
||||||
|
@ -3040,20 +3042,26 @@ def truncate_standard_logging_payload_content(
|
||||||
"""
|
"""
|
||||||
Truncate error strings and message content in logging payload
|
Truncate error strings and message content in logging payload
|
||||||
|
|
||||||
Some loggers like DataDog have a limit on the size of the payload. (1MB)
|
Most logging integrations - DataDog / GCS Bucket / have a limit on the size of the payload. ~around(1MB)
|
||||||
|
|
||||||
This function truncates the error string and the message content if they exceed a certain length.
|
This function truncates the error string and the message content if they exceed a certain length.
|
||||||
"""
|
"""
|
||||||
MAX_STR_LENGTH = 10_000
|
try:
|
||||||
|
MAX_STR_LENGTH = 10_000
|
||||||
|
|
||||||
# Truncate fields that might exceed max length
|
# Truncate fields that might exceed max length
|
||||||
fields_to_truncate = ["error_str", "messages", "response"]
|
fields_to_truncate = ["error_str", "messages", "response"]
|
||||||
for field in fields_to_truncate:
|
for field in fields_to_truncate:
|
||||||
_truncate_field(
|
_truncate_field(
|
||||||
standard_logging_object=standard_logging_object,
|
standard_logging_object=standard_logging_object,
|
||||||
field_name=field,
|
field_name=field,
|
||||||
max_length=MAX_STR_LENGTH,
|
max_length=MAX_STR_LENGTH,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
verbose_logger.exception(
|
||||||
|
"Error truncating standard logging payload - {}".format(str(e))
|
||||||
)
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
def _truncate_text(text: str, max_length: int) -> str:
|
def _truncate_text(text: str, max_length: int) -> str:
|
||||||
|
|
|
@ -13,7 +13,7 @@ from litellm.llms.base_llm.chat.transformation import LiteLLMLoggingObj
|
||||||
from litellm.llms.openai.common_utils import drop_params_from_unprocessable_entity_error
|
from litellm.llms.openai.common_utils import drop_params_from_unprocessable_entity_error
|
||||||
from litellm.llms.openai.openai import OpenAIConfig
|
from litellm.llms.openai.openai import OpenAIConfig
|
||||||
from litellm.secret_managers.main import get_secret_str
|
from litellm.secret_managers.main import get_secret_str
|
||||||
from litellm.types.llms.openai import AllMessageValues, ChatCompletionToolParam
|
from litellm.types.llms.openai import AllMessageValues
|
||||||
from litellm.types.utils import ModelResponse, ProviderField
|
from litellm.types.utils import ModelResponse, ProviderField
|
||||||
from litellm.utils import _add_path_to_api_base
|
from litellm.utils import _add_path_to_api_base
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
|
from re import M
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from litellm.integrations.custom_logger import CustomLogger
|
||||||
|
|
||||||
|
|
||||||
sys.path.insert(0, os.path.abspath("../.."))
|
sys.path.insert(0, os.path.abspath("../.."))
|
||||||
|
|
||||||
|
@ -392,6 +395,14 @@ async def test_datadog_payload_environment_variables():
|
||||||
pytest.fail(f"Test failed with exception: {str(e)}")
|
pytest.fail(f"Test failed with exception: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
|
class TestDDLogger(CustomLogger):
|
||||||
|
def __init__(self):
|
||||||
|
self.standard_logging_object: Optional[StandardLoggingPayload] = None
|
||||||
|
|
||||||
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
|
||||||
|
self.standard_logging_object = kwargs["standard_logging_object"]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_datadog_payload_content_truncation():
|
async def test_datadog_payload_content_truncation():
|
||||||
"""
|
"""
|
||||||
|
@ -399,15 +410,13 @@ async def test_datadog_payload_content_truncation():
|
||||||
|
|
||||||
DataDog has a limit of 1MB for the logged payload size.
|
DataDog has a limit of 1MB for the logged payload size.
|
||||||
"""
|
"""
|
||||||
dd_logger = DataDogLogger()
|
dd_logger = TestDDLogger()
|
||||||
|
litellm.callbacks = [dd_logger]
|
||||||
|
|
||||||
# Create a standard payload with very long content
|
|
||||||
standard_payload = create_standard_logging_payload()
|
|
||||||
long_content = "x" * 80_000 # Create string longer than MAX_STR_LENGTH (10_000)
|
long_content = "x" * 80_000 # Create string longer than MAX_STR_LENGTH (10_000)
|
||||||
|
|
||||||
# Modify payload with long content
|
# messages with long content
|
||||||
standard_payload["error_str"] = long_content
|
messages = [
|
||||||
standard_payload["messages"] = [
|
|
||||||
{
|
{
|
||||||
"role": "user",
|
"role": "user",
|
||||||
"content": [
|
"content": [
|
||||||
|
@ -421,28 +430,26 @@ async def test_datadog_payload_content_truncation():
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
standard_payload["response"] = {"choices": [{"message": {"content": long_content}}]}
|
await litellm.acompletion(
|
||||||
|
model="gpt-3.5-turbo",
|
||||||
# Create the payload
|
messages=messages,
|
||||||
dd_payload = dd_logger.create_datadog_logging_payload(
|
temperature=0.2,
|
||||||
kwargs={"standard_logging_object": standard_payload},
|
mock_response=long_content,
|
||||||
response_obj=None,
|
|
||||||
start_time=datetime.now(),
|
|
||||||
end_time=datetime.now(),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print("dd_payload", json.dumps(dd_payload, indent=2))
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
# Parse the message back to dict to verify truncation
|
# Create the payload
|
||||||
message_dict = json.loads(dd_payload["message"])
|
standard_logging_payload = dd_logger.standard_logging_object
|
||||||
|
|
||||||
|
print("standard_logging_payload", json.dumps(standard_logging_payload, indent=2))
|
||||||
|
|
||||||
# Verify truncation of fields
|
# Verify truncation of fields
|
||||||
assert len(message_dict["error_str"]) < 10_100, "error_str not truncated correctly"
|
|
||||||
assert (
|
assert (
|
||||||
len(str(message_dict["messages"])) < 10_100
|
len(str(standard_logging_payload["messages"])) < 10_100
|
||||||
), "messages not truncated correctly"
|
), "messages not truncated correctly"
|
||||||
assert (
|
assert (
|
||||||
len(str(message_dict["response"])) < 10_100
|
len(str(standard_logging_payload["response"])) < 10_100
|
||||||
), "response not truncated correctly"
|
), "response not truncated correctly"
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue