forked from phoenix/litellm-mirror
use batch logger for s3
This commit is contained in:
parent
1636459af8
commit
6865e3ce73
2 changed files with 123 additions and 54 deletions
|
@ -1,7 +1,9 @@
|
||||||
#### What this does ####
|
#### What this does ####
|
||||||
# On success + failure, log events to Supabase
|
# On success + failure, log events to Supabase
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import datetime
|
import datetime
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
@ -9,27 +11,35 @@ import traceback
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
import litellm
|
import litellm
|
||||||
from litellm._logging import print_verbose, verbose_logger
|
from litellm._logging import print_verbose, verbose_logger
|
||||||
|
from litellm.llms.base_aws_llm import BaseAWSLLM
|
||||||
|
from litellm.llms.custom_httpx.http_handler import (
|
||||||
|
get_async_httpx_client,
|
||||||
|
httpxSpecialProvider,
|
||||||
|
)
|
||||||
from litellm.types.utils import StandardLoggingPayload
|
from litellm.types.utils import StandardLoggingPayload
|
||||||
|
|
||||||
|
from .custom_batch_logger import CustomBatchLogger
|
||||||
from .custom_logger import CustomLogger
|
from .custom_logger import CustomLogger
|
||||||
|
|
||||||
|
|
||||||
class S3Logger(CustomLogger):
|
class S3Logger(CustomBatchLogger, BaseAWSLLM):
|
||||||
# Class variables or attributes
|
# Class variables or attributes
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
s3_bucket_name=None,
|
s3_bucket_name: Optional[str] = None,
|
||||||
s3_path=None,
|
s3_path: Optional[str] = None,
|
||||||
s3_region_name=None,
|
s3_region_name: Optional[str] = None,
|
||||||
s3_api_version=None,
|
s3_api_version: Optional[str] = None,
|
||||||
s3_use_ssl=True,
|
s3_use_ssl: bool = True,
|
||||||
s3_verify=None,
|
s3_verify: Optional[bool] = None,
|
||||||
s3_endpoint_url=None,
|
s3_endpoint_url: Optional[str] = None,
|
||||||
s3_aws_access_key_id=None,
|
s3_aws_access_key_id: Optional[str] = None,
|
||||||
s3_aws_secret_access_key=None,
|
s3_aws_secret_access_key: Optional[str] = None,
|
||||||
s3_aws_session_token=None,
|
s3_aws_session_token: Optional[str] = None,
|
||||||
s3_config=None,
|
s3_config=None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
|
@ -39,6 +49,10 @@ class S3Logger(CustomLogger):
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
|
f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
|
||||||
)
|
)
|
||||||
|
self.async_httpx_client = get_async_httpx_client(
|
||||||
|
llm_provider=httpxSpecialProvider.LoggingCallback,
|
||||||
|
params={"concurrent_limit": 1},
|
||||||
|
)
|
||||||
|
|
||||||
if litellm.s3_callback_params is not None:
|
if litellm.s3_callback_params is not None:
|
||||||
# read in .env variables - example os.environ/AWS_BUCKET_NAME
|
# read in .env variables - example os.environ/AWS_BUCKET_NAME
|
||||||
|
@ -68,24 +82,83 @@ class S3Logger(CustomLogger):
|
||||||
self.bucket_name = s3_bucket_name
|
self.bucket_name = s3_bucket_name
|
||||||
self.s3_path = s3_path
|
self.s3_path = s3_path
|
||||||
verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
|
verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
|
||||||
# Create an S3 client with custom endpoint URL
|
self.s3_bucket_name = s3_bucket_name
|
||||||
self.s3_client = boto3.client(
|
self.s3_region_name = s3_region_name
|
||||||
"s3",
|
self.s3_api_version = s3_api_version
|
||||||
region_name=s3_region_name,
|
self.s3_use_ssl = s3_use_ssl
|
||||||
endpoint_url=s3_endpoint_url,
|
self.s3_verify = s3_verify
|
||||||
api_version=s3_api_version,
|
self.s3_endpoint_url = s3_endpoint_url
|
||||||
use_ssl=s3_use_ssl,
|
self.s3_aws_access_key_id = s3_aws_access_key_id
|
||||||
verify=s3_verify,
|
self.s3_aws_secret_access_key = s3_aws_secret_access_key
|
||||||
aws_access_key_id=s3_aws_access_key_id,
|
self.s3_aws_session_token = s3_aws_session_token
|
||||||
aws_secret_access_key=s3_aws_secret_access_key,
|
self.s3_config = s3_config
|
||||||
aws_session_token=s3_aws_session_token,
|
self.init_kwargs = kwargs
|
||||||
config=s3_config,
|
|
||||||
**kwargs,
|
asyncio.create_task(self.periodic_flush())
|
||||||
)
|
self.flush_lock = asyncio.Lock()
|
||||||
|
# Call CustomLogger's __init__
|
||||||
|
CustomBatchLogger.__init__(self, flush_lock=self.flush_lock)
|
||||||
|
|
||||||
|
# Call BaseAWSLLM's __init__
|
||||||
|
BaseAWSLLM.__init__(self)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print_verbose(f"Got exception on init s3 client {str(e)}")
|
print_verbose(f"Got exception on init s3 client {str(e)}")
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
async def upload_data_to_s3(self, data: StandardLoggingPayload):
|
||||||
|
try:
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
import boto3
|
||||||
|
import requests
|
||||||
|
from botocore.auth import SigV4Auth
|
||||||
|
from botocore.awsrequest import AWSRequest
|
||||||
|
from botocore.credentials import Credentials
|
||||||
|
except ImportError:
|
||||||
|
raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
|
||||||
|
|
||||||
|
credentials: Credentials = self.get_credentials(
|
||||||
|
aws_access_key_id=self.s3_aws_access_key_id,
|
||||||
|
aws_secret_access_key=self.s3_aws_secret_access_key,
|
||||||
|
aws_session_token=self.s3_aws_session_token,
|
||||||
|
aws_region_name=self.s3_region_name,
|
||||||
|
)
|
||||||
|
object_name = uuid.uuid4().hex
|
||||||
|
# Prepare the URL
|
||||||
|
url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{object_name}"
|
||||||
|
|
||||||
|
# Convert JSON to string
|
||||||
|
json_string = json.dumps(data)
|
||||||
|
|
||||||
|
# Calculate SHA256 hash of the content
|
||||||
|
content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
# Prepare the request
|
||||||
|
headers = {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"x-amz-content-sha256": content_hash,
|
||||||
|
}
|
||||||
|
req = requests.Request("PUT", url, data=json_string, headers=headers)
|
||||||
|
prepped = req.prepare()
|
||||||
|
|
||||||
|
# Sign the request
|
||||||
|
aws_request = AWSRequest(
|
||||||
|
method=prepped.method,
|
||||||
|
url=prepped.url,
|
||||||
|
data=prepped.body,
|
||||||
|
headers=prepped.headers,
|
||||||
|
)
|
||||||
|
SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request)
|
||||||
|
|
||||||
|
# Prepare the signed headers
|
||||||
|
signed_headers = dict(aws_request.headers.items())
|
||||||
|
|
||||||
|
# Make the request
|
||||||
|
asyncio.create_task(
|
||||||
|
self.async_httpx_client.put(url, data=json_string, headers=signed_headers)
|
||||||
|
)
|
||||||
|
|
||||||
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
|
||||||
try:
|
try:
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
|
@ -141,26 +214,31 @@ class S3Logger(CustomLogger):
|
||||||
+ ".json"
|
+ ".json"
|
||||||
)
|
)
|
||||||
|
|
||||||
import json
|
verbose_logger.debug("\ns3 Logger - Logging payload = %s", payload)
|
||||||
|
|
||||||
payload_str = json.dumps(payload)
|
self.log_queue.append(payload)
|
||||||
|
verbose_logger.debug(
|
||||||
print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")
|
"s3 logging: queue length %s, batch size %s",
|
||||||
|
len(self.log_queue),
|
||||||
response = self.s3_client.put_object(
|
self.batch_size,
|
||||||
Bucket=self.bucket_name,
|
|
||||||
Key=s3_object_key,
|
|
||||||
Body=payload_str,
|
|
||||||
ContentType="application/json",
|
|
||||||
ContentLanguage="en",
|
|
||||||
ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
|
|
||||||
CacheControl="private, immutable, max-age=31536000, s-maxage=0",
|
|
||||||
)
|
)
|
||||||
|
if len(self.log_queue) >= self.batch_size:
|
||||||
print_verbose(f"Response from s3:{str(response)}")
|
await self.flush_queue()
|
||||||
|
|
||||||
print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
|
|
||||||
return response
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verbose_logger.exception(f"s3 Layer Error - {str(e)}")
|
verbose_logger.exception(f"s3 Layer Error - {str(e)}")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def async_send_batch(self):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Sends runs from self.log_queue
|
||||||
|
|
||||||
|
Returns: None
|
||||||
|
|
||||||
|
Raises: Does not raise an exception, will only verbose_logger.exception()
|
||||||
|
"""
|
||||||
|
if not self.log_queue:
|
||||||
|
return
|
||||||
|
|
||||||
|
for payload in self.log_queue:
|
||||||
|
asyncio.create_task(self.upload_data_to_s3(payload))
|
||||||
|
|
|
@ -1,16 +1,7 @@
|
||||||
model_list:
|
model_list:
|
||||||
- model_name: db-openai-endpoint
|
- model_name: db-openai-endpoint
|
||||||
litellm_params:
|
litellm_params:
|
||||||
model: openai/gpt-5
|
model: openai/gpt-4
|
||||||
api_key: fake-key
|
api_key: fake-key
|
||||||
api_base: https://exampleopenaiendpoint-production.up.railwaz.app/
|
api_base: https://exampleopenaiendpoint-production.up.railway.app/
|
||||||
- model_name: db-openai-endpoint
|
|
||||||
litellm_params:
|
|
||||||
model: openai/gpt-5
|
|
||||||
api_key: fake-key
|
|
||||||
api_base: https://exampleopenaiendpoint-production.up.railwxaz.app/
|
|
||||||
|
|
||||||
litellm_settings:
|
|
||||||
callbacks: ["prometheus"]
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue