From f61d8596e123ef3ae11784b18f26150e7b3d1d38 Mon Sep 17 00:00:00 2001 From: ishaan-jaff Date: Thu, 11 Jan 2024 08:57:32 +0530 Subject: [PATCH] (fix) working s3 logging --- litellm/integrations/s3.py | 57 ++++++++++++++++++++++++++++---------- litellm/utils.py | 41 ++++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 15 deletions(-) diff --git a/litellm/integrations/s3.py b/litellm/integrations/s3.py index bbee5a82b4..5da1e629ba 100644 --- a/litellm/integrations/s3.py +++ b/litellm/integrations/s3.py @@ -11,19 +11,39 @@ import litellm, uuid from litellm._logging import print_verbose -class s3Logger: +class S3Logger: # Class variables or attributes - - def __init__(self): - # Instance variables + def __init__( + self, + s3_bucket_name="litellm-logs", + s3_region_name=None, + s3_api_version=None, + s3_use_ssl=True, + s3_verify=None, + s3_endpoint_url=None, + s3_aws_access_key_id=None, + s3_aws_secret_access_key=None, + s3_aws_session_token=None, + s3_config=None, + **kwargs, + ): import boto3 - self.s3 = boto3.resource("s3", region_name=os.environ["AWS_REGION_NAME"]) - if litellm.s3_table_name is None: - raise ValueError( - "LiteLLM Error, trying to use s3 but not table name passed. 
Create a table and set `litellm.s3_table_name=`" - ) - self.table_name = litellm.s3_table_name + self.bucket_name = s3_bucket_name + # Create an S3 client with custom endpoint URL + self.s3_client = boto3.client( + "s3", + region_name=s3_region_name, + endpoint_url=s3_endpoint_url, + api_version=s3_api_version, + use_ssl=s3_use_ssl, + verify=s3_verify, + aws_access_key_id=s3_aws_access_key_id, + aws_secret_access_key=s3_aws_secret_access_key, + aws_session_token=s3_aws_session_token, + config=s3_config, + **kwargs, + ) async def _async_log_event( self, kwargs, response_obj, start_time, end_time, print_verbose @@ -68,13 +88,22 @@ class s3Logger: except: # non blocking if it can't cast to a str pass + s3_object_key = payload["id"] + + import json + + payload = json.dumps(payload) print_verbose(f"\ns3 Logger - Logging payload = {payload}") - # put data in dyanmo DB - table = self.s3.Table(self.table_name) - # Assuming log_data is a dictionary with log information - response = table.put_item(Item=payload) + response = self.s3_client.put_object( + Bucket=self.bucket_name, + Key=s3_object_key, + Body=payload, + ContentType="application/json", + ContentLanguage="en", + ContentDisposition=f'inline; filename="{s3_object_key}.json"', + ) print_verbose(f"Response from s3:{str(response)}") diff --git a/litellm/utils.py b/litellm/utils.py index 77e4de8e91..f3e743ec4c 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -47,6 +47,7 @@ from .integrations.weights_biases import WeightsBiasesLogger from .integrations.custom_logger import CustomLogger from .integrations.langfuse import LangFuseLogger from .integrations.dynamodb import DyanmoDBLogger +from .integrations.s3 import S3Logger from .integrations.litedebugger import LiteDebugger from .proxy._types import KeyManagementSystem from openai import OpenAIError as OriginalError @@ -90,6 +91,7 @@ weightsBiasesLogger = None customLogger = None langFuseLogger = None dynamoLogger = None +s3Logger = None llmonitorLogger = None aispendLogger = 
None berrispendLogger = None @@ -1459,6 +1461,36 @@ class Logging: end_time=end_time, print_verbose=print_verbose, ) + if callback == "s3": + global s3Logger + if s3Logger is None: + s3Logger = S3Logger() + if self.stream: + if "complete_streaming_response" in self.model_call_details: + print_verbose( + "S3Logger Logger: Got Stream Event - Completed Stream Response" + ) + await s3Logger._async_log_event( + kwargs=self.model_call_details, + response_obj=self.model_call_details[ + "complete_streaming_response" + ], + start_time=start_time, + end_time=end_time, + print_verbose=print_verbose, + ) + else: + print_verbose( + "S3Logger Logger: Got Stream Event - No complete stream response as yet" + ) + else: + await s3Logger._async_log_event( + kwargs=self.model_call_details, + response_obj=result, + start_time=start_time, + end_time=end_time, + print_verbose=print_verbose, + ) if callback == "langfuse": global langFuseLogger print_verbose("reaches Async langfuse for logging!") @@ -1806,6 +1838,11 @@ def client(original_function): # we only support async dynamo db logging for acompletion/aembedding since that's used on proxy litellm._async_success_callback.append(callback) removed_async_items.append(index) + elif callback == "s3": + # s3 is an async callback, it's used for the proxy and needs to be async + # we only support async s3 logging for acompletion/aembedding since that's used on proxy + litellm._async_success_callback.append(callback) + removed_async_items.append(index) elif callback == "langfuse" and inspect.iscoroutinefunction( original_function ): @@ -4678,7 +4715,7 @@ def validate_environment(model: Optional[str] = None) -> dict: def set_callbacks(callback_list, function_id=None): - global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, llmonitorLogger, promptLayerLogger, langFuseLogger, customLogger, 
weightsBiasesLogger, langsmithLogger, dynamoLogger + global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, llmonitorLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, langsmithLogger, dynamoLogger, s3Logger try: for callback in callback_list: print_verbose(f"callback: {callback}") @@ -4743,6 +4780,8 @@ def set_callbacks(callback_list, function_id=None): langFuseLogger = LangFuseLogger() elif callback == "dynamodb": dynamoLogger = DyanmoDBLogger() + elif callback == "s3": + s3Logger = S3Logger() elif callback == "wandb": weightsBiasesLogger = WeightsBiasesLogger() elif callback == "langsmith":