diff --git a/litellm/integrations/s3.py b/litellm/integrations/s3.py
index b9e1ffc92..bf1fa2111 100644
--- a/litellm/integrations/s3.py
+++ b/litellm/integrations/s3.py
@@ -1,7 +1,9 @@
 #### What this does ####
 # On success + failure, log events to Supabase
 
+import asyncio
 import datetime
+import json
 import os
 import subprocess
 import sys
@@ -9,27 +11,35 @@ import traceback
 import uuid
 from typing import Optional
 
+import httpx
+
 import litellm
 from litellm._logging import print_verbose, verbose_logger
+from litellm.llms.base_aws_llm import BaseAWSLLM
+from litellm.llms.custom_httpx.http_handler import (
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
 from litellm.types.utils import StandardLoggingPayload
 
+from .custom_batch_logger import CustomBatchLogger
 from .custom_logger import CustomLogger
 
 
-class S3Logger(CustomLogger):
+class S3Logger(CustomBatchLogger, BaseAWSLLM):
     # Class variables or attributes
     def __init__(
         self,
-        s3_bucket_name=None,
-        s3_path=None,
-        s3_region_name=None,
-        s3_api_version=None,
-        s3_use_ssl=True,
-        s3_verify=None,
-        s3_endpoint_url=None,
-        s3_aws_access_key_id=None,
-        s3_aws_secret_access_key=None,
-        s3_aws_session_token=None,
+        s3_bucket_name: Optional[str] = None,
+        s3_path: Optional[str] = None,
+        s3_region_name: Optional[str] = None,
+        s3_api_version: Optional[str] = None,
+        s3_use_ssl: bool = True,
+        s3_verify: Optional[bool] = None,
+        s3_endpoint_url: Optional[str] = None,
+        s3_aws_access_key_id: Optional[str] = None,
+        s3_aws_secret_access_key: Optional[str] = None,
+        s3_aws_session_token: Optional[str] = None,
         s3_config=None,
         **kwargs,
     ):
@@ -39,6 +49,10 @@ class S3Logger(CustomLogger):
         verbose_logger.debug(
             f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
         )
+        self.async_httpx_client = get_async_httpx_client(
+            llm_provider=httpxSpecialProvider.LoggingCallback,
+            params={"concurrent_limit": 1},
+        )
 
         if litellm.s3_callback_params is not None:
             # read in .env variables - example os.environ/AWS_BUCKET_NAME
@@ -68,24 +82,83 @@ class S3Logger(CustomLogger):
             self.bucket_name = s3_bucket_name
             self.s3_path = s3_path
             verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
-            # Create an S3 client with custom endpoint URL
-            self.s3_client = boto3.client(
-                "s3",
-                region_name=s3_region_name,
-                endpoint_url=s3_endpoint_url,
-                api_version=s3_api_version,
-                use_ssl=s3_use_ssl,
-                verify=s3_verify,
-                aws_access_key_id=s3_aws_access_key_id,
-                aws_secret_access_key=s3_aws_secret_access_key,
-                aws_session_token=s3_aws_session_token,
-                config=s3_config,
-                **kwargs,
-            )
+            self.s3_bucket_name = s3_bucket_name
+            self.s3_region_name = s3_region_name
+            self.s3_api_version = s3_api_version
+            self.s3_use_ssl = s3_use_ssl
+            self.s3_verify = s3_verify
+            self.s3_endpoint_url = s3_endpoint_url
+            self.s3_aws_access_key_id = s3_aws_access_key_id
+            self.s3_aws_secret_access_key = s3_aws_secret_access_key
+            self.s3_aws_session_token = s3_aws_session_token
+            self.s3_config = s3_config
+            self.init_kwargs = kwargs
+
+            asyncio.create_task(self.periodic_flush())
+            self.flush_lock = asyncio.Lock()
+            # Call CustomBatchLogger's __init__
+            CustomBatchLogger.__init__(self, flush_lock=self.flush_lock)
+
+            # Call BaseAWSLLM's __init__
+            BaseAWSLLM.__init__(self)
+
         except Exception as e:
             print_verbose(f"Got exception on init s3 client {str(e)}")
             raise e
 
+    async def upload_data_to_s3(self, data: StandardLoggingPayload):
+        try:
+            import hashlib
+
+            import boto3
+            import requests
+            from botocore.auth import SigV4Auth
+            from botocore.awsrequest import AWSRequest
+            from botocore.credentials import Credentials
+        except ImportError:
+            raise ImportError("Missing boto3 to call s3. Run 'pip install boto3'.")
+
+        credentials: Credentials = self.get_credentials(
+            aws_access_key_id=self.s3_aws_access_key_id,
+            aws_secret_access_key=self.s3_aws_secret_access_key,
+            aws_session_token=self.s3_aws_session_token,
+            aws_region_name=self.s3_region_name,
+        )
+        object_name = uuid.uuid4().hex
+        # Prepare the URL
+        url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{object_name}"
+
+        # Convert JSON to string
+        json_string = json.dumps(data)
+
+        # Calculate SHA256 hash of the content
+        content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest()
+
+        # Prepare the request
+        headers = {
+            "Content-Type": "application/json",
+            "x-amz-content-sha256": content_hash,
+        }
+        req = requests.Request("PUT", url, data=json_string, headers=headers)
+        prepped = req.prepare()
+
+        # Sign the request
+        aws_request = AWSRequest(
+            method=prepped.method,
+            url=prepped.url,
+            data=prepped.body,
+            headers=prepped.headers,
+        )
+        SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request)
+
+        # Prepare the signed headers
+        signed_headers = dict(aws_request.headers.items())
+
+        # Make the request
+        asyncio.create_task(
+            self.async_httpx_client.put(url, data=json_string, headers=signed_headers)
+        )
+
     async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
         try:
             verbose_logger.debug(
@@ -141,26 +214,31 @@ class S3Logger(CustomLogger):
                     + ".json"
                 )
 
-            import json
+            verbose_logger.debug("\ns3 Logger - Logging payload = %s", payload)
 
-            payload_str = json.dumps(payload)
-
-            print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")
-
-            response = self.s3_client.put_object(
-                Bucket=self.bucket_name,
-                Key=s3_object_key,
-                Body=payload_str,
-                ContentType="application/json",
-                ContentLanguage="en",
-                ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
-                CacheControl="private, immutable, max-age=31536000, s-maxage=0",
+            self.log_queue.append(payload)
+            verbose_logger.debug(
+                "s3 logging: queue length %s, batch size %s",
+                len(self.log_queue),
+                self.batch_size,
             )
-
-            print_verbose(f"Response from s3:{str(response)}")
-
-            print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
-            return response
+            if len(self.log_queue) >= self.batch_size:
+                await self.flush_queue()
         except Exception as e:
             verbose_logger.exception(f"s3 Layer Error - {str(e)}")
             pass
+
+    async def async_send_batch(self):
+        """
+
+        Sends the batched payloads in self.log_queue to S3.
+
+        Returns: None
+
+        Raises: Does not raise an exception; errors are only logged via verbose_logger.exception()
+        """
+        if not self.log_queue:
+            return
+
+        for payload in self.log_queue:
+            asyncio.create_task(self.upload_data_to_s3(payload))
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index d611aa87b..67bdc0d55 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -1,16 +1,7 @@
 model_list:
   - model_name: db-openai-endpoint
     litellm_params:
-      model: openai/gpt-5
+      model: openai/gpt-4
       api_key: fake-key
-      api_base: https://exampleopenaiendpoint-production.up.railwaz.app/
-  - model_name: db-openai-endpoint
-    litellm_params:
-      model: openai/gpt-5
-      api_key: fake-key
-      api_base: https://exampleopenaiendpoint-production.up.railwxaz.app/
-
-litellm_settings:
-  callbacks: ["prometheus"]
-
+      api_base: https://exampleopenaiendpoint-production.up.railway.app/
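
Usage sketch (not part of the diff): a minimal example of how the new batched path behaves. S3Logger, log_queue, batch_size, flush_queue, and async_send_batch are names taken from this change and from CustomBatchLogger; the bucket name, region, and stand-in payload dicts are illustrative assumptions, and a real run needs resolvable AWS credentials.

    import asyncio

    from litellm.integrations.s3 import S3Logger

    async def main():
        # Bucket and region are placeholders, not values from the diff.
        # Credentials are resolved via BaseAWSLLM.get_credentials (env vars / IAM role).
        logger = S3Logger(s3_bucket_name="my-log-bucket", s3_region_name="us-west-2")
        logger.batch_size = 2  # inherited from CustomBatchLogger; lowered here to force a flush

        # async_log_success_event() normally appends StandardLoggingPayload objects;
        # plain dicts stand in for them in this sketch.
        logger.log_queue.append({"id": "chatcmpl-1", "status": "success"})
        logger.log_queue.append({"id": "chatcmpl-2", "status": "success"})

        # flush_queue() (from CustomBatchLogger) calls async_send_batch(), which
        # schedules one SigV4-signed PUT per payload on the shared httpx client.
        await logger.flush_queue()
        await asyncio.sleep(1)  # give the fire-and-forget PUT tasks time to complete

    asyncio.run(main())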