(fix) working s3 logging

This commit is contained in:
ishaan-jaff 2024-01-11 08:57:32 +05:30
parent e04f76ad65
commit f61d8596e1
2 changed files with 83 additions and 15 deletions

View file

@ -11,19 +11,39 @@ import litellm, uuid
from litellm._logging import print_verbose from litellm._logging import print_verbose
class s3Logger: class S3Logger:
# Class variables or attributes # Class variables or attributes
def __init__(
def __init__(self): self,
# Instance variables s3_bucket_name="litellm-logs",
s3_region_name=None,
s3_api_version=None,
s3_use_ssl=True,
s3_verify=None,
s3_endpoint_url=None,
s3_aws_access_key_id=None,
s3_aws_secret_access_key=None,
s3_aws_session_token=None,
s3_config=None,
**kwargs,
):
import boto3 import boto3
self.s3 = boto3.resource("s3", region_name=os.environ["AWS_REGION_NAME"]) self.bucket_name = s3_bucket_name
if litellm.s3_table_name is None: # Create an S3 client with custom endpoint URL
raise ValueError( self.s3_client = boto3.client(
"LiteLLM Error, trying to use s3 but not table name passed. Create a table and set `litellm.s3_table_name=<your-table>`" "s3",
region_name=s3_region_name,
endpoint_url=s3_endpoint_url,
api_version=s3_api_version,
use_ssl=s3_use_ssl,
verify=s3_verify,
aws_access_key_id=s3_aws_access_key_id,
aws_secret_access_key=s3_aws_secret_access_key,
aws_session_token=s3_aws_session_token,
config=s3_config,
**kwargs,
) )
self.table_name = litellm.s3_table_name
async def _async_log_event( async def _async_log_event(
self, kwargs, response_obj, start_time, end_time, print_verbose self, kwargs, response_obj, start_time, end_time, print_verbose
@ -68,13 +88,22 @@ class s3Logger:
except: except:
# non blocking if it can't cast to a str # non blocking if it can't cast to a str
pass pass
s3_object_key = payload["id"]
import json
payload = json.dumps(payload)
print_verbose(f"\ns3 Logger - Logging payload = {payload}") print_verbose(f"\ns3 Logger - Logging payload = {payload}")
# put data in dyanmo DB response = self.s3_client.put_object(
table = self.s3.Table(self.table_name) Bucket=self.bucket_name,
# Assuming log_data is a dictionary with log information Key=s3_object_key,
response = table.put_item(Item=payload) Body=payload,
ContentType="application/json",
ContentLanguage="en",
ContentDisposition=f'inline; filename="{key}.json"',
)
print_verbose(f"Response from s3:{str(response)}") print_verbose(f"Response from s3:{str(response)}")

View file

@ -47,6 +47,7 @@ from .integrations.weights_biases import WeightsBiasesLogger
from .integrations.custom_logger import CustomLogger from .integrations.custom_logger import CustomLogger
from .integrations.langfuse import LangFuseLogger from .integrations.langfuse import LangFuseLogger
from .integrations.dynamodb import DyanmoDBLogger from .integrations.dynamodb import DyanmoDBLogger
from .integrations.s3 import S3Logger
from .integrations.litedebugger import LiteDebugger from .integrations.litedebugger import LiteDebugger
from .proxy._types import KeyManagementSystem from .proxy._types import KeyManagementSystem
from openai import OpenAIError as OriginalError from openai import OpenAIError as OriginalError
@ -90,6 +91,7 @@ weightsBiasesLogger = None
customLogger = None customLogger = None
langFuseLogger = None langFuseLogger = None
dynamoLogger = None dynamoLogger = None
s3Logger = None
llmonitorLogger = None llmonitorLogger = None
aispendLogger = None aispendLogger = None
berrispendLogger = None berrispendLogger = None
@ -1459,6 +1461,36 @@ class Logging:
end_time=end_time, end_time=end_time,
print_verbose=print_verbose, print_verbose=print_verbose,
) )
if callback == "s3":
global s3Logger
if s3Logger is None:
s3Logger = S3Logger()
if self.stream:
if "complete_streaming_response" in self.model_call_details:
print_verbose(
"S3Logger Logger: Got Stream Event - Completed Stream Response"
)
await s3Logger._async_log_event(
kwargs=self.model_call_details,
response_obj=self.model_call_details[
"complete_streaming_response"
],
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
)
else:
print_verbose(
"S3Logger Logger: Got Stream Event - No complete stream response as yet"
)
else:
await s3Logger._async_log_event(
kwargs=self.model_call_details,
response_obj=result,
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
)
if callback == "langfuse": if callback == "langfuse":
global langFuseLogger global langFuseLogger
print_verbose("reaches Async langfuse for logging!") print_verbose("reaches Async langfuse for logging!")
@ -1806,6 +1838,11 @@ def client(original_function):
# we only support async dynamo db logging for acompletion/aembedding since that's used on proxy # we only support async dynamo db logging for acompletion/aembedding since that's used on proxy
litellm._async_success_callback.append(callback) litellm._async_success_callback.append(callback)
removed_async_items.append(index) removed_async_items.append(index)
elif callback == "s3":
# s3 is an async callback, it's used for the proxy and needs to be async
# we only support async s3 logging for acompletion/aembedding since that's used on proxy
litellm._async_success_callback.append(callback)
removed_async_items.append(index)
elif callback == "langfuse" and inspect.iscoroutinefunction( elif callback == "langfuse" and inspect.iscoroutinefunction(
original_function original_function
): ):
@ -4678,7 +4715,7 @@ def validate_environment(model: Optional[str] = None) -> dict:
def set_callbacks(callback_list, function_id=None): def set_callbacks(callback_list, function_id=None):
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, llmonitorLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, langsmithLogger, dynamoLogger global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, llmonitorLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, langsmithLogger, dynamoLogger, s3Logger
try: try:
for callback in callback_list: for callback in callback_list:
print_verbose(f"callback: {callback}") print_verbose(f"callback: {callback}")
@ -4743,6 +4780,8 @@ def set_callbacks(callback_list, function_id=None):
langFuseLogger = LangFuseLogger() langFuseLogger = LangFuseLogger()
elif callback == "dynamodb": elif callback == "dynamodb":
dynamoLogger = DyanmoDBLogger() dynamoLogger = DyanmoDBLogger()
elif callback == "s3":
s3Logger = S3Logger()
elif callback == "wandb": elif callback == "wandb":
weightsBiasesLogger = WeightsBiasesLogger() weightsBiasesLogger = WeightsBiasesLogger()
elif callback == "langsmith": elif callback == "langsmith":