forked from phoenix/litellm-mirror
Revert "(perf) move s3 logging to Batch logging + async [94% faster perf under 100 RPS on 1 litellm instance] (#6165)"
This reverts commit 2a5624af47.
Parent: 2a5624af47
Commit: 91ecb36277
8 changed files with 149 additions and 407 deletions
@@ -53,7 +53,6 @@ _custom_logger_compatible_callbacks_literal = Literal[
    "arize",
    "langtrace",
    "gcs_bucket",
    "s3",
    "opik",
]
_known_custom_logger_compatible_callbacks: List = list(
@@ -21,7 +21,6 @@ class CustomBatchLogger(CustomLogger):
        self,
        flush_lock: Optional[asyncio.Lock] = None,
        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
        flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS,
        **kwargs,
    ) -> None:
        """
@@ -29,7 +28,7 @@ class CustomBatchLogger(CustomLogger):
            flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
        """
        self.log_queue: List = []
        self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS
        self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS  # 10 seconds
        self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE
        self.last_flush_time = time.time()
        self.flush_lock = flush_lock
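For context on the constructor above: the base class implements a size-or-interval flush, i.e. events accumulate in log_queue and are flushed when the queue reaches batch_size or when flush_interval seconds have passed. A minimal sketch of that pattern (simplified names, not litellm's exact code):

import asyncio
import time
from typing import Dict, List


class MiniBatchLogger:
    # Illustrative only: queue events, flush when the batch is full or the interval elapses.
    def __init__(self, batch_size: int = 1000, flush_interval: int = 10):
        self.log_queue: List[Dict] = []
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.last_flush_time = time.time()
        self.flush_lock = asyncio.Lock()

    async def add_event(self, event: Dict) -> None:
        self.log_queue.append(event)
        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()

    async def periodic_flush(self) -> None:
        # Background task, started once, e.g. via asyncio.create_task(self.periodic_flush()).
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.flush_queue()

    async def flush_queue(self) -> None:
        async with self.flush_lock:
            if not self.log_queue:
                return
            batch, self.log_queue = self.log_queue, []
            await self.send_batch(batch)
            self.last_flush_time = time.time()

    async def send_batch(self, batch: List[Dict]) -> None:
        # A real implementation uploads here (for s3: one PUT per element).
        print(f"flushing {len(batch)} events")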
@@ -1,67 +1,43 @@
"""
s3 Bucket Logging Integration
#### What this does ####
# On success + failure, log events to Supabase

async_log_success_event: Processes the event, stores it in memory for 10 seconds or until MAX_BATCH_SIZE and then flushes to s3

NOTE 1: S3 does not provide a BATCH PUT API endpoint, so we create tasks to upload each element individually
NOTE 2: We create a httpx client with a concurrent limit of 1 to upload to s3. Files should get uploaded BUT they should not impact latency of LLM calling logic
"""

import asyncio
import json
from datetime import datetime
from typing import Dict, List, Optional
import datetime
import os
import subprocess
import sys
import traceback
import uuid
from typing import Optional

import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.llms.base_aws_llm import BaseAWSLLM
from litellm.llms.custom_httpx.http_handler import (
    _get_httpx_client,
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.types.integrations.s3 import s3BatchLoggingElement
from litellm.types.utils import StandardLoggingPayload

from .custom_batch_logger import CustomBatchLogger

# Default Flush interval and batch size for s3
# Flush to s3 every 10 seconds OR every 1K requests in memory
DEFAULT_S3_FLUSH_INTERVAL_SECONDS = 10
DEFAULT_S3_BATCH_SIZE = 1000


class S3Logger(CustomBatchLogger, BaseAWSLLM):
class S3Logger:
    # Class variables or attributes
    def __init__(
        self,
        s3_bucket_name: Optional[str] = None,
        s3_path: Optional[str] = None,
        s3_region_name: Optional[str] = None,
        s3_api_version: Optional[str] = None,
        s3_use_ssl: bool = True,
        s3_verify: Optional[bool] = None,
        s3_endpoint_url: Optional[str] = None,
        s3_aws_access_key_id: Optional[str] = None,
        s3_aws_secret_access_key: Optional[str] = None,
        s3_aws_session_token: Optional[str] = None,
        s3_flush_interval: Optional[int] = DEFAULT_S3_FLUSH_INTERVAL_SECONDS,
        s3_batch_size: Optional[int] = DEFAULT_S3_BATCH_SIZE,
        s3_bucket_name=None,
        s3_path=None,
        s3_region_name=None,
        s3_api_version=None,
        s3_use_ssl=True,
        s3_verify=None,
        s3_endpoint_url=None,
        s3_aws_access_key_id=None,
        s3_aws_secret_access_key=None,
        s3_aws_session_token=None,
        s3_config=None,
        **kwargs,
    ):
        import boto3

        try:
            verbose_logger.debug(
                f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}"
            )

            # IMPORTANT: We use a concurrent limit of 1 to upload to s3
            # Files should get uploaded BUT they should not impact latency of LLM calling logic
            self.async_httpx_client = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback,
                params={"concurrent_limit": 1},
            )

            if litellm.s3_callback_params is not None:
                # read in .env variables - example os.environ/AWS_BUCKET_NAME
                for key, value in litellm.s3_callback_params.items():
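The "concurrent limit of 1" comment above refers to litellm's get_async_httpx_client helper. The idea, roughly equivalent to a plain httpx client with a one-connection limit, is that each queued element gets its own upload task (S3 has no batch PUT), while the single shared connection keeps those uploads from competing with LLM request traffic. A hedged sketch; upload_all, upload_one, and the element dict shape are illustrative, not litellm code:

import asyncio

import httpx

# At most one connection: uploads trickle out in the background.
upload_client = httpx.AsyncClient(limits=httpx.Limits(max_connections=1))


async def upload_all(elements: list) -> None:
    # No batch PUT on S3, so each element becomes its own fire-and-forget task.
    for element in elements:
        asyncio.create_task(upload_one(element))


async def upload_one(element: dict) -> None:
    response = await upload_client.put(element["url"], content=element["body"])
    response.raise_for_status()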
@@ -87,282 +63,107 @@ class S3Logger(CustomBatchLogger, BaseAWSLLM):
                s3_path = litellm.s3_callback_params.get("s3_path")
                # done reading litellm.s3_callback_params

                s3_flush_interval = litellm.s3_callback_params.get(
                    "s3_flush_interval", DEFAULT_S3_FLUSH_INTERVAL_SECONDS
                )
                s3_batch_size = litellm.s3_callback_params.get(
                    "s3_batch_size", DEFAULT_S3_BATCH_SIZE
                )

            self.bucket_name = s3_bucket_name
            self.s3_path = s3_path
            verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}")
            self.s3_bucket_name = s3_bucket_name
            self.s3_region_name = s3_region_name
            self.s3_api_version = s3_api_version
            self.s3_use_ssl = s3_use_ssl
            self.s3_verify = s3_verify
            self.s3_endpoint_url = s3_endpoint_url
            self.s3_aws_access_key_id = s3_aws_access_key_id
            self.s3_aws_secret_access_key = s3_aws_secret_access_key
            self.s3_aws_session_token = s3_aws_session_token
            self.s3_config = s3_config
            self.init_kwargs = kwargs

            asyncio.create_task(self.periodic_flush())
            self.flush_lock = asyncio.Lock()

            verbose_logger.debug(
                f"s3 flush interval: {s3_flush_interval}, s3 batch size: {s3_batch_size}"
            # Create an S3 client with custom endpoint URL
            self.s3_client = boto3.client(
                "s3",
                region_name=s3_region_name,
                endpoint_url=s3_endpoint_url,
                api_version=s3_api_version,
                use_ssl=s3_use_ssl,
                verify=s3_verify,
                aws_access_key_id=s3_aws_access_key_id,
                aws_secret_access_key=s3_aws_secret_access_key,
                aws_session_token=s3_aws_session_token,
                config=s3_config,
                **kwargs,
            )
            # Call CustomLogger's __init__
            CustomBatchLogger.__init__(
                self,
                flush_lock=self.flush_lock,
                flush_interval=s3_flush_interval,
                batch_size=s3_batch_size,
            )
            self.log_queue: List[s3BatchLoggingElement] = []

            # Call BaseAWSLLM's __init__
            BaseAWSLLM.__init__(self)

        except Exception as e:
            print_verbose(f"Got exception on init s3 client {str(e)}")
            raise e
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
    async def _async_log_event(
        self, kwargs, response_obj, start_time, end_time, print_verbose
    ):
        self.log_event(kwargs, response_obj, start_time, end_time, print_verbose)

    def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
        try:
            verbose_logger.debug(
                f"s3 Logging - Enters logging function for model {kwargs}"
            )

            s3_batch_logging_element = self.create_s3_batch_logging_element(
                start_time=start_time,
                standard_logging_payload=kwargs.get("standard_logging_object", None),
                s3_path=self.s3_path,
            # construct payload to send to s3
            # follows the same params as langfuse.py
            litellm_params = kwargs.get("litellm_params", {})
            metadata = (
                litellm_params.get("metadata", {}) or {}
            )  # if litellm_params['metadata'] == None

            # Clean Metadata before logging - never log raw metadata
            # the raw metadata can contain circular references which leads to infinite recursion
            # we clean out all extra litellm metadata params before logging
            clean_metadata = {}
            if isinstance(metadata, dict):
                for key, value in metadata.items():
                    # clean litellm metadata before logging
                    if key in [
                        "headers",
                        "endpoint",
                        "caching_groups",
                        "previous_models",
                    ]:
                        continue
                    else:
                        clean_metadata[key] = value

            # Ensure everything in the payload is converted to str
            payload: Optional[StandardLoggingPayload] = kwargs.get(
                "standard_logging_object", None
            )

            if s3_batch_logging_element is None:
                raise ValueError("s3_batch_logging_element is None")
            if payload is None:
                return

            verbose_logger.debug(
                "\ns3 Logger - Logging payload = %s", s3_batch_logging_element
            s3_file_name = litellm.utils.get_logging_id(start_time, payload) or ""
            s3_object_key = (
                (self.s3_path.rstrip("/") + "/" if self.s3_path else "")
                + start_time.strftime("%Y-%m-%d")
                + "/"
                + s3_file_name
            )  # we need the s3 key to include the time, so we log cache hits too
            s3_object_key += ".json"

            s3_object_download_filename = (
                "time-"
                + start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")
                + "_"
                + payload["id"]
                + ".json"
            )

            self.log_queue.append(s3_batch_logging_element)
            verbose_logger.debug(
                "s3 logging: queue length %s, batch size %s",
                len(self.log_queue),
                self.batch_size,
            import json

            payload_str = json.dumps(payload)

            print_verbose(f"\ns3 Logger - Logging payload = {payload_str}")

            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_object_key,
                Body=payload_str,
                ContentType="application/json",
                ContentLanguage="en",
                ContentDisposition=f'inline; filename="{s3_object_download_filename}"',
                CacheControl="private, immutable, max-age=31536000, s-maxage=0",
            )
            if len(self.log_queue) >= self.batch_size:
                await self.flush_queue()

            print_verbose(f"Response from s3:{str(response)}")

            print_verbose(f"s3 Layer Logging - final response object: {response_obj}")
            return response
        except Exception as e:
            verbose_logger.exception(f"s3 Layer Error - {str(e)}")
            pass

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Synchronous logging function to log to s3

        Does not batch logging requests, instantly logs on s3 Bucket
        """
        try:
            s3_batch_logging_element = self.create_s3_batch_logging_element(
                start_time=start_time,
                standard_logging_payload=kwargs.get("standard_logging_object", None),
                s3_path=self.s3_path,
            )

            if s3_batch_logging_element is None:
                raise ValueError("s3_batch_logging_element is None")

            verbose_logger.debug(
                "\ns3 Logger - Logging payload = %s", s3_batch_logging_element
            )

            # log the element sync httpx client
            self.upload_data_to_s3(s3_batch_logging_element)
        except Exception as e:
            verbose_logger.exception(f"s3 Layer Error - {str(e)}")
            pass
    async def async_upload_data_to_s3(
        self, batch_logging_element: s3BatchLoggingElement
    ):
        try:
            import hashlib

            import boto3
            import requests
            from botocore.auth import SigV4Auth
            from botocore.awsrequest import AWSRequest
            from botocore.credentials import Credentials
        except ImportError:
            raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
        try:
            credentials: Credentials = self.get_credentials(
                aws_access_key_id=self.s3_aws_access_key_id,
                aws_secret_access_key=self.s3_aws_secret_access_key,
                aws_session_token=self.s3_aws_session_token,
                aws_region_name=self.s3_region_name,
            )

            # Prepare the URL
            url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{batch_logging_element.s3_object_key}"

            if self.s3_endpoint_url:
                url = self.s3_endpoint_url + "/" + batch_logging_element.s3_object_key

            # Convert JSON to string
            json_string = json.dumps(batch_logging_element.payload)

            # Calculate SHA256 hash of the content
            content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest()

            # Prepare the request
            headers = {
                "Content-Type": "application/json",
                "x-amz-content-sha256": content_hash,
                "Content-Language": "en",
                "Content-Disposition": f'inline; filename="{batch_logging_element.s3_object_download_filename}"',
                "Cache-Control": "private, immutable, max-age=31536000, s-maxage=0",
            }
            req = requests.Request("PUT", url, data=json_string, headers=headers)
            prepped = req.prepare()

            # Sign the request
            aws_request = AWSRequest(
                method=prepped.method,
                url=prepped.url,
                data=prepped.body,
                headers=prepped.headers,
            )
            SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request)

            # Prepare the signed headers
            signed_headers = dict(aws_request.headers.items())

            # Make the request
            response = await self.async_httpx_client.put(
                url, data=json_string, headers=signed_headers
            )
            response.raise_for_status()
        except Exception as e:
            verbose_logger.exception(f"Error uploading to s3: {str(e)}")

    async def async_send_batch(self):
        """

        Sends runs from self.log_queue

        Returns: None

        Raises: Does not raise an exception, will only verbose_logger.exception()
        """
        if not self.log_queue:
            return

        for payload in self.log_queue:
            asyncio.create_task(self.async_upload_data_to_s3(payload))
    def create_s3_batch_logging_element(
        self,
        start_time: datetime,
        standard_logging_payload: Optional[StandardLoggingPayload],
        s3_path: Optional[str],
    ) -> Optional[s3BatchLoggingElement]:
        """
        Helper function to create an s3BatchLoggingElement.

        Args:
            start_time (datetime): The start time of the logging event.
            standard_logging_payload (Optional[StandardLoggingPayload]): The payload to be logged.
            s3_path (Optional[str]): The S3 path prefix.

        Returns:
            Optional[s3BatchLoggingElement]: The created s3BatchLoggingElement, or None if payload is None.
        """
        if standard_logging_payload is None:
            return None

        s3_file_name = (
            litellm.utils.get_logging_id(start_time, standard_logging_payload) or ""
        )
        s3_object_key = (
            (s3_path.rstrip("/") + "/" if s3_path else "")
            + start_time.strftime("%Y-%m-%d")
            + "/"
            + s3_file_name
            + ".json"
        )

        s3_object_download_filename = f"time-{start_time.strftime('%Y-%m-%dT%H-%M-%S-%f')}_{standard_logging_payload['id']}.json"

        return s3BatchLoggingElement(
            payload=standard_logging_payload,  # type: ignore
            s3_object_key=s3_object_key,
            s3_object_download_filename=s3_object_download_filename,
        )
    def upload_data_to_s3(self, batch_logging_element: s3BatchLoggingElement):
        try:
            import hashlib

            import boto3
            import requests
            from botocore.auth import SigV4Auth
            from botocore.awsrequest import AWSRequest
            from botocore.credentials import Credentials
        except ImportError:
            raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
        try:
            credentials: Credentials = self.get_credentials(
                aws_access_key_id=self.s3_aws_access_key_id,
                aws_secret_access_key=self.s3_aws_secret_access_key,
                aws_session_token=self.s3_aws_session_token,
                aws_region_name=self.s3_region_name,
            )

            # Prepare the URL
            url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{batch_logging_element.s3_object_key}"

            if self.s3_endpoint_url:
                url = self.s3_endpoint_url + "/" + batch_logging_element.s3_object_key

            # Convert JSON to string
            json_string = json.dumps(batch_logging_element.payload)

            # Calculate SHA256 hash of the content
            content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest()

            # Prepare the request
            headers = {
                "Content-Type": "application/json",
                "x-amz-content-sha256": content_hash,
                "Content-Language": "en",
                "Content-Disposition": f'inline; filename="{batch_logging_element.s3_object_download_filename}"',
                "Cache-Control": "private, immutable, max-age=31536000, s-maxage=0",
            }
            req = requests.Request("PUT", url, data=json_string, headers=headers)
            prepped = req.prepare()

            # Sign the request
            aws_request = AWSRequest(
                method=prepped.method,
                url=prepped.url,
                data=prepped.body,
                headers=prepped.headers,
            )
            SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request)

            # Prepare the signed headers
            signed_headers = dict(aws_request.headers.items())

            httpx_client = _get_httpx_client()
            # Make the request
            response = httpx_client.put(url, data=json_string, headers=signed_headers)
            response.raise_for_status()
        except Exception as e:
            verbose_logger.exception(f"Error uploading to s3: {str(e)}")
@@ -116,6 +116,7 @@ lagoLogger = None
dataDogLogger = None
prometheusLogger = None
dynamoLogger = None
s3Logger = None
genericAPILogger = None
clickHouseLogger = None
greenscaleLogger = None
@@ -1345,6 +1346,36 @@ class Logging:
                        user_id=kwargs.get("user", None),
                        print_verbose=print_verbose,
                    )
                if callback == "s3":
                    global s3Logger
                    if s3Logger is None:
                        s3Logger = S3Logger()
                    if self.stream:
                        if "complete_streaming_response" in self.model_call_details:
                            print_verbose(
                                "S3Logger Logger: Got Stream Event - Completed Stream Response"
                            )
                            s3Logger.log_event(
                                kwargs=self.model_call_details,
                                response_obj=self.model_call_details[
                                    "complete_streaming_response"
                                ],
                                start_time=start_time,
                                end_time=end_time,
                                print_verbose=print_verbose,
                            )
                        else:
                            print_verbose(
                                "S3Logger Logger: Got Stream Event - No complete stream response as yet"
                            )
                    else:
                        s3Logger.log_event(
                            kwargs=self.model_call_details,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                        )
                if (
                    callback == "openmeter"
                    and self.model_call_details.get("litellm_params", {}).get(
@@ -2214,7 +2245,7 @@ def set_callbacks(callback_list, function_id=None):
    """
    Globally sets the callback client
    """
    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger

    try:
        for callback in callback_list:
@@ -2288,6 +2319,8 @@ def set_callbacks(callback_list, function_id=None):
                dataDogLogger = DataDogLogger()
            elif callback == "dynamodb":
                dynamoLogger = DyanmoDBLogger()
            elif callback == "s3":
                s3Logger = S3Logger()
            elif callback == "wandb":
                weightsBiasesLogger = WeightsBiasesLogger()
            elif callback == "logfire":
@@ -2390,14 +2423,6 @@ def _init_custom_logger_compatible_class(
        _datadog_logger = DataDogLogger()
        _in_memory_loggers.append(_datadog_logger)
        return _datadog_logger  # type: ignore
    elif logging_integration == "s3":
        for callback in _in_memory_loggers:
            if isinstance(callback, S3Logger):
                return callback  # type: ignore

        _s3_logger = S3Logger()
        _in_memory_loggers.append(_s3_logger)
        return _s3_logger  # type: ignore
    elif logging_integration == "gcs_bucket":
        for callback in _in_memory_loggers:
            if isinstance(callback, GCSBucketLogger):
@@ -2564,10 +2589,6 @@ def get_custom_logger_compatible_class(
        for callback in _in_memory_loggers:
            if isinstance(callback, PrometheusLogger):
                return callback
    elif logging_integration == "s3":
        for callback in _in_memory_loggers:
            if isinstance(callback, S3Logger):
                return callback
    elif logging_integration == "datadog":
        for callback in _in_memory_loggers:
            if isinstance(callback, DataDogLogger):
@@ -1,17 +1,16 @@
model_list:
  - model_name: db-openai-endpoint
    litellm_params:
      model: openai/gpt-4
      model: openai/gpt-5
      api_key: fake-key
      api_base: https://exampleopenaiendpoint-production.up.railway.app/
      api_base: https://exampleopenaiendpoint-production.up.railwaz.app/
  - model_name: db-openai-endpoint
    litellm_params:
      model: openai/gpt-5
      api_key: fake-key
      api_base: https://exampleopenaiendpoint-production.up.railwxaz.app/

litellm_settings:
  success_callback: ["s3"]
  turn_off_message_logging: true
  s3_callback_params:
    s3_bucket_name: load-testing-oct # AWS Bucket Name for S3
    s3_region_name: us-west-2 # AWS Region Name for S3
    s3_aws_access_key_id: os.environ/AWS_ACCESS_KEY_ID # us os.environ/<variable name> to pass environment variables. This is AWS Access Key ID for S3
    s3_aws_secret_access_key: os.environ/AWS_SECRET_ACCESS_KEY # AWS Secret Access Key for S3
  callbacks: ["prometheus"]
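The YAML block above can also be expressed programmatically, which is what the test file later in this diff does; a short equivalent (bucket and region values are the ones from the config above, and os.environ/<NAME> references are resolved from environment variables by litellm):

import litellm

litellm.success_callback = ["s3"]
litellm.s3_callback_params = {
    "s3_bucket_name": "load-testing-oct",
    "s3_region_name": "us-west-2",
    "s3_aws_access_key_id": "os.environ/AWS_ACCESS_KEY_ID",
    "s3_aws_secret_access_key": "os.environ/AWS_SECRET_ACCESS_KEY",
}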
@@ -1,14 +0,0 @@
from typing import Dict

from pydantic import BaseModel


class s3BatchLoggingElement(BaseModel):
    """
    Type of element stored in self.log_queue in S3Logger

    """

    payload: Dict
    s3_object_key: str
    s3_object_download_filename: str
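For reference, an instance of this (pre-revert) type, built the way create_s3_batch_logging_element does earlier in this diff, would look roughly like the following; all values are illustrative:

from litellm.types.integrations.s3 import s3BatchLoggingElement

# payload is the StandardLoggingPayload dict; the key follows
# "<s3_path>/<YYYY-MM-DD>/<logging_id>.json" as built above.
element = s3BatchLoggingElement(
    payload={"id": "chatcmpl-123", "model": "gpt-3.5-turbo"},
    s3_object_key="my-prefix/2024-10-12/chatcmpl-123.json",
    s3_object_download_filename="time-2024-10-12T10-30-00-000000_chatcmpl-123.json",
)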
@@ -197,6 +197,7 @@ lagoLogger = None
dataDogLogger = None
prometheusLogger = None
dynamoLogger = None
s3Logger = None
genericAPILogger = None
clickHouseLogger = None
greenscaleLogger = None
@@ -1798,9 +1799,8 @@ def calculate_tiles_needed(
    total_tiles = tiles_across * tiles_down
    return total_tiles


def get_image_type(image_data: bytes) -> Union[str, None]:
    """take an image (really only the first ~100 bytes max are needed)
    """ take an image (really only the first ~100 bytes max are needed)
    and return 'png' 'gif' 'jpeg' 'heic' or None. method added to
    allow deprecation of imghdr in 3.13"""
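get_image_type, whose docstring is reflowed in the hunk above, works by sniffing well-known magic bytes at the start of the file. A generic sketch of that technique (not litellm's exact implementation):

from typing import Union


def sniff_image_type(image_data: bytes) -> Union[str, None]:
    # Only the first few bytes are needed to identify the format.
    if image_data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"
    if image_data.startswith((b"GIF87a", b"GIF89a")):
        return "gif"
    if image_data.startswith(b"\xff\xd8\xff"):
        return "jpeg"
    # HEIC/HEIF files carry an ISO-BMFF "ftyp" box whose brand starts at byte 4.
    if image_data[4:12] in (b"ftypheic", b"ftypheix", b"ftypmif1"):
        return "heic"
    return None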
@@ -12,70 +12,7 @@ import litellm
litellm.num_retries = 3

import time, random
from litellm._logging import verbose_logger
import logging
import pytest
import boto3


@pytest.mark.asyncio
@pytest.mark.parametrize("sync_mode", [True, False])
async def test_basic_s3_logging(sync_mode):
    verbose_logger.setLevel(level=logging.DEBUG)
    litellm.success_callback = ["s3"]
    litellm.s3_callback_params = {
        "s3_bucket_name": "load-testing-oct",
        "s3_aws_secret_access_key": "os.environ/AWS_SECRET_ACCESS_KEY",
        "s3_aws_access_key_id": "os.environ/AWS_ACCESS_KEY_ID",
        "s3_region_name": "us-west-2",
    }
    litellm.set_verbose = True

    if sync_mode is True:
        response = litellm.completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "This is a test"}],
            mock_response="It's simple to use and easy to get started",
        )
    else:
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "This is a test"}],
            mock_response="It's simple to use and easy to get started",
        )
    print(f"response: {response}")

    await asyncio.sleep(12)

    total_objects, all_s3_keys = list_all_s3_objects("load-testing-oct")

    # assert that atlest one key has response.id in it
    assert any(response.id in key for key in all_s3_keys)
    s3 = boto3.client("s3")
    # delete all objects
    for key in all_s3_keys:
        s3.delete_object(Bucket="load-testing-oct", Key=key)


def list_all_s3_objects(bucket_name):
    s3 = boto3.client("s3")

    all_s3_keys = []

    paginator = s3.get_paginator("list_objects_v2")
    total_objects = 0

    for page in paginator.paginate(Bucket=bucket_name):
        if "Contents" in page:
            total_objects += len(page["Contents"])
            all_s3_keys.extend([obj["Key"] for obj in page["Contents"]])

    print(f"Total number of objects in {bucket_name}: {total_objects}")
    print(all_s3_keys)
    return total_objects, all_s3_keys


list_all_s3_objects("load-testing-oct")


@pytest.mark.skip(reason="AWS Suspended Account")