(fix) GCS bucket logger - apply truncate_standard_logging_payload_content to standard_logging_payload and ensure GCS flushes its queue on failures (#7519)

* fix async_send_batch for gcs

* fix truncate GCS logger

* test_truncate_standard_logging_payload
Ishaan Jaff 2025-01-03 08:09:03 -08:00 committed by GitHub
parent 6ffdd5c250
commit ec3bcf189f
5 changed files with 87 additions and 84 deletions
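In short: the truncation helper caps the `error_str`, `messages`, and `response` fields of the standard logging payload at 10,000 characters and appends a marker string, so size-limited backends (DataDog, GCS Bucket) never receive an oversized body. A minimal, standalone sketch of that behavior (the cap, field names, and marker string come from the diff below; the plain dict and helper name here are illustrative stand-ins, not litellm's API):

# Illustrative sketch only: the 10_000 cap, field names, and marker string
# come from the diff below; the plain dict stands in for StandardLoggingPayload.
MAX_STR_LENGTH = 10_000
TRUNCATION_MARKER = "...truncated by litellm, this logger does not support large content"

def truncate_payload_fields(payload: dict) -> None:
    for field in ("error_str", "messages", "response"):
        value = payload.get(field)
        if value:
            str_value = str(value)
            if len(str_value) > MAX_STR_LENGTH:
                payload[field] = str_value[:MAX_STR_LENGTH] + TRUNCATION_MARKER

payload = {"messages": "x" * 50_000, "response": "ok", "error_str": None}
truncate_payload_fields(payload)
assert len(payload["messages"]) == MAX_STR_LENGTH + len(TRUNCATION_MARKER)
assert payload["response"] == "ok"  # short fields are left untouched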


@@ -293,3 +293,60 @@ class CustomLogger:  # https://docs.litellm.ai/docs/observability/custom_callbac
         except Exception:
             print_verbose(f"Custom Logger Error - {traceback.format_exc()}")
             pass
+
+    # Useful helpers for custom logger classes
+
+    def truncate_standard_logging_payload_content(
+        self,
+        standard_logging_object: StandardLoggingPayload,
+    ):
+        """
+        Truncate error strings and message content in logging payload
+
+        Some loggers like DataDog/ GCS Bucket have a limit on the size of the payload. (1MB)
+
+        This function truncates the error string and the message content if they exceed a certain length.
+        """
+        MAX_STR_LENGTH = 10_000
+
+        # Truncate fields that might exceed max length
+        fields_to_truncate = ["error_str", "messages", "response"]
+        for field in fields_to_truncate:
+            self._truncate_field(
+                standard_logging_object=standard_logging_object,
+                field_name=field,
+                max_length=MAX_STR_LENGTH,
+            )
+
+    def _truncate_field(
+        self,
+        standard_logging_object: StandardLoggingPayload,
+        field_name: str,
+        max_length: int,
+    ) -> None:
+        """
+        Helper function to truncate a field in the logging payload
+
+        This converts the field to a string and then truncates it if it exceeds the max length.
+
+        Why convert to string ?
+            1. User was sending a poorly formatted list for `messages` field, we could not predict where they would send content
+                - Converting to string and then truncating the logged content catches this
+            2. We want to avoid modifying the original `messages`, `response`, and `error_str` in the logging payload since these are in kwargs and could be returned to the user
+        """
+        field_value = standard_logging_object.get(field_name)  # type: ignore
+        if field_value:
+            str_value = str(field_value)
+            if len(str_value) > max_length:
+                standard_logging_object[field_name] = self._truncate_text(  # type: ignore
+                    text=str_value, max_length=max_length
+                )
+
+    def _truncate_text(self, text: str, max_length: int) -> str:
+        """Truncate text if it exceeds max_length"""
+        return (
+            text[:max_length]
+            + "...truncated by litellm, this logger does not support large content"
+            if len(text) > max_length
+            else text
+        )
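Because the helper now lives on CustomLogger, any custom callback can reuse it before serializing a payload for a size-limited backend. A hedged sketch of what that might look like in a subclass (MyHTTPLogger and _send are hypothetical; only truncate_standard_logging_payload_content and the async_log_success_event hook come from litellm):

# Hypothetical subclass; only truncate_standard_logging_payload_content()
# and the async_log_success_event hook are litellm's, the rest is illustrative.
import json

from litellm.integrations.custom_logger import CustomLogger


class MyHTTPLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        standard_logging_object = kwargs.get("standard_logging_object", None)
        if standard_logging_object is None:
            return
        # shrink oversized fields before serializing for a size-limited backend
        self.truncate_standard_logging_payload_content(standard_logging_object)
        json_payload = json.dumps(standard_logging_object, default=str)
        await self._send(json_payload)

    async def _send(self, payload: str) -> None:
        ...  # hypothetical transport, e.g. an HTTP POST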


@@ -256,10 +256,6 @@ class DataDogLogger(CustomBatchLogger):
         """
         import json
 
-        from litellm.litellm_core_utils.litellm_logging import (
-            truncate_standard_logging_payload_content,
-        )
-
         standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
             "standard_logging_object", None
         )
@@ -271,7 +267,7 @@ class DataDogLogger(CustomBatchLogger):
             status = DataDogStatus.ERROR
 
         # Build the initial payload
-        truncate_standard_logging_payload_content(standard_logging_object)
+        self.truncate_standard_logging_payload_content(standard_logging_object)
         json_payload = json.dumps(standard_logging_object, default=str)
 
         verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)


@@ -64,7 +64,7 @@ class GCSBucketLogger(GCSBucketBase):
             )
             if logging_payload is None:
                 raise ValueError("standard_logging_object not found in kwargs")
-
+            self.truncate_standard_logging_payload_content(logging_payload)
             # Add to logging queue - this will be flushed periodically
             self.log_queue.append(
                 GCSLogQueueItem(
@@ -88,7 +88,7 @@ class GCSBucketLogger(GCSBucketBase):
             )
             if logging_payload is None:
                 raise ValueError("standard_logging_object not found in kwargs")
-
+            self.truncate_standard_logging_payload_content(logging_payload)
             # Add to logging queue - this will be flushed periodically
             self.log_queue.append(
                 GCSLogQueueItem(
@@ -114,35 +114,37 @@ class GCSBucketLogger(GCSBucketBase):
         if not self.log_queue:
             return
 
-        try:
-            for log_item in self.log_queue:
-                logging_payload = log_item["payload"]
-                kwargs = log_item["kwargs"]
-                response_obj = log_item.get("response_obj", None) or {}
+        for log_item in self.log_queue:
+            logging_payload = log_item["payload"]
+            kwargs = log_item["kwargs"]
+            response_obj = log_item.get("response_obj", None) or {}
 
-                gcs_logging_config: GCSLoggingConfig = (
-                    await self.get_gcs_logging_config(kwargs)
-                )
-                headers = await self.construct_request_headers(
-                    vertex_instance=gcs_logging_config["vertex_instance"],
-                    service_account_json=gcs_logging_config["path_service_account"],
-                )
-                bucket_name = gcs_logging_config["bucket_name"]
-                object_name = self._get_object_name(
-                    kwargs, logging_payload, response_obj
-                )
-                await self._log_json_data_on_gcs(
-                    headers=headers,
-                    bucket_name=bucket_name,
-                    object_name=object_name,
-                    logging_payload=logging_payload,
-                )
+            gcs_logging_config: GCSLoggingConfig = await self.get_gcs_logging_config(
+                kwargs
+            )
+            headers = await self.construct_request_headers(
+                vertex_instance=gcs_logging_config["vertex_instance"],
+                service_account_json=gcs_logging_config["path_service_account"],
+            )
+            bucket_name = gcs_logging_config["bucket_name"]
+            object_name = self._get_object_name(kwargs, logging_payload, response_obj)
+
+            try:
+                await self._log_json_data_on_gcs(
+                    headers=headers,
+                    bucket_name=bucket_name,
+                    object_name=object_name,
+                    logging_payload=logging_payload,
+                )
+            except Exception as e:
+                # don't let one log item fail the entire batch
+                verbose_logger.exception(
+                    f"GCS Bucket error logging payload to GCS bucket: {str(e)}"
+                )
+                pass
 
         # Clear the queue after processing
         self.log_queue.clear()
-
-        except Exception as e:
-            verbose_logger.exception(f"GCS Bucket batch logging error: {str(e)}")
 
     def _get_object_name(
         self, kwargs: Dict, logging_payload: StandardLoggingPayload, response_obj: Any
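The behavioral fix in async_send_batch above is that an upload failure on one queued item no longer aborts the whole flush: each item gets its own try/except, and the queue is always cleared afterwards, so one bad payload cannot wedge the batch. A generic sketch of that pattern, independent of litellm (all names here are illustrative):

# Generic sketch of the per-item try/except + always-clear-the-queue pattern;
# names are illustrative, not litellm's API.
import logging
from typing import Any, Awaitable, Callable, List

logger = logging.getLogger(__name__)


async def flush_queue(queue: List[Any], upload_one: Callable[[Any], Awaitable[None]]) -> None:
    if not queue:
        return
    for item in queue:
        try:
            await upload_one(item)
        except Exception:
            # one bad item is logged and skipped instead of failing the whole batch
            logger.exception("failed to upload log item")
    # clear even if some items failed, so the queue cannot grow unbounded
    queue.clear()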


@@ -3034,60 +3034,6 @@ def get_standard_logging_object_payload(
         return None
 
 
-def truncate_standard_logging_payload_content(
-    standard_logging_object: StandardLoggingPayload,
-):
-    """
-    Truncate error strings and message content in logging payload
-
-    Some loggers like DataDog have a limit on the size of the payload. (1MB)
-
-    This function truncates the error string and the message content if they exceed a certain length.
-    """
-    MAX_STR_LENGTH = 10_000
-
-    # Truncate fields that might exceed max length
-    fields_to_truncate = ["error_str", "messages", "response"]
-    for field in fields_to_truncate:
-        _truncate_field(
-            standard_logging_object=standard_logging_object,
-            field_name=field,
-            max_length=MAX_STR_LENGTH,
-        )
-
-
-def _truncate_text(text: str, max_length: int) -> str:
-    """Truncate text if it exceeds max_length"""
-    return (
-        text[:max_length]
-        + "...truncated by litellm, this logger does not support large content"
-        if len(text) > max_length
-        else text
-    )
-
-
-def _truncate_field(
-    standard_logging_object: StandardLoggingPayload, field_name: str, max_length: int
-) -> None:
-    """
-    Helper function to truncate a field in the logging payload
-
-    This converts the field to a string and then truncates it if it exceeds the max length.
-
-    Why convert to string ?
-        1. User was sending a poorly formatted list for `messages` field, we could not predict where they would send content
-            - Converting to string and then truncating the logged content catches this
-        2. We want to avoid modifying the original `messages`, `response`, and `error_str` in the logging payload since these are in kwargs and could be returned to the user
-    """
-    field_value = standard_logging_object.get(field_name)  # type: ignore
-    if field_value:
-        str_value = str(field_value)
-        if len(str_value) > max_length:
-            standard_logging_object[field_name] = _truncate_text(  # type: ignore
-                text=str_value, max_length=max_length
-            )
-
-
 def get_standard_logging_metadata(
     metadata: Optional[Dict[str, Any]]
 ) -> StandardLoggingMetadata:


@@ -28,9 +28,10 @@ from create_mock_standard_logging_payload import (
 )
 from litellm.litellm_core_utils.litellm_logging import (
     StandardLoggingPayloadSetup,
-    truncate_standard_logging_payload_content,
 )
+from litellm.integrations.custom_logger import CustomLogger
 
 
 @pytest.mark.parametrize(
     "response_obj,expected_values",
@@ -332,6 +333,7 @@ def test_truncate_standard_logging_payload():
     1. original messages, response, and error_str should NOT BE MODIFIED, since these are from kwargs
     2. the `messages`, `response`, and `error_str` in new standard_logging_payload should be truncated
     """
+    _custom_logger = CustomLogger()
     standard_logging_payload: StandardLoggingPayload = (
         create_standard_logging_payload_with_long_content()
     )
@@ -342,7 +344,7 @@ def test_truncate_standard_logging_payload():
     original_error_str = standard_logging_payload["error_str"]
     len_original_error_str = len(str(original_error_str))
 
-    truncate_standard_logging_payload_content(standard_logging_payload)
+    _custom_logger.truncate_standard_logging_payload_content(standard_logging_payload)
 
     # Original messages, response, and error_str should NOT BE MODIFIED
     assert standard_logging_payload["messages"] != original_messages
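A complementary assertion one might add to this test (not part of this commit) is an upper bound on the truncated field, assuming the 10,000-character cap and the marker string shown in the CustomLogger diff above:

# Hypothetical follow-up check, not in this commit: the truncated value should
# never exceed the cap plus the marker litellm appends.
MAX_STR_LENGTH = 10_000
TRUNCATION_MARKER = "...truncated by litellm, this logger does not support large content"

assert len(str(standard_logging_payload["messages"])) <= MAX_STR_LENGTH + len(TRUNCATION_MARKER)
assert str(standard_logging_payload["messages"]).endswith(TRUNCATION_MARKER)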