# litellm/litellm/integrations/s3.py
#### What this does ####
# On success + failure, log events to an AWS S3 bucket
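
# A minimal configuration sketch (hedged; assumes the standard litellm
# callback API - the bucket name and region below are hypothetical):
#
#   import litellm
#   litellm.success_callback = ["s3"]
#   litellm.s3_callback_params = {
#       "s3_bucket_name": "my-logs-bucket",
#       "s3_region_name": "us-west-2",
#       # "os.environ/<VAR>" values are resolved from the environment (see __init__)
#       "s3_aws_access_key_id": "os.environ/AWS_ACCESS_KEY_ID",
#   }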
import json
from typing import Optional

import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.types.utils import StandardLoggingPayload


class S3Logger:
def __init__(
self,
s3_bucket_name=None,
s3_path=None,
s3_region_name=None,
s3_api_version=None,
s3_use_ssl=True,
s3_verify=None,
s3_endpoint_url=None,
s3_aws_access_key_id=None,
s3_aws_secret_access_key=None,
s3_aws_session_token=None,
s3_config=None,
**kwargs,
):
        # import boto3 lazily so it stays an optional dependency
        import boto3

        try:
verbose_logger.debug(
f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
)
            if litellm.s3_callback_params is not None:
                # resolve "os.environ/<VAR>" references from the environment,
                # e.g. "os.environ/AWS_BUCKET_NAME" -> os.environ["AWS_BUCKET_NAME"]
                for key, value in litellm.s3_callback_params.items():
                    if isinstance(value, str) and value.startswith("os.environ/"):
                        litellm.s3_callback_params[key] = litellm.get_secret(value)
                # now set s3 params from litellm.s3_callback_params
s3_bucket_name = litellm.s3_callback_params.get("s3_bucket_name")
s3_region_name = litellm.s3_callback_params.get("s3_region_name")
s3_api_version = litellm.s3_callback_params.get("s3_api_version")
s3_use_ssl = litellm.s3_callback_params.get("s3_use_ssl", True)
s3_verify = litellm.s3_callback_params.get("s3_verify")
s3_endpoint_url = litellm.s3_callback_params.get("s3_endpoint_url")
s3_aws_access_key_id = litellm.s3_callback_params.get(
"s3_aws_access_key_id"
)
s3_aws_secret_access_key = litellm.s3_callback_params.get(
"s3_aws_secret_access_key"
)
s3_aws_session_token = litellm.s3_callback_params.get(
"s3_aws_session_token"
)
s3_config = litellm.s3_callback_params.get("s3_config")
s3_path = litellm.s3_callback_params.get("s3_path")
# done reading litellm.s3_callback_params
self.bucket_name = s3_bucket_name
self.s3_path = s3_path
verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
# Create an S3 client with custom endpoint URL
self.s3_client = boto3.client(
"s3",
region_name=s3_region_name,
endpoint_url=s3_endpoint_url,
api_version=s3_api_version,
use_ssl=s3_use_ssl,
verify=s3_verify,
aws_access_key_id=s3_aws_access_key_id,
aws_secret_access_key=s3_aws_secret_access_key,
aws_session_token=s3_aws_session_token,
config=s3_config,
**kwargs,
)
        except Exception as e:
            print_verbose(f"Got exception on init s3 client {str(e)}")
            raise
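
    # A hedged sketch of passing a custom botocore client config via
    # `s3_config` (assumes botocore is installed alongside boto3; the
    # bucket name below is hypothetical):
    #
    #   from botocore.config import Config
    #   S3Logger(
    #       s3_bucket_name="my-logs-bucket",
    #       s3_config=Config(s3={"addressing_style": "virtual"}),
    #   )
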
    async def _async_log_event(
        self, kwargs, response_obj, start_time, end_time, print_verbose
    ):
        # boto3 has no async client here - delegate to the synchronous path
        self.log_event(kwargs, response_obj, start_time, end_time, print_verbose)

def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
try:
            verbose_logger.debug(
                f"s3 Logging - entered logging function; kwargs = {kwargs}"
            )
            # construct payload to send to s3
            # follows the same params as langfuse.py
            litellm_params = kwargs.get("litellm_params", {})
            metadata = (
                litellm_params.get("metadata", {}) or {}
            )  # guard against litellm_params["metadata"] being None
            # Clean metadata before logging - never log raw metadata; it can
            # contain circular references, which lead to infinite recursion.
            # All extra litellm metadata params are dropped before logging.
            # (note: clean_metadata is currently not attached to the s3 payload,
            # which comes from the standard_logging_object below)
            clean_metadata = {}
            if isinstance(metadata, dict):
                for key, value in metadata.items():
                    # skip internal litellm metadata keys
                    if key in [
                        "headers",
                        "endpoint",
                        "caching_groups",
                        "previous_models",
                    ]:
                        continue
                    clean_metadata[key] = value
            # fetch the standard logging payload built by litellm's logging layer
            payload: Optional[StandardLoggingPayload] = kwargs.get(
                "standard_logging_object", None
            )
            if payload is None:
                return
            s3_file_name = litellm.utils.get_logging_id(start_time, payload) or ""
            s3_object_key = (
                (self.s3_path.rstrip("/") + "/" if self.s3_path else "")
                + start_time.strftime("%Y-%m-%d")
                + "/"
                + s3_file_name
            )  # the key includes the date, so cache hits are logged too
            s3_object_key += ".json"
            s3_object_download_filename = (
                "time-"
                + start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")
                + "_"
                + payload["id"]
                + ".json"
            )
            # serialize once; keep the payload dict and its JSON string separate
            payload_str = json.dumps(payload)
            print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_object_key,
                Body=payload_str,
                ContentType="application/json",
                ContentLanguage="en",
                ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
                CacheControl="private, immutable, max-age=31536000, s-maxage=0",
            )
            print_verbose(f"Response from s3: {str(response)}")
            print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
            return response
        except Exception as e:
            verbose_logger.exception(f"s3 Layer Error - {str(e)}")
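
# A hedged, direct-invocation sketch for local testing only; in normal use
# litellm's callback machinery calls log_event. The bucket name and the
# standard_logging_object fields below are hypothetical/illustrative:
#
#   from datetime import datetime
#
#   logger = S3Logger(s3_bucket_name="my-logs-bucket", s3_region_name="us-east-1")
#   now = datetime.now()
#   logger.log_event(
#       kwargs={
#           "litellm_params": {"metadata": {}},
#           "standard_logging_object": {"id": "log-123"},  # hypothetical payload
#       },
#       response_obj=None,
#       start_time=now,
#       end_time=now,
#       print_verbose=print,
#   )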