litellm-mirror/litellm/tests/test_profiling_router.py

# #### What this tests ####
# # This profiles a router call to find where requests spend the most time.
# import sys, os, time, logging
# import traceback, asyncio, uuid
# import pytest
# import cProfile
# from pstats import Stats
#
# sys.path.insert(
#     0, os.path.abspath("../..")
# )  # Adds the parent directory to the system path
#
# import litellm
# from litellm import Router
# from concurrent.futures import ThreadPoolExecutor
# from dotenv import load_dotenv
# from aiodebug import log_slow_callbacks  # aiodebug utility for logging slow callbacks
#
# load_dotenv()
#
# logging.basicConfig(
#     level=logging.DEBUG,
#     format='%(asctime)s %(levelname)s: %(message)s',
#     datefmt='%I:%M:%S %p',
#     filename='aiologs.log',  # name of the log file where logs will be written
#     filemode='w',  # 'w' overwrites the log file on each run; use 'a' to append
# )
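# # All DEBUG output (and, once log_slow_callbacks.enable() is uncommented
# # further down, aiodebug's slow-callback warnings) ends up in aiologs.log.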
#
# model_list = [
#     {
#         "model_name": "azure-model",
#         "litellm_params": {
#             "model": "azure/gpt-turbo",
#             "api_key": "os.environ/AZURE_FRANCE_API_KEY",
#             "api_base": "https://openai-france-1234.openai.azure.com",
#             "rpm": 1440,
#         },
#     },
#     {
#         "model_name": "azure-model",
#         "litellm_params": {
#             "model": "azure/gpt-35-turbo",
#             "api_key": "os.environ/AZURE_EUROPE_API_KEY",
#             "api_base": "https://my-endpoint-europe-berri-992.openai.azure.com",
#             "rpm": 6,
#         },
#     },
#     {
#         "model_name": "azure-model",
#         "litellm_params": {
#             "model": "azure/gpt-35-turbo",
#             "api_key": "os.environ/AZURE_CANADA_API_KEY",
#             "api_base": "https://my-endpoint-canada-berri992.openai.azure.com",
#             "rpm": 6,
#         },
#     },
# ]
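#
# # All three deployments share the model_name "azure-model", so the Router
# # load-balances requests across them; the per-deployment "rpm" values weight
# # how much traffic each endpoint receives, and num_retries=3 lets a failed
# # call be retried on another deployment.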
# router = Router(model_list=model_list, set_verbose=False, num_retries=3)
#
# async def router_completion():
#     try:
#         messages = [{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}]
#         response = await router.acompletion(model="azure-model", messages=messages)
#         return response
#     except asyncio.exceptions.CancelledError:
#         print("Task was cancelled")
#         return None
#     except Exception:
#         # Swallow errors so a failed call simply counts as None in the gather below
#         return None
#
# async def loadtest_fn(n=1000):
#     start = time.time()
#     tasks = [router_completion() for _ in range(n)]
#     chat_completions = await asyncio.gather(*tasks)
#     successful_completions = [c for c in chat_completions if c is not None]
#     print(n, time.time() - start, len(successful_completions))
#
# # loop = asyncio.get_event_loop()
# # loop.set_debug(True)
# # log_slow_callbacks.enable(0.05)  # log callbacks slower than 0.05 seconds
# # # Execute the load-testing function within the asyncio event loop
# # loop.run_until_complete(loadtest_fn())
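#
# # A minimal sketch (not part of the original test) of how the otherwise-unused
# # cProfile / Stats imports above could be applied to profile the load test:
# # profiler = cProfile.Profile()
# # profiler.enable()
# # asyncio.run(loadtest_fn(n=100))
# # profiler.disable()
# # Stats(profiler).sort_stats("cumulative").print_stats(25)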
#
# ### SUSTAINED LOAD TESTS ###
#
# async def make_requests(n):
#     tasks = [router_completion() for _ in range(n)]
#     print(f"num tasks: {len(tasks)}")
#     chat_completions = await asyncio.gather(*tasks)
#     successful_completions = [c for c in chat_completions if c is not None]
#     print(f"successful_completions: {len(successful_completions)}")
#     return successful_completions
#
# async def main():
#     request_limit = 1000  # requests per batch
#     batches = 2  # batches of 1k requests
#     start = time.time()
#     tasks = []  # list to hold all tasks
#
#     async def request_loop():
#         nonlocal tasks
#         for _ in range(batches):
#             # Make 1,000 requests
#             task = asyncio.create_task(make_requests(request_limit))
#             tasks.append(task)
#             # Introduce a delay to achieve ~1,000 requests per second
#             await asyncio.sleep(1)
#
#     await request_loop()
#     results = await asyncio.gather(*tasks)
#     total_successful_requests = sum(len(res) for res in results)
#     print(request_limit * batches, time.time() - start, total_successful_requests)
#
# asyncio.run(main())
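#
# # Expected console output: one "num tasks" / "successful_completions" pair per
# # batch, then a summary line of <total requests> <elapsed seconds> <successes>,
# # e.g. "2000 12.3 1987" (numbers illustrative).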