import sys
import os
import io, asyncio

# import logging
# logging.basicConfig(level=logging.DEBUG)
sys.path.insert(0, os.path.abspath("../.."))

from litellm import completion
import litellm

litellm.num_retries = 3

import time, random
import pytest


def test_s3_logging():
    # all s3 requests need to be in one test function
    # since pytest runs tests in parallel on circle ci -
    # we only test litellm.acompletion()
    try:
        # use s3 both as the cache and as the success-callback log destination
        litellm.cache = litellm.Cache(
            type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2"
        )

        litellm.success_callback = ["s3"]
        litellm.s3_callback_params = {
            "s3_bucket_name": "litellm-logs",
            "s3_aws_secret_access_key": "os.environ/AWS_SECRET_ACCESS_KEY",
            "s3_aws_access_key_id": "os.environ/AWS_ACCESS_KEY_ID",
        }
        litellm.set_verbose = True

        print("Testing async s3 logging")

        expected_keys = []
        curr_time = str(time.time())

        async def _test():
            return await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": f"This is a test {curr_time}"}],
                max_tokens=10,
                temperature=0.7,
                user="ishaan-2",
            )

        response = asyncio.run(_test())
        print(f"response: {response}")
        expected_keys.append(response.id)

        # make the identical call a second time - it should be served from the
        # s3 cache, and the cached response should still be logged to s3
        response = asyncio.run(_test())
        expected_keys.append(response.id)
        print(f"response: {response}")

        import boto3

        s3 = boto3.client("s3")
        bucket_name = "litellm-logs"

        # list objects in the bucket, most recently modified first
        response = s3.list_objects(Bucket=bucket_name)
        objects = sorted(
            response["Contents"], key=lambda x: x["LastModified"], reverse=True
        )
        most_recent_keys = [obj["Key"] for obj in objects]
        print(most_recent_keys)

        # for each object key, keep the part before "-time=" - the response id
        cleaned_keys = []
        for key in most_recent_keys:
            split_key = key.split("-time=")
            cleaned_keys.append(split_key[0])

        print("\n most recent keys", most_recent_keys)
        print("\n cleaned keys", cleaned_keys)
        print("\n Expected keys: ", expected_keys)
        matches = 0
        for key in expected_keys:
            assert key in cleaned_keys
            matches += 1
            # remove the matched key so it can't be counted twice
            cleaned_keys.remove(key)

        # this asserts we logged both the first request and the 2nd cached request
        print("we had two matches! passed ", matches)
        assert matches == 2

        try:
            # cleanup: empty the s3 bucket used by this test
            for key in most_recent_keys:
                s3.delete_object(Bucket=bucket_name, Key=key)
        except Exception:
            # don't let cleanup fail the test
            pass
    except Exception as e:
        pytest.fail(f"An exception occurred - {e}")
    finally:
        print("Passed! Testing async s3 logging")


# test_s3_logging()
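

# A minimal sketch of the object-key parsing the assertions above rely on.
# It assumes the s3 logger names objects "<response_id>-time=<timestamp>" -
# inferred from the "-time=" split in this file, not from documented
# behavior. The example key below is hypothetical.
def _response_id_from_s3_key(key: str) -> str:
    # "chatcmpl-abc123-time=1700000000.0" -> "chatcmpl-abc123"
    return key.split("-time=")[0]


# assert _response_id_from_s3_key("chatcmpl-abc123-time=1700000000.0") == "chatcmpl-abc123"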


def test_s3_logging_r2():
    # all s3 requests need to be in one test function
    # since pytest runs tests in parallel on circle ci -
    # we only test litellm.acompletion()
    try:
        # litellm.cache = litellm.Cache(
        #     type="s3", s3_bucket_name="litellm-r2-bucket", s3_region_name="us-west-2"
        # )
        litellm.set_verbose = True
        from litellm._logging import verbose_logger
        import logging

        verbose_logger.setLevel(level=logging.DEBUG)

        litellm.success_callback = ["s3"]
        litellm.s3_callback_params = {
            "s3_bucket_name": "litellm-r2-bucket",
            "s3_aws_secret_access_key": "os.environ/R2_S3_ACCESS_KEY",
            "s3_aws_access_key_id": "os.environ/R2_S3_ACCESS_ID",
            "s3_endpoint_url": "os.environ/R2_S3_URL",
            "s3_region_name": "os.environ/R2_S3_REGION_NAME",
        }
        print("Testing async s3 logging")

        expected_keys = []
        curr_time = str(time.time())

        async def _test():
            return await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": f"This is a test {curr_time}"}],
                max_tokens=10,
                temperature=0.7,
                user="ishaan-2",
            )

        response = asyncio.run(_test())
        print(f"response: {response}")
        expected_keys.append(response.id)

        import boto3

        # Cloudflare R2 is s3-compatible, so boto3 works with a custom endpoint
        s3 = boto3.client(
            "s3",
            endpoint_url=os.getenv("R2_S3_URL"),
            region_name=os.getenv("R2_S3_REGION_NAME"),
            aws_access_key_id=os.getenv("R2_S3_ACCESS_ID"),
            aws_secret_access_key=os.getenv("R2_S3_ACCESS_KEY"),
        )
        bucket_name = "litellm-r2-bucket"

        # List objects in the bucket
        response = s3.list_objects(Bucket=bucket_name)

        # # Sort the objects based on the LastModified timestamp
        # objects = sorted(
        #     response["Contents"], key=lambda x: x["LastModified"], reverse=True
        # )
        # # Get the keys of the most recent objects
        # most_recent_keys = [obj["Key"] for obj in objects]
        # print(most_recent_keys)

        # # for each object key, keep the part before "-time=" - the response id
        # cleaned_keys = []
        # for key in most_recent_keys:
        #     split_key = key.split("-time=")
        #     cleaned_keys.append(split_key[0])

        # print("\n most recent keys", most_recent_keys)
        # print("\n cleaned keys", cleaned_keys)
        # print("\n Expected keys: ", expected_keys)
        # matches = 0
        # for key in expected_keys:
        #     assert key in cleaned_keys
        #     matches += 1
        #     # remove the matched key so it can't be counted twice
        #     cleaned_keys.remove(key)

        # # this asserts we logged the single request made in this test
        # print("match count: ", matches)
        # assert matches == 1

        # try:
        #     # cleanup: empty the r2 bucket used by this test
        #     for key in most_recent_keys:
        #         s3.delete_object(Bucket=bucket_name, Key=key)
        # except Exception:
        #     # don't let cleanup fail the test
        #     pass
    except Exception as e:
        pytest.fail(f"An exception occurred - {e}")
    finally:
        print("Passed! Testing async s3 logging")
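

# A sketch of how to run these tests locally, assuming valid credentials.
# The env var names come from this file; the file path in the pytest
# command is hypothetical:
#
#   export AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=...    # test_s3_logging
#   export R2_S3_ACCESS_ID=... R2_S3_ACCESS_KEY=...           # test_s3_logging_r2
#   export R2_S3_URL=... R2_S3_REGION_NAME=...
#   pytest test_s3_logging.py -s

# test_s3_logging_r2()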