From ec3bcf189f8f81c8df29a26637618e9937cb4a19 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Fri, 3 Jan 2025 08:09:03 -0800 Subject: [PATCH] (fix) GCS bucket logger - apply truncate_standard_logging_payload_content to standard_logging_payload and ensure GCS flushes queue on fails (#7519) * fix async_send_batch for gcs * fix truncate GCS logger * test_truncate_standard_logging_payload --- litellm/integrations/custom_logger.py | 57 +++++++++++++++++++ litellm/integrations/datadog/datadog.py | 6 +- litellm/integrations/gcs_bucket/gcs_bucket.py | 48 ++++++++-------- litellm/litellm_core_utils/litellm_logging.py | 54 ------------------ .../test_standard_logging_payload.py | 6 +- 5 files changed, 87 insertions(+), 84 deletions(-) diff --git a/litellm/integrations/custom_logger.py b/litellm/integrations/custom_logger.py index 6045244c4d..0164cbc322 100644 --- a/litellm/integrations/custom_logger.py +++ b/litellm/integrations/custom_logger.py @@ -293,3 +293,60 @@ class CustomLogger: # https://docs.litellm.ai/docs/observability/custom_callbac except Exception: print_verbose(f"Custom Logger Error - {traceback.format_exc()}") pass + + # Useful helpers for custom logger classes + + def truncate_standard_logging_payload_content( + self, + standard_logging_object: StandardLoggingPayload, + ): + """ + Truncate error strings and message content in logging payload + + Some loggers like DataDog/ GCS Bucket have a limit on the size of the payload. (1MB) + + This function truncates the error string and the message content if they exceed a certain length. + """ + MAX_STR_LENGTH = 10_000 + + # Truncate fields that might exceed max length + fields_to_truncate = ["error_str", "messages", "response"] + for field in fields_to_truncate: + self._truncate_field( + standard_logging_object=standard_logging_object, + field_name=field, + max_length=MAX_STR_LENGTH, + ) + + def _truncate_field( + self, + standard_logging_object: StandardLoggingPayload, + field_name: str, + max_length: int, + ) -> None: + """ + Helper function to truncate a field in the logging payload + + This converts the field to a string and then truncates it if it exceeds the max length. + + Why convert to string ? + 1. User was sending a poorly formatted list for `messages` field, we could not predict where they would send content + - Converting to string and then truncating the logged content catches this + 2. We want to avoid modifying the original `messages`, `response`, and `error_str` in the logging payload since these are in kwargs and could be returned to the user + """ + field_value = standard_logging_object.get(field_name) # type: ignore + if field_value: + str_value = str(field_value) + if len(str_value) > max_length: + standard_logging_object[field_name] = self._truncate_text( # type: ignore + text=str_value, max_length=max_length + ) + + def _truncate_text(self, text: str, max_length: int) -> str: + """Truncate text if it exceeds max_length""" + return ( + text[:max_length] + + "...truncated by litellm, this logger does not support large content" + if len(text) > max_length + else text + ) diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py index e8a74baa78..a1ee812917 100644 --- a/litellm/integrations/datadog/datadog.py +++ b/litellm/integrations/datadog/datadog.py @@ -256,10 +256,6 @@ class DataDogLogger(CustomBatchLogger): """ import json - from litellm.litellm_core_utils.litellm_logging import ( - truncate_standard_logging_payload_content, - ) - standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get( "standard_logging_object", None ) @@ -271,7 +267,7 @@ class DataDogLogger(CustomBatchLogger): status = DataDogStatus.ERROR # Build the initial payload - truncate_standard_logging_payload_content(standard_logging_object) + self.truncate_standard_logging_payload_content(standard_logging_object) json_payload = json.dumps(standard_logging_object, default=str) verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) diff --git a/litellm/integrations/gcs_bucket/gcs_bucket.py b/litellm/integrations/gcs_bucket/gcs_bucket.py index 0c59d0c93c..8510d6b03f 100644 --- a/litellm/integrations/gcs_bucket/gcs_bucket.py +++ b/litellm/integrations/gcs_bucket/gcs_bucket.py @@ -64,7 +64,7 @@ class GCSBucketLogger(GCSBucketBase): ) if logging_payload is None: raise ValueError("standard_logging_object not found in kwargs") - + self.truncate_standard_logging_payload_content(logging_payload) # Add to logging queue - this will be flushed periodically self.log_queue.append( GCSLogQueueItem( @@ -88,7 +88,7 @@ class GCSBucketLogger(GCSBucketBase): ) if logging_payload is None: raise ValueError("standard_logging_object not found in kwargs") - + self.truncate_standard_logging_payload_content(logging_payload) # Add to logging queue - this will be flushed periodically self.log_queue.append( GCSLogQueueItem( @@ -114,35 +114,37 @@ class GCSBucketLogger(GCSBucketBase): if not self.log_queue: return - try: - for log_item in self.log_queue: - logging_payload = log_item["payload"] - kwargs = log_item["kwargs"] - response_obj = log_item.get("response_obj", None) or {} + for log_item in self.log_queue: + logging_payload = log_item["payload"] + kwargs = log_item["kwargs"] + response_obj = log_item.get("response_obj", None) or {} - gcs_logging_config: GCSLoggingConfig = ( - await self.get_gcs_logging_config(kwargs) - ) - headers = await self.construct_request_headers( - vertex_instance=gcs_logging_config["vertex_instance"], - service_account_json=gcs_logging_config["path_service_account"], - ) - bucket_name = gcs_logging_config["bucket_name"] - object_name = self._get_object_name( - kwargs, logging_payload, response_obj - ) + gcs_logging_config: GCSLoggingConfig = await self.get_gcs_logging_config( + kwargs + ) + headers = await self.construct_request_headers( + vertex_instance=gcs_logging_config["vertex_instance"], + service_account_json=gcs_logging_config["path_service_account"], + ) + bucket_name = gcs_logging_config["bucket_name"] + object_name = self._get_object_name(kwargs, logging_payload, response_obj) + + try: await self._log_json_data_on_gcs( headers=headers, bucket_name=bucket_name, object_name=object_name, logging_payload=logging_payload, ) + except Exception as e: + # don't let one log item fail the entire batch + verbose_logger.exception( + f"GCS Bucket error logging payload to GCS bucket: {str(e)}" + ) + pass - # Clear the queue after processing - self.log_queue.clear() - - except Exception as e: - verbose_logger.exception(f"GCS Bucket batch logging error: {str(e)}") + # Clear the queue after processing + self.log_queue.clear() def _get_object_name( self, kwargs: Dict, logging_payload: StandardLoggingPayload, response_obj: Any diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py index d783279444..f2b58255a2 100644 --- a/litellm/litellm_core_utils/litellm_logging.py +++ b/litellm/litellm_core_utils/litellm_logging.py @@ -3034,60 +3034,6 @@ def get_standard_logging_object_payload( return None -def truncate_standard_logging_payload_content( - standard_logging_object: StandardLoggingPayload, -): - """ - Truncate error strings and message content in logging payload - - Some loggers like DataDog have a limit on the size of the payload. (1MB) - - This function truncates the error string and the message content if they exceed a certain length. - """ - MAX_STR_LENGTH = 10_000 - - # Truncate fields that might exceed max length - fields_to_truncate = ["error_str", "messages", "response"] - for field in fields_to_truncate: - _truncate_field( - standard_logging_object=standard_logging_object, - field_name=field, - max_length=MAX_STR_LENGTH, - ) - - -def _truncate_text(text: str, max_length: int) -> str: - """Truncate text if it exceeds max_length""" - return ( - text[:max_length] - + "...truncated by litellm, this logger does not support large content" - if len(text) > max_length - else text - ) - - -def _truncate_field( - standard_logging_object: StandardLoggingPayload, field_name: str, max_length: int -) -> None: - """ - Helper function to truncate a field in the logging payload - - This converts the field to a string and then truncates it if it exceeds the max length. - - Why convert to string ? - 1. User was sending a poorly formatted list for `messages` field, we could not predict where they would send content - - Converting to string and then truncating the logged content catches this - 2. We want to avoid modifying the original `messages`, `response`, and `error_str` in the logging payload since these are in kwargs and could be returned to the user - """ - field_value = standard_logging_object.get(field_name) # type: ignore - if field_value: - str_value = str(field_value) - if len(str_value) > max_length: - standard_logging_object[field_name] = _truncate_text( # type: ignore - text=str_value, max_length=max_length - ) - - def get_standard_logging_metadata( metadata: Optional[Dict[str, Any]] ) -> StandardLoggingMetadata: diff --git a/tests/logging_callback_tests/test_standard_logging_payload.py b/tests/logging_callback_tests/test_standard_logging_payload.py index bd8e7f46a6..084be4756b 100644 --- a/tests/logging_callback_tests/test_standard_logging_payload.py +++ b/tests/logging_callback_tests/test_standard_logging_payload.py @@ -28,9 +28,10 @@ from create_mock_standard_logging_payload import ( ) from litellm.litellm_core_utils.litellm_logging import ( StandardLoggingPayloadSetup, - truncate_standard_logging_payload_content, ) +from litellm.integrations.custom_logger import CustomLogger + @pytest.mark.parametrize( "response_obj,expected_values", @@ -332,6 +333,7 @@ def test_truncate_standard_logging_payload(): 1. original messages, response, and error_str should NOT BE MODIFIED, since these are from kwargs 2. the `messages`, `response`, and `error_str` in new standard_logging_payload should be truncated """ + _custom_logger = CustomLogger() standard_logging_payload: StandardLoggingPayload = ( create_standard_logging_payload_with_long_content() ) @@ -342,7 +344,7 @@ def test_truncate_standard_logging_payload(): original_error_str = standard_logging_payload["error_str"] len_original_error_str = len(str(original_error_str)) - truncate_standard_logging_payload_content(standard_logging_payload) + _custom_logger.truncate_standard_logging_payload_content(standard_logging_payload) # Original messages, response, and error_str should NOT BE MODIFIED assert standard_logging_payload["messages"] != original_messages