From 2a5624af471284f174e084142504d950ede2567d Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Fri, 11 Oct 2024 19:49:03 +0530 Subject: [PATCH] (perf) move s3 logging to Batch logging + async [94% faster perf under 100 RPS on 1 litellm instance] (#6165) * fix move s3 to use customLogger * add basic s3 logging test * add s3 to custom logger compatible * use batch logger for s3 * s3 set flush interval and batch size * fix s3 logging * add notes on s3 logging * fix s3 logging * add basic s3 logging test * fix s3 type errors * add test for sync logging on s3 --- litellm/__init__.py | 1 + litellm/integrations/custom_batch_logger.py | 3 +- litellm/integrations/s3.py | 407 +++++++++++++----- litellm/litellm_core_utils/litellm_logging.py | 47 +- litellm/proxy/proxy_config.yaml | 17 +- litellm/types/integrations/s3.py | 14 + litellm/utils.py | 4 +- tests/local_testing/test_amazing_s3_logs.py | 63 +++ 8 files changed, 407 insertions(+), 149 deletions(-) create mode 100644 litellm/types/integrations/s3.py diff --git a/litellm/__init__.py b/litellm/__init__.py index 9379caac1..5e6cb4c05 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -53,6 +53,7 @@ _custom_logger_compatible_callbacks_literal = Literal[ "arize", "langtrace", "gcs_bucket", + "s3", "opik", ] _known_custom_logger_compatible_callbacks: List = list( diff --git a/litellm/integrations/custom_batch_logger.py b/litellm/integrations/custom_batch_logger.py index aa7f0bba2..7ef63d25c 100644 --- a/litellm/integrations/custom_batch_logger.py +++ b/litellm/integrations/custom_batch_logger.py @@ -21,6 +21,7 @@ class CustomBatchLogger(CustomLogger): self, flush_lock: Optional[asyncio.Lock] = None, batch_size: Optional[int] = DEFAULT_BATCH_SIZE, + flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS, **kwargs, ) -> None: """ @@ -28,7 +29,7 @@ class CustomBatchLogger(CustomLogger): flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching """ self.log_queue: List = [] - self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS # 10 seconds + self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE self.last_flush_time = time.time() self.flush_lock = flush_lock diff --git a/litellm/integrations/s3.py b/litellm/integrations/s3.py index 1f82406e1..397b53cd5 100644 --- a/litellm/integrations/s3.py +++ b/litellm/integrations/s3.py @@ -1,43 +1,67 @@ -#### What this does #### -# On success + failure, log events to Supabase +""" +s3 Bucket Logging Integration -import datetime -import os -import subprocess -import sys -import traceback -import uuid -from typing import Optional +async_log_success_event: Processes the event, stores it in memory for 10 seconds or until MAX_BATCH_SIZE and then flushes to s3 + +NOTE 1: S3 does not provide a BATCH PUT API endpoint, so we create tasks to upload each element individually +NOTE 2: We create a httpx client with a concurrent limit of 1 to upload to s3. 
Files should get uploaded BUT they should not impact latency of LLM calling logic +""" + +import asyncio +import json +from datetime import datetime +from typing import Dict, List, Optional import litellm from litellm._logging import print_verbose, verbose_logger +from litellm.llms.base_aws_llm import BaseAWSLLM +from litellm.llms.custom_httpx.http_handler import ( + _get_httpx_client, + get_async_httpx_client, + httpxSpecialProvider, +) +from litellm.types.integrations.s3 import s3BatchLoggingElement from litellm.types.utils import StandardLoggingPayload +from .custom_batch_logger import CustomBatchLogger -class S3Logger: +# Default Flush interval and batch size for s3 +# Flush to s3 every 10 seconds OR every 1K requests in memory +DEFAULT_S3_FLUSH_INTERVAL_SECONDS = 10 +DEFAULT_S3_BATCH_SIZE = 1000 + + +class S3Logger(CustomBatchLogger, BaseAWSLLM): # Class variables or attributes def __init__( self, - s3_bucket_name=None, - s3_path=None, - s3_region_name=None, - s3_api_version=None, - s3_use_ssl=True, - s3_verify=None, - s3_endpoint_url=None, - s3_aws_access_key_id=None, - s3_aws_secret_access_key=None, - s3_aws_session_token=None, + s3_bucket_name: Optional[str] = None, + s3_path: Optional[str] = None, + s3_region_name: Optional[str] = None, + s3_api_version: Optional[str] = None, + s3_use_ssl: bool = True, + s3_verify: Optional[bool] = None, + s3_endpoint_url: Optional[str] = None, + s3_aws_access_key_id: Optional[str] = None, + s3_aws_secret_access_key: Optional[str] = None, + s3_aws_session_token: Optional[str] = None, + s3_flush_interval: Optional[int] = DEFAULT_S3_FLUSH_INTERVAL_SECONDS, + s3_batch_size: Optional[int] = DEFAULT_S3_BATCH_SIZE, s3_config=None, **kwargs, ): - import boto3 - try: verbose_logger.debug( f"in init s3 logger - s3_callback_params {litellm.s3_callback_params}" ) + # IMPORTANT: We use a concurrent limit of 1 to upload to s3 + # Files should get uploaded BUT they should not impact latency of LLM calling logic + self.async_httpx_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback, + params={"concurrent_limit": 1}, + ) + if litellm.s3_callback_params is not None: # read in .env variables - example os.environ/AWS_BUCKET_NAME for key, value in litellm.s3_callback_params.items(): @@ -63,107 +87,282 @@ class S3Logger: s3_path = litellm.s3_callback_params.get("s3_path") # done reading litellm.s3_callback_params + s3_flush_interval = litellm.s3_callback_params.get( + "s3_flush_interval", DEFAULT_S3_FLUSH_INTERVAL_SECONDS + ) + s3_batch_size = litellm.s3_callback_params.get( + "s3_batch_size", DEFAULT_S3_BATCH_SIZE + ) + self.bucket_name = s3_bucket_name self.s3_path = s3_path verbose_logger.debug(f"s3 logger using endpoint url {s3_endpoint_url}") - # Create an S3 client with custom endpoint URL - self.s3_client = boto3.client( - "s3", - region_name=s3_region_name, - endpoint_url=s3_endpoint_url, - api_version=s3_api_version, - use_ssl=s3_use_ssl, - verify=s3_verify, - aws_access_key_id=s3_aws_access_key_id, - aws_secret_access_key=s3_aws_secret_access_key, - aws_session_token=s3_aws_session_token, - config=s3_config, - **kwargs, + self.s3_bucket_name = s3_bucket_name + self.s3_region_name = s3_region_name + self.s3_api_version = s3_api_version + self.s3_use_ssl = s3_use_ssl + self.s3_verify = s3_verify + self.s3_endpoint_url = s3_endpoint_url + self.s3_aws_access_key_id = s3_aws_access_key_id + self.s3_aws_secret_access_key = s3_aws_secret_access_key + self.s3_aws_session_token = s3_aws_session_token + self.s3_config = s3_config + 
self.init_kwargs = kwargs + + asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() + + verbose_logger.debug( + f"s3 flush interval: {s3_flush_interval}, s3 batch size: {s3_batch_size}" ) + # Call CustomLogger's __init__ + CustomBatchLogger.__init__( + self, + flush_lock=self.flush_lock, + flush_interval=s3_flush_interval, + batch_size=s3_batch_size, + ) + self.log_queue: List[s3BatchLoggingElement] = [] + + # Call BaseAWSLLM's __init__ + BaseAWSLLM.__init__(self) + except Exception as e: print_verbose(f"Got exception on init s3 client {str(e)}") raise e - async def _async_log_event( - self, kwargs, response_obj, start_time, end_time, print_verbose - ): - self.log_event(kwargs, response_obj, start_time, end_time, print_verbose) - - def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose): + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): try: verbose_logger.debug( f"s3 Logging - Enters logging function for model {kwargs}" ) - # construct payload to send to s3 - # follows the same params as langfuse.py - litellm_params = kwargs.get("litellm_params", {}) - metadata = ( - litellm_params.get("metadata", {}) or {} - ) # if litellm_params['metadata'] == None - - # Clean Metadata before logging - never log raw metadata - # the raw metadata can contain circular references which leads to infinite recursion - # we clean out all extra litellm metadata params before logging - clean_metadata = {} - if isinstance(metadata, dict): - for key, value in metadata.items(): - # clean litellm metadata before logging - if key in [ - "headers", - "endpoint", - "caching_groups", - "previous_models", - ]: - continue - else: - clean_metadata[key] = value - - # Ensure everything in the payload is converted to str - payload: Optional[StandardLoggingPayload] = kwargs.get( - "standard_logging_object", None + s3_batch_logging_element = self.create_s3_batch_logging_element( + start_time=start_time, + standard_logging_payload=kwargs.get("standard_logging_object", None), + s3_path=self.s3_path, ) - if payload is None: - return + if s3_batch_logging_element is None: + raise ValueError("s3_batch_logging_element is None") - s3_file_name = litellm.utils.get_logging_id(start_time, payload) or "" - s3_object_key = ( - (self.s3_path.rstrip("/") + "/" if self.s3_path else "") - + start_time.strftime("%Y-%m-%d") - + "/" - + s3_file_name - ) # we need the s3 key to include the time, so we log cache hits too - s3_object_key += ".json" - - s3_object_download_filename = ( - "time-" - + start_time.strftime("%Y-%m-%dT%H-%M-%S-%f") - + "_" - + payload["id"] - + ".json" + verbose_logger.debug( + "\ns3 Logger - Logging payload = %s", s3_batch_logging_element ) - import json - - payload_str = json.dumps(payload) - - print_verbose(f"\ns3 Logger - Logging payload = {payload_str}") - - response = self.s3_client.put_object( - Bucket=self.bucket_name, - Key=s3_object_key, - Body=payload_str, - ContentType="application/json", - ContentLanguage="en", - ContentDisposition=f'inline; filename="{s3_object_download_filename}"', - CacheControl="private, immutable, max-age=31536000, s-maxage=0", + self.log_queue.append(s3_batch_logging_element) + verbose_logger.debug( + "s3 logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, ) - - print_verbose(f"Response from s3:{str(response)}") - - print_verbose(f"s3 Layer Logging - final response object: {response_obj}") - return response + if len(self.log_queue) >= self.batch_size: + await 
self.flush_queue() except Exception as e: verbose_logger.exception(f"s3 Layer Error - {str(e)}") pass + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + """ + Synchronous logging function to log to s3 + + Does not batch logging requests, instantly logs on s3 Bucket + """ + try: + s3_batch_logging_element = self.create_s3_batch_logging_element( + start_time=start_time, + standard_logging_payload=kwargs.get("standard_logging_object", None), + s3_path=self.s3_path, + ) + + if s3_batch_logging_element is None: + raise ValueError("s3_batch_logging_element is None") + + verbose_logger.debug( + "\ns3 Logger - Logging payload = %s", s3_batch_logging_element + ) + + # log the element sync httpx client + self.upload_data_to_s3(s3_batch_logging_element) + except Exception as e: + verbose_logger.exception(f"s3 Layer Error - {str(e)}") + pass + + async def async_upload_data_to_s3( + self, batch_logging_element: s3BatchLoggingElement + ): + try: + import hashlib + + import boto3 + import requests + from botocore.auth import SigV4Auth + from botocore.awsrequest import AWSRequest + from botocore.credentials import Credentials + except ImportError: + raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") + try: + credentials: Credentials = self.get_credentials( + aws_access_key_id=self.s3_aws_access_key_id, + aws_secret_access_key=self.s3_aws_secret_access_key, + aws_session_token=self.s3_aws_session_token, + aws_region_name=self.s3_region_name, + ) + + # Prepare the URL + url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{batch_logging_element.s3_object_key}" + + if self.s3_endpoint_url: + url = self.s3_endpoint_url + "/" + batch_logging_element.s3_object_key + + # Convert JSON to string + json_string = json.dumps(batch_logging_element.payload) + + # Calculate SHA256 hash of the content + content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest() + + # Prepare the request + headers = { + "Content-Type": "application/json", + "x-amz-content-sha256": content_hash, + "Content-Language": "en", + "Content-Disposition": f'inline; filename="{batch_logging_element.s3_object_download_filename}"', + "Cache-Control": "private, immutable, max-age=31536000, s-maxage=0", + } + req = requests.Request("PUT", url, data=json_string, headers=headers) + prepped = req.prepare() + + # Sign the request + aws_request = AWSRequest( + method=prepped.method, + url=prepped.url, + data=prepped.body, + headers=prepped.headers, + ) + SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request) + + # Prepare the signed headers + signed_headers = dict(aws_request.headers.items()) + + # Make the request + response = await self.async_httpx_client.put( + url, data=json_string, headers=signed_headers + ) + response.raise_for_status() + except Exception as e: + verbose_logger.exception(f"Error uploading to s3: {str(e)}") + + async def async_send_batch(self): + """ + + Sends runs from self.log_queue + + Returns: None + + Raises: Does not raise an exception, will only verbose_logger.exception() + """ + if not self.log_queue: + return + + for payload in self.log_queue: + asyncio.create_task(self.async_upload_data_to_s3(payload)) + + def create_s3_batch_logging_element( + self, + start_time: datetime, + standard_logging_payload: Optional[StandardLoggingPayload], + s3_path: Optional[str], + ) -> Optional[s3BatchLoggingElement]: + """ + Helper function to create an s3BatchLoggingElement. 
+ + Args: + start_time (datetime): The start time of the logging event. + standard_logging_payload (Optional[StandardLoggingPayload]): The payload to be logged. + s3_path (Optional[str]): The S3 path prefix. + + Returns: + Optional[s3BatchLoggingElement]: The created s3BatchLoggingElement, or None if payload is None. + """ + if standard_logging_payload is None: + return None + + s3_file_name = ( + litellm.utils.get_logging_id(start_time, standard_logging_payload) or "" + ) + s3_object_key = ( + (s3_path.rstrip("/") + "/" if s3_path else "") + + start_time.strftime("%Y-%m-%d") + + "/" + + s3_file_name + + ".json" + ) + + s3_object_download_filename = f"time-{start_time.strftime('%Y-%m-%dT%H-%M-%S-%f')}_{standard_logging_payload['id']}.json" + + return s3BatchLoggingElement( + payload=standard_logging_payload, # type: ignore + s3_object_key=s3_object_key, + s3_object_download_filename=s3_object_download_filename, + ) + + def upload_data_to_s3(self, batch_logging_element: s3BatchLoggingElement): + try: + import hashlib + + import boto3 + import requests + from botocore.auth import SigV4Auth + from botocore.awsrequest import AWSRequest + from botocore.credentials import Credentials + except ImportError: + raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") + try: + credentials: Credentials = self.get_credentials( + aws_access_key_id=self.s3_aws_access_key_id, + aws_secret_access_key=self.s3_aws_secret_access_key, + aws_session_token=self.s3_aws_session_token, + aws_region_name=self.s3_region_name, + ) + + # Prepare the URL + url = f"https://{self.bucket_name}.s3.{self.s3_region_name}.amazonaws.com/{batch_logging_element.s3_object_key}" + + if self.s3_endpoint_url: + url = self.s3_endpoint_url + "/" + batch_logging_element.s3_object_key + + # Convert JSON to string + json_string = json.dumps(batch_logging_element.payload) + + # Calculate SHA256 hash of the content + content_hash = hashlib.sha256(json_string.encode("utf-8")).hexdigest() + + # Prepare the request + headers = { + "Content-Type": "application/json", + "x-amz-content-sha256": content_hash, + "Content-Language": "en", + "Content-Disposition": f'inline; filename="{batch_logging_element.s3_object_download_filename}"', + "Cache-Control": "private, immutable, max-age=31536000, s-maxage=0", + } + req = requests.Request("PUT", url, data=json_string, headers=headers) + prepped = req.prepare() + + # Sign the request + aws_request = AWSRequest( + method=prepped.method, + url=prepped.url, + data=prepped.body, + headers=prepped.headers, + ) + SigV4Auth(credentials, "s3", self.s3_region_name).add_auth(aws_request) + + # Prepare the signed headers + signed_headers = dict(aws_request.headers.items()) + + httpx_client = _get_httpx_client() + # Make the request + response = httpx_client.put(url, data=json_string, headers=signed_headers) + response.raise_for_status() + except Exception as e: + verbose_logger.exception(f"Error uploading to s3: {str(e)}") diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py index ce97f1c6f..10e61882b 100644 --- a/litellm/litellm_core_utils/litellm_logging.py +++ b/litellm/litellm_core_utils/litellm_logging.py @@ -116,7 +116,6 @@ lagoLogger = None dataDogLogger = None prometheusLogger = None dynamoLogger = None -s3Logger = None genericAPILogger = None clickHouseLogger = None greenscaleLogger = None @@ -1346,36 +1345,6 @@ class Logging: user_id=kwargs.get("user", None), print_verbose=print_verbose, ) - if callback == "s3": - global s3Logger - 
if s3Logger is None: - s3Logger = S3Logger() - if self.stream: - if "complete_streaming_response" in self.model_call_details: - print_verbose( - "S3Logger Logger: Got Stream Event - Completed Stream Response" - ) - s3Logger.log_event( - kwargs=self.model_call_details, - response_obj=self.model_call_details[ - "complete_streaming_response" - ], - start_time=start_time, - end_time=end_time, - print_verbose=print_verbose, - ) - else: - print_verbose( - "S3Logger Logger: Got Stream Event - No complete stream response as yet" - ) - else: - s3Logger.log_event( - kwargs=self.model_call_details, - response_obj=result, - start_time=start_time, - end_time=end_time, - print_verbose=print_verbose, - ) if ( callback == "openmeter" and self.model_call_details.get("litellm_params", {}).get( @@ -2245,7 +2214,7 @@ def set_callbacks(callback_list, function_id=None): """ Globally sets the callback client """ - global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger + global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger try: for callback in callback_list: @@ -2319,8 +2288,6 @@ def set_callbacks(callback_list, function_id=None): dataDogLogger = DataDogLogger() elif callback == "dynamodb": dynamoLogger = DyanmoDBLogger() - elif callback == "s3": - s3Logger = S3Logger() elif callback == "wandb": weightsBiasesLogger = WeightsBiasesLogger() elif callback == "logfire": @@ -2423,6 +2390,14 @@ def _init_custom_logger_compatible_class( _datadog_logger = DataDogLogger() _in_memory_loggers.append(_datadog_logger) return _datadog_logger # type: ignore + elif logging_integration == "s3": + for callback in _in_memory_loggers: + if isinstance(callback, S3Logger): + return callback # type: ignore + + _s3_logger = S3Logger() + _in_memory_loggers.append(_s3_logger) + return _s3_logger # type: ignore elif logging_integration == "gcs_bucket": for callback in _in_memory_loggers: if isinstance(callback, GCSBucketLogger): @@ -2589,6 +2564,10 @@ def get_custom_logger_compatible_class( for callback in _in_memory_loggers: if isinstance(callback, PrometheusLogger): return callback + elif logging_integration == "s3": + for callback in _in_memory_loggers: + if isinstance(callback, S3Logger): + return callback elif logging_integration == "datadog": for callback in _in_memory_loggers: if isinstance(callback, DataDogLogger): diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index d611aa87b..11ccc8561 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -1,16 +1,17 @@ model_list: - model_name: db-openai-endpoint litellm_params: - model: openai/gpt-5 + model: openai/gpt-4 api_key: fake-key - api_base: https://exampleopenaiendpoint-production.up.railwaz.app/ - - model_name: db-openai-endpoint - litellm_params: - model: openai/gpt-5 - api_key: fake-key - api_base: 
https://exampleopenaiendpoint-production.up.railwxaz.app/ + api_base: https://exampleopenaiendpoint-production.up.railway.app/ litellm_settings: - callbacks: ["prometheus"] + success_callback: ["s3"] + turn_off_message_logging: true + s3_callback_params: + s3_bucket_name: load-testing-oct # AWS Bucket Name for S3 + s3_region_name: us-west-2 # AWS Region Name for S3 + s3_aws_access_key_id: os.environ/AWS_ACCESS_KEY_ID # us os.environ/ to pass environment variables. This is AWS Access Key ID for S3 + s3_aws_secret_access_key: os.environ/AWS_SECRET_ACCESS_KEY # AWS Secret Access Key for S3 diff --git a/litellm/types/integrations/s3.py b/litellm/types/integrations/s3.py new file mode 100644 index 000000000..d66e2c59d --- /dev/null +++ b/litellm/types/integrations/s3.py @@ -0,0 +1,14 @@ +from typing import Dict + +from pydantic import BaseModel + + +class s3BatchLoggingElement(BaseModel): + """ + Type of element stored in self.log_queue in S3Logger + + """ + + payload: Dict + s3_object_key: str + s3_object_download_filename: str diff --git a/litellm/utils.py b/litellm/utils.py index 5afeab58e..d08cf7138 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -197,7 +197,6 @@ lagoLogger = None dataDogLogger = None prometheusLogger = None dynamoLogger = None -s3Logger = None genericAPILogger = None clickHouseLogger = None greenscaleLogger = None @@ -1799,8 +1798,9 @@ def calculate_tiles_needed( total_tiles = tiles_across * tiles_down return total_tiles + def get_image_type(image_data: bytes) -> Union[str, None]: - """ take an image (really only the first ~100 bytes max are needed) + """take an image (really only the first ~100 bytes max are needed) and return 'png' 'gif' 'jpeg' 'heic' or None. method added to allow deprecation of imghdr in 3.13""" diff --git a/tests/local_testing/test_amazing_s3_logs.py b/tests/local_testing/test_amazing_s3_logs.py index c3e8a61db..5459647c1 100644 --- a/tests/local_testing/test_amazing_s3_logs.py +++ b/tests/local_testing/test_amazing_s3_logs.py @@ -12,7 +12,70 @@ import litellm litellm.num_retries = 3 import time, random +from litellm._logging import verbose_logger +import logging import pytest +import boto3 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync_mode", [True, False]) +async def test_basic_s3_logging(sync_mode): + verbose_logger.setLevel(level=logging.DEBUG) + litellm.success_callback = ["s3"] + litellm.s3_callback_params = { + "s3_bucket_name": "load-testing-oct", + "s3_aws_secret_access_key": "os.environ/AWS_SECRET_ACCESS_KEY", + "s3_aws_access_key_id": "os.environ/AWS_ACCESS_KEY_ID", + "s3_region_name": "us-west-2", + } + litellm.set_verbose = True + + if sync_mode is True: + response = litellm.completion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "This is a test"}], + mock_response="It's simple to use and easy to get started", + ) + else: + response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "This is a test"}], + mock_response="It's simple to use and easy to get started", + ) + print(f"response: {response}") + + await asyncio.sleep(12) + + total_objects, all_s3_keys = list_all_s3_objects("load-testing-oct") + + # assert that atlest one key has response.id in it + assert any(response.id in key for key in all_s3_keys) + s3 = boto3.client("s3") + # delete all objects + for key in all_s3_keys: + s3.delete_object(Bucket="load-testing-oct", Key=key) + + +def list_all_s3_objects(bucket_name): + s3 = boto3.client("s3") + + all_s3_keys = [] + + paginator = 
s3.get_paginator("list_objects_v2")
+    total_objects = 0
+
+    for page in paginator.paginate(Bucket=bucket_name):
+        if "Contents" in page:
+            total_objects += len(page["Contents"])
+            all_s3_keys.extend([obj["Key"] for obj in page["Contents"]])
+
+    print(f"Total number of objects in {bucket_name}: {total_objects}")
+    print(all_s3_keys)
+    return total_objects, all_s3_keys
 
 
 @pytest.mark.skip(reason="AWS Suspended Account")