diff --git a/litellm/__init__.py b/litellm/__init__.py
index 54314c1ba..ac44c40d2 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -53,7 +53,6 @@ _custom_logger_compatible_callbacks_literal = Literal[
     "arize",
     "langtrace",
     "gcs_bucket",
-    "s3",
     "opik",
 ]
 _known_custom_logger_compatible_callbacks: List = list(
diff --git a/litellm/integrations/custom_batch_logger.py b/litellm/integrations/custom_batch_logger.py
index 7ef63d25c..aa7f0bba2 100644
--- a/litellm/integrations/custom_batch_logger.py
+++ b/litellm/integrations/custom_batch_logger.py
@@ -21,7 +21,6 @@ class CustomBatchLogger(CustomLogger):
         self,
         flush_lock: Optional[asyncio.Lock] = None,
         batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
-        flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS,
         **kwargs,
     ) -> None:
         """
@@ -29,7 +28,7 @@ class CustomBatchLogger(CustomLogger):
             flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
         """
         self.log_queue: List = []
-        self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS
+        self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS  # 10 seconds
         self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE
         self.last_flush_time = time.time()
         self.flush_lock = flush_lock
diff --git a/litellm/integrations/s3.py b/litellm/integrations/s3.py
index 397b53cd5..1f82406e1 100644
--- a/litellm/integrations/s3.py
+++ b/litellm/integrations/s3.py
@@ -1,67 +1,43 @@
-"""
-s3 Bucket Logging Integration
+#### What this does ####
+# On success + failure, log events to s3 Bucket
 
-async_log_success_event: Processes the event, stores it in memory for 10 seconds or until MAX_BATCH_SIZE and then flushes to s3
-
-NOTE 1: S3 does not provide a BATCH PUT API endpoint, so we create tasks to upload each element individually
-NOTE 2: We create a httpx client with a concurrent limit of 1 to upload to s3. Files should get uploaded BUT they should not impact latency of LLM calling logic
-"""
-
-import asyncio
-import json
-from datetime import datetime
-from typing import Dict, List, Optional
+import datetime
+import os
+import subprocess
+import sys
+import traceback
+import uuid
+from typing import Optional
 
 import litellm
 from litellm._logging import print_verbose, verbose_logger
-from litellm.llms.base_aws_llm import BaseAWSLLM
-from litellm.llms.custom_httpx.http_handler import (
-    _get_httpx_client,
-    get_async_httpx_client,
-    httpxSpecialProvider,
-)
-from litellm.types.integrations.s3 import s3BatchLoggingElement
 from litellm.types.utils import StandardLoggingPayload
 
-from .custom_batch_logger import CustomBatchLogger
-# Default Flush interval and batch size for s3
-# Flush to s3 every 10 seconds OR every 1K requests in memory
-DEFAULT_S3_FLUSH_INTERVAL_SECONDS = 10
-DEFAULT_S3_BATCH_SIZE = 1000
-
-
-class S3Logger(CustomBatchLogger, BaseAWSLLM):
+class S3Logger:
     # Class variables or attributes
     def __init__(
         self,
-        s3_bucket_name: Optional[str] = None,
-        s3_path: Optional[str] = None,
-        s3_region_name: Optional[str] = None,
-        s3_api_version: Optional[str] = None,
-        s3_use_ssl: bool = True,
-        s3_verify: Optional[bool] = None,
-        s3_endpoint_url: Optional[str] = None,
-        s3_aws_access_key_id: Optional[str] = None,
-        s3_aws_secret_access_key: Optional[str] = None,
-        s3_aws_session_token: Optional[str] = None,
-        s3_flush_interval: Optional[int] = DEFAULT_S3_FLUSH_INTERVAL_SECONDS,
-        s3_batch_size: Optional[int] = DEFAULT_S3_BATCH_SIZE,
+        s3_bucket_name=None,
+        s3_path=None,
+        s3_region_name=None,
+        s3_api_version=None,
+        s3_use_ssl=True,
+        s3_verify=None,
+        s3_endpoint_url=None,
+        s3_aws_access_key_id=None,
+        s3_aws_secret_access_key=None,
+        s3_aws_session_token=None,
         s3_config=None,
         **kwargs,
     ):
+        import boto3
+
         try:
             verbose_logger.debug(
                 f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
             )
-            # IMPORTANT: We use a concurrent limit of 1 to upload to s3
-            # Files should get uploaded BUT they should not impact latency of LLM calling logic
-            self.async_httpx_client = get_async_httpx_client(
-                llm_provider=httpxSpecialProvider.LoggingCallback,
-                params={"concurrent_limit": 1},
-            )
-
             if litellm.s3_callback_params is not None:
                 # read in .env variables - example os.environ/AWS_BUCKET_NAME
                 for key, value in litellm.s3_callback_params.items():
@@ -87,282 +63,107 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM):
                 s3_path = litellm.s3_callback_params.get("s3_path")
             # done reading litellm.s3_callback_params
 
-            s3_flush_interval = litellm.s3_callback_params.get(
-                "s3_flush_interval", DEFAULT_S3_FLUSH_INTERVAL_SECONDS
-            )
-            s3_batch_size = litellm.s3_callback_params.get(
-                "s3_batch_size", DEFAULT_S3_BATCH_SIZE
-            )
-
             self.bucket_name = s3_bucket_name
             self.s3_path = s3_path
             verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
-            self.s3_bucket_name = s3_bucket_name
-            self.s3_region_name = s3_region_name
-            self.s3_api_version = s3_api_version
-            self.s3_use_ssl = s3_use_ssl
-            self.s3_verify = s3_verify
-            self.s3_endpoint_url = s3_endpoint_url
-            self.s3_aws_access_key_id = s3_aws_access_key_id
-            self.s3_aws_secret_access_key = s3_aws_secret_access_key
-            self.s3_aws_session_token = s3_aws_session_token
-            self.s3_config = s3_config
-            self.init_kwargs = kwargs
-
-            asyncio.create_task(self.periodic_flush())
-            self.flush_lock = asyncio.Lock()
-
-            verbose_logger.debug(
-                f"s3 flush interval: {s3_flush_interval}, s3 batch size: {s3_batch_size}"
+            # Create an S3 client with custom endpoint URL
+            self.s3_client = boto3.client(
+                "s3",
+                region_name=s3_region_name,
+                endpoint_url=s3_endpoint_url,
+                api_version=s3_api_version,
+                use_ssl=s3_use_ssl,
+                verify=s3_verify,
+                aws_access_key_id=s3_aws_access_key_id,
+                aws_secret_access_key=s3_aws_secret_access_key,
+                aws_session_token=s3_aws_session_token,
+                config=s3_config,
+                **kwargs,
             )
-            # Call CustomLogger's __init__
-            CustomBatchLogger.__init__(
-                self,
-                flush_lock=self.flush_lock,
-                flush_interval=s3_flush_interval,
-                batch_size=s3_batch_size,
-            )
-            self.log_queue: List[s3BatchLoggingElement] = []
-
-            # Call BaseAWSLLM's __init__
-            BaseAWSLLM.__init__(self)
-
         except Exception as e:
             print_verbose(f"Got exception on init s3 client {str(e)}")
             raise e
 
-    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+    async def _async_log_event(
+        self, kwargs, response_obj, start_time, end_time, print_verbose
+    ):
+        self.log_event(kwargs, response_obj, start_time, end_time, print_verbose)
+
+    def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
         try:
             verbose_logger.debug(
                 f"s3 Logging - Enters logging function for model {kwargs}"
             )
-            s3_batch_logging_element = self.create_s3_batch_logging_element(
-                start_time=start_time,
-                standard_logging_payload=kwargs.get("standard_logging_object", None),
-                s3_path=self.s3_path,
+            # construct payload to send to s3
+            # follows the same params as langfuse.py
+            litellm_params = kwargs.get("litellm_params", {})
+            metadata = (
+                litellm_params.get("metadata", {}) or {}
+            )  # if litellm_params['metadata'] == None
+
+            # Clean Metadata before logging - never log raw metadata
+            # the raw metadata can contain circular references which leads to infinite recursion
+            # we clean out all extra litellm metadata params before logging
+            clean_metadata = {}
+            if isinstance(metadata, dict):
+                for key, value in metadata.items():
+                    # clean litellm metadata before logging
+                    if key in [
+                        "headers",
+                        "endpoint",
+                        "caching_groups",
+                        "previous_models",
+                    ]:
+                        continue
+                    else:
+                        clean_metadata[key] = value
+
+            # Ensure everything in the payload is converted to str
+            payload: Optional[StandardLoggingPayload] = kwargs.get(
+                "standard_logging_object", None
             )
 
-            if s3_batch_logging_element is None:
-                raise ValueError("s3_batch_logging_element is None")
+            if payload is None:
+                return
 
-            verbose_logger.debug(
-                "\ns3 Logger - Logging payload = %s", s3_batch_logging_element
+            s3_file_name = litellm.utils.get_logging_id(start_time, payload) or ""
+            s3_object_key = (
+                (self.s3_path.rstrip("/") + "/" if self.s3_path else "")
+                + start_time.strftime("%Y-%m-%d")
+                + "/"
+                + s3_file_name
+            )  # we need the s3 key to include the time, so we log cache hits too
+            s3_object_key += ".json"
+
+            s3_object_download_filename = (
+                "time-"
+                + start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")
+                + "_"
+                + payload["id"]
+                + ".json"
             )
 
-            self.log_queue.append(s3_batch_logging_element)
-            verbose_logger.debug(
-                "s3 logging: queue length %s, batch size %s",
-                len(self.log_queue),
-                self.batch_size,
+            import json
+
+            payload_str = json.dumps(payload)
+
+            print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")
+
+            response = self.s3_client.put_object(
+                Bucket=self.bucket_name,
+                Key=s3_object_key,
+                Body=payload_str,
+                ContentType="application/json",
+                ContentLanguage="en",
+                ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
+                CacheControl="private, immutable, max-age=31536000, s-maxage=0",
             )
-            if len(self.log_queue) >= self.batch_size:
-                await self.flush_queue()
+
+            print_verbose(f"Response from s3:{str(response)}")
+
+            print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
+            return response
         except Exception as e:
             verbose_logger.exception(f"s3 Layer Error - {str(e)}")
             pass
-
-    def log_success_event(self, kwargs, response_obj, start_time, end_time):
-        """
-        Synchronous logging function to log to s3
-
-        Does not batch logging requests, instantly logs on s3 Bucket
-        """
-        try:
-            s3_batch_logging_element = self.create_s3_batch_logging_element(
-                start_time=start_time,
-                standard_logging_payload=kwargs.get("standard_logging_object", None),
-                s3_path=self.s3_path,
-            )
-
-            if s3_batch_logging_element is None:
-                raise ValueError("s3_batch_logging_element is None")
-
-            verbose_logger.debug(
-                "\ns3 Logger - Logging payload = %s", s3_batch_logging_element
-            )
-
-            # log the element sync httpx client
-            self.upload_data_to_s3(s3_batch_logging_element)
-        except Exception as e:
-            verbose_logger.exception(f"s3 Layer Error - {str(e)}")
-            pass
-
-    async def async_upload_data_to_s3(
-        self, batch_logging_element: s3BatchLoggingElement
-    ):
-        try:
-            import hashlib
-
-            import boto3
-            import requests
-            from botocore.auth import SigV4Auth
-            from botocore.awsrequest import AWSRequest
-            from botocore.credentials import Credentials
-        except ImportError:
-            raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
-        try:
-            credentials: Credentials = self.get_credentials(
-                aws_access_key_id=self.s3_aws_access_key_id,
-                aws_secret_access_key=self.s3_aws_secret_access_key,
-                aws_session_token=self.s3_aws_session_token,
-                aws_region_name=self.s3_region_name,
-            )
-
-            # Prepare the URL
-            url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{batch_logging_element.s3_object_key}"
-
-            if self.s3_endpoint_url:
-                url = self.s3_endpoint_url + "/" + batch_logging_element.s3_object_key
-
-            # Convert JSON to string
-            json_string = json.dumps(batch_logging_element.payload)
-
-            # Calculate SHA256 hash of the content
-            content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest()
-
-            # Prepare the request
-            headers = {
-                "Content-Type": "application/json",
-                "x-amz-content-sha256": content_hash,
-                "Content-Language": "en",
-                "Content-Disposition": f'inline; filename="{batch_logging_element.s3_object_download_filename}"',
-                "Cache-Control": "private, immutable, max-age=31536000, s-maxage=0",
-            }
-            req = requests.Request("PUT", url, data=json_string, headers=headers)
-            prepped = req.prepare()
-
-            # Sign the request
-            aws_request = AWSRequest(
-                method=prepped.method,
-                url=prepped.url,
-                data=prepped.body,
-                headers=prepped.headers,
-            )
-            SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request)
-
-            # Prepare the signed headers
-            signed_headers = dict(aws_request.headers.items())
-
-            # Make the request
-            response = await self.async_httpx_client.put(
-                url, data=json_string, headers=signed_headers
-            )
-            response.raise_for_status()
-        except Exception as e:
-            verbose_logger.exception(f"Error uploading to s3: {str(e)}")
-
-    async def async_send_batch(self):
-        """
-
-        Sends runs from self.log_queue
-
-        Returns: None
-
-        Raises: Does not raise an exception, will only verbose_logger.exception()
-        """
-        if not self.log_queue:
-            return
-
-        for payload in self.log_queue:
-            asyncio.create_task(self.async_upload_data_to_s3(payload))
-
-    def create_s3_batch_logging_element(
-        self,
-        start_time: datetime,
-        standard_logging_payload: Optional[StandardLoggingPayload],
-        s3_path: Optional[str],
-    ) -> Optional[s3BatchLoggingElement]:
-        """
-        Helper function to create an s3BatchLoggingElement.
-
-        Args:
-            start_time (datetime): The start time of the logging event.
-            standard_logging_payload (Optional[StandardLoggingPayload]): The payload to be logged.
-            s3_path (Optional[str]): The S3 path prefix.
-
-        Returns:
-            Optional[s3BatchLoggingElement]: The created s3BatchLoggingElement, or None if payload is None.
-        """
-        if standard_logging_payload is None:
-            return None
-
-        s3_file_name = (
-            litellm.utils.get_logging_id(start_time, standard_logging_payload) or ""
-        )
-        s3_object_key = (
-            (s3_path.rstrip("/") + "/" if s3_path else "")
-            + start_time.strftime("%Y-%m-%d")
-            + "/"
-            + s3_file_name
-            + ".json"
-        )
-
-        s3_object_download_filename = f"time-{start_time.strftime('%Y-%m-%dT%H-%M-%S-%f')}_{standard_logging_payload['id']}.json"
-
-        return s3BatchLoggingElement(
-            payload=standard_logging_payload,  # type: ignore
-            s3_object_key=s3_object_key,
-            s3_object_download_filename=s3_object_download_filename,
-        )
-
-    def upload_data_to_s3(self, batch_logging_element: s3BatchLoggingElement):
-        try:
-            import hashlib
-
-            import boto3
-            import requests
-            from botocore.auth import SigV4Auth
-            from botocore.awsrequest import AWSRequest
-            from botocore.credentials import Credentials
-        except ImportError:
-            raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
-        try:
-            credentials: Credentials = self.get_credentials(
-                aws_access_key_id=self.s3_aws_access_key_id,
-                aws_secret_access_key=self.s3_aws_secret_access_key,
-                aws_session_token=self.s3_aws_session_token,
-                aws_region_name=self.s3_region_name,
-            )
-
-            # Prepare the URL
-            url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{batch_logging_element.s3_object_key}"
-
-            if self.s3_endpoint_url:
-                url = self.s3_endpoint_url + "/" + batch_logging_element.s3_object_key
-
-            # Convert JSON to string
-            json_string = json.dumps(batch_logging_element.payload)
-
-            # Calculate SHA256 hash of the content
-            content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest()
-
-            # Prepare the request
-            headers = {
-                "Content-Type": "application/json",
-                "x-amz-content-sha256": content_hash,
-                "Content-Language": "en",
-                "Content-Disposition": f'inline; filename="{batch_logging_element.s3_object_download_filename}"',
-                "Cache-Control": "private, immutable, max-age=31536000, s-maxage=0",
-            }
-            req = requests.Request("PUT", url, data=json_string, headers=headers)
-            prepped = req.prepare()
-
-            # Sign the request
-            aws_request = AWSRequest(
-                method=prepped.method,
-                url=prepped.url,
-                data=prepped.body,
-                headers=prepped.headers,
-            )
-            SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request)
-
-            # Prepare the signed headers
-            signed_headers = dict(aws_request.headers.items())
-
-            httpx_client = _get_httpx_client()
-            # Make the request
-            response = httpx_client.put(url, data=json_string, headers=signed_headers)
-            response.raise_for_status()
-        except Exception as e:
-            verbose_logger.exception(f"Error uploading to s3: {str(e)}")
diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py
index 8b6dbc1f9..c23f9a979 100644
--- a/litellm/litellm_core_utils/litellm_logging.py
+++ b/litellm/litellm_core_utils/litellm_logging.py
@@ -119,6 +119,7 @@ lagoLogger = None
 dataDogLogger = None
 prometheusLogger = None
 dynamoLogger = None
+s3Logger = None
 genericAPILogger = None
 clickHouseLogger = None
 greenscaleLogger = None
@@ -1328,6 +1329,36 @@ class Logging:
                         user_id=kwargs.get("user", None),
                         print_verbose=print_verbose,
                     )
+                if callback == "s3":
+                    global s3Logger
+                    if s3Logger is None:
+                        s3Logger = S3Logger()
+                    if self.stream:
+                        if "complete_streaming_response" in self.model_call_details:
+                            print_verbose(
+                                "S3Logger Logger: Got Stream Event - Completed Stream Response"
+                            )
+                            s3Logger.log_event(
+                                kwargs=self.model_call_details,
+                                response_obj=self.model_call_details[
+                                    "complete_streaming_response"
+                                ],
+                                start_time=start_time,
+                                end_time=end_time,
+                                print_verbose=print_verbose,
+                            )
+                        else:
+                            print_verbose(
+                                "S3Logger Logger: Got Stream Event - No complete stream response as yet"
+                            )
+                    else:
+                        s3Logger.log_event(
+                            kwargs=self.model_call_details,
+                            response_obj=result,
+                            start_time=start_time,
+                            end_time=end_time,
+                            print_verbose=print_verbose,
+                        )
                 if (
                     callback == "openmeter"
                     and self.model_call_details.get("litellm_params", {}).get(
@@ -2163,7 +2194,7 @@ def set_callbacks(callback_list, function_id=None):
     """
     Globally sets the callback client
     """
-    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
+    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
 
     try:
         for callback in callback_list:
@@ -2237,6 +2268,8 @@ def set_callbacks(callback_list, function_id=None):
                 dataDogLogger = DataDogLogger()
             elif callback == "dynamodb":
                 dynamoLogger = DyanmoDBLogger()
+            elif callback == "s3":
+                s3Logger = S3Logger()
             elif callback == "wandb":
                 weightsBiasesLogger = WeightsBiasesLogger()
             elif callback == "logfire":
@@ -2330,14 +2363,6 @@ def _init_custom_logger_compatible_class(
         _datadog_logger = DataDogLogger()
         _in_memory_loggers.append(_datadog_logger)
         return _datadog_logger  # type: ignore
-    elif logging_integration == "s3":
-        for callback in _in_memory_loggers:
-            if isinstance(callback, S3Logger):
-                return callback  # type: ignore
-
-        _s3_logger = S3Logger()
-        _in_memory_loggers.append(_s3_logger)
-        return _s3_logger  # type: ignore
    elif logging_integration == "gcs_bucket":
         for callback in _in_memory_loggers:
             if isinstance(callback, GCSBucketLogger):
@@ -2504,10 +2529,6 @@ def get_custom_logger_compatible_class(
         for callback in _in_memory_loggers:
             if isinstance(callback, PrometheusLogger):
                 return callback
-    elif logging_integration == "s3":
-        for callback in _in_memory_loggers:
-            if isinstance(callback, S3Logger):
-                return callback
     elif logging_integration == "datadog":
         for callback in _in_memory_loggers:
             if isinstance(callback, DataDogLogger):
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index 7c70332fd..707f76ce8 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -3,7 +3,12 @@ model_list:
     litellm_params:
       model: openai/fake
       api_key: fake-key
-      api_base: https://exampleopenaiendpoint-production.up.railway.app/
+      api_base: https://exampleopenaiendpoint-production.up.railwaz.app/
+  - model_name: db-openai-endpoint
+    litellm_params:
+      model: openai/gpt-5
+      api_key: fake-key
+      api_base: https://exampleopenaiendpoint-production.up.railwxaz.app/
 
 litellm_settings:
   callbacks: ["arize"]
diff --git a/litellm/types/integrations/s3.py b/litellm/types/integrations/s3.py
deleted file mode 100644
index d66e2c59d..000000000
--- a/litellm/types/integrations/s3.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from typing import Dict
-
-from pydantic import BaseModel
-
-
-class s3BatchLoggingElement(BaseModel):
-    """
-    Type of element stored in self.log_queue in S3Logger
-
-    """
-
-    payload: Dict
-    s3_object_key: str
-    s3_object_download_filename: str
diff --git a/litellm/utils.py b/litellm/utils.py
index 935887637..50da9f49a 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -208,6 +208,7 @@ lagoLogger = None
 dataDogLogger = None
 prometheusLogger = None
 dynamoLogger = None
+s3Logger = None
 genericAPILogger = None
 clickHouseLogger = None
 greenscaleLogger = None
diff --git a/tests/local_testing/test_amazing_s3_logs.py b/tests/local_testing/test_amazing_s3_logs.py
index f489a5a0e..17efb177d 100644
--- a/tests/local_testing/test_amazing_s3_logs.py
+++ b/tests/local_testing/test_amazing_s3_logs.py
@@ -12,16 +12,18 @@ import litellm
 
 litellm.num_retries = 3
 import time, random
-from litellm._logging import verbose_logger
-import logging
 import pytest
 import boto3
+from litellm._logging import verbose_logger
+import logging
 
 
 @pytest.mark.asyncio
-@pytest.mark.parametrize("sync_mode", [True, False])
-@pytest.mark.flaky(retries=6, delay=1)
-async def test_basic_s3_logging(sync_mode):
+@pytest.mark.parametrize(
+    "sync_mode,streaming", [(True, True), (True, False), (False, True), (False, False)]
+)
+@pytest.mark.flaky(retries=3, delay=1)
+async def test_basic_s3_logging(sync_mode, streaming):
     verbose_logger.setLevel(level=logging.DEBUG)
     litellm.success_callback = ["s3"]
     litellm.s3_callback_params = {
@@ -31,27 +33,41 @@ async def test_basic_s3_logging(sync_mode):
         "s3_region_name": "us-west-2",
     }
     litellm.set_verbose = True
-
+    response_id = None
     if sync_mode is True:
         response = litellm.completion(
             model="gpt-3.5-turbo",
             messages=[{"role": "user", "content": "This is a test"}],
             mock_response="It's simple to use and easy to get started",
+            stream=streaming,
         )
+        if streaming:
+            for chunk in response:
+                print()
+                response_id = chunk.id
+        else:
+            response_id = response.id
+        time.sleep(2)
     else:
         response = await litellm.acompletion(
             model="gpt-3.5-turbo",
             messages=[{"role": "user", "content": "This is a test"}],
             mock_response="It's simple to use and easy to get started",
+            stream=streaming,
         )
+        if streaming:
+            async for chunk in response:
+                print(chunk)
+                response_id = chunk.id
+        else:
+            response_id = response.id
+        await asyncio.sleep(2)
 
     print(f"response: {response}")
 
-    await asyncio.sleep(12)
-
     total_objects, all_s3_keys = list_all_s3_objects("load-testing-oct")
 
     # assert that atlest one key has response.id in it
-    assert any(response.id in key for key in all_s3_keys)
+    assert any(response_id in key for key in all_s3_keys)
     s3 = boto3.client("s3")
     # delete all objects
     for key in all_s3_keys:
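
For reviewers, a minimal usage sketch of the restored synchronous s3 callback, mirroring the settings exercised in tests/local_testing/test_amazing_s3_logs.py above; the bucket name is a placeholder and working AWS credentials are assumed to be available to boto3.

```python
# Minimal sketch of the restored s3 callback path (assumes valid AWS credentials
# and a writable bucket; "my-logging-bucket" is a placeholder).
import litellm

litellm.success_callback = ["s3"]
litellm.s3_callback_params = {
    "s3_bucket_name": "my-logging-bucket",
    "s3_region_name": "us-west-2",
}

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "This is a test"}],
    mock_response="It's simple to use and easy to get started",
)
# S3Logger.log_event() is invoked synchronously from the success handler and
# issues a single put_object call per request (no batching or periodic flush).
```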
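The restored log_event() builds its object key from the optional s3_path prefix, the request date, and litellm.utils.get_logging_id(). A small sketch of that key layout follows; the s3_file_name value is a placeholder, since the exact format returned by get_logging_id() is not shown in this diff.

```python
# Sketch of how log_event() derives the S3 object key (see the restored code in
# litellm/integrations/s3.py). The s3_file_name below is illustrative only.
from datetime import datetime

s3_path = "litellm-logs"  # optional prefix from s3_callback_params
start_time = datetime(2024, 10, 18, 12, 30, 45)
s3_file_name = "time-12-30-45-000000_chatcmpl-abc123"  # placeholder logging id

s3_object_key = (
    (s3_path.rstrip("/") + "/" if s3_path else "")
    + start_time.strftime("%Y-%m-%d")
    + "/"
    + s3_file_name
)
s3_object_key += ".json"
print(s3_object_key)  # litellm-logs/2024-10-18/time-12-30-45-000000_chatcmpl-abc123.json
```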
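The block added back to litellm_core_utils/litellm_logging.py only logs streamed calls once the assembled response is available. A simplified sketch of that guard, using a hypothetical helper name (in the actual code this logic lives inline in the success handler's callback loop):

```python
# Simplified sketch of the streaming guard restored in litellm_logging.py.
# The helper name is hypothetical; it is not part of the litellm codebase.
def _log_to_s3_if_ready(s3_logger, model_call_details, result, start_time, end_time, stream):
    if stream:
        # For streamed calls, wait until the complete response has been assembled.
        complete = model_call_details.get("complete_streaming_response")
        if complete is None:
            return  # no complete stream response as yet
        response_obj = complete
    else:
        response_obj = result
    s3_logger.log_event(
        kwargs=model_call_details,
        response_obj=response_obj,
        start_time=start_time,
        end_time=end_time,
        print_verbose=print,
    )
```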