refactor: refactor key tests

This commit is contained in:
Krrish Dholakia 2024-01-10 20:58:29 +05:30
parent bb04a340a5
commit 162f6f1ed3
2 changed files with 248 additions and 229 deletions

View file

@ -1,63 +1,63 @@
import sys, os, time # import sys, os, time
import traceback # import traceback
from dotenv import load_dotenv # from dotenv import load_dotenv
load_dotenv() # load_dotenv()
import os, io # import os, io
# this file is to test litellm/proxy # # this file is to test litellm/proxy
sys.path.insert( # sys.path.insert(
0, os.path.abspath("../..") # 0, os.path.abspath("../..")
) # Adds the parent directory to the system path # ) # Adds the parent directory to the system path
import pytest, logging, requests # import pytest, logging, requests
import litellm # import litellm
from litellm import embedding, completion, completion_cost, Timeout # from litellm import embedding, completion, completion_cost, Timeout
from litellm import RateLimitError # from litellm import RateLimitError
def test_add_new_key(): # def test_add_new_key():
max_retries = 3 # max_retries = 3
retry_delay = 1 # seconds # retry_delay = 1 # seconds
for retry in range(max_retries + 1): # for retry in range(max_retries + 1):
try: # try:
# Your test data # # Your test data
test_data = { # test_data = {
"models": ["gpt-3.5-turbo", "gpt-4", "claude-2", "azure-model"], # "models": ["gpt-3.5-turbo", "gpt-4", "claude-2", "azure-model"],
"aliases": {"mistral-7b": "gpt-3.5-turbo"}, # "aliases": {"mistral-7b": "gpt-3.5-turbo"},
"duration": "20m", # "duration": "20m",
} # }
print("testing proxy server") # print("testing proxy server")
# Your bearer token # # Your bearer token
token = os.getenv("PROXY_MASTER_KEY") # token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"} # headers = {"Authorization": f"Bearer {token}"}
staging_endpoint = "https://litellm-litellm-pr-1376.up.railway.app" # staging_endpoint = "https://litellm-litellm-pr-1376.up.railway.app"
main_endpoint = "https://litellm-staging.up.railway.app" # main_endpoint = "https://litellm-staging.up.railway.app"
# Make a request to the staging endpoint # # Make a request to the staging endpoint
response = requests.post( # response = requests.post(
main_endpoint + "/key/generate", json=test_data, headers=headers # main_endpoint + "/key/generate", json=test_data, headers=headers
) # )
print(f"response: {response.text}") # print(f"response: {response.text}")
if response.status_code == 200: # if response.status_code == 200:
result = response.json() # result = response.json()
break # Successful response, exit the loop # break # Successful response, exit the loop
elif response.status_code == 503 and retry < max_retries: # elif response.status_code == 503 and retry < max_retries:
print( # print(
f"Retrying in {retry_delay} seconds... (Retry {retry + 1}/{max_retries})" # f"Retrying in {retry_delay} seconds... (Retry {retry + 1}/{max_retries})"
) # )
time.sleep(retry_delay) # time.sleep(retry_delay)
else: # else:
assert False, f"Unexpected response status code: {response.status_code}" # assert False, f"Unexpected response status code: {response.status_code}"
except Exception as e: # except Exception as e:
print(traceback.format_exc()) # print(traceback.format_exc())
pytest.fail(f"An error occurred {e}") # pytest.fail(f"An error occurred {e}")
test_add_new_key() # test_add_new_key()

View file

@ -15,69 +15,66 @@ import litellm
from litellm import embedding, completion, completion_cost, Timeout from litellm import embedding, completion, completion_cost, Timeout
from litellm import RateLimitError from litellm import RateLimitError
# Configure logging
logging.basicConfig( import sys, os, time
level=logging.DEBUG, # Set the desired logging level import traceback
format="%(asctime)s - %(levelname)s - %(message)s", from dotenv import load_dotenv
)
load_dotenv()
import os, io
# this file is to test litellm/proxy
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
# test /chat/completion request to the proxy sys.path.insert(
from fastapi.testclient import TestClient 0, os.path.abspath("../..")
from fastapi import FastAPI ) # Adds the parent directory to the system path
from litellm.proxy.proxy_server import (
router,
save_worker_config,
startup_event,
shutdown_event,
) # Replace with the actual module where your FastAPI router is defined
filepath = os.path.dirname(os.path.abspath(__file__)) import pytest, logging, requests
config_fp = f"{filepath}/test_configs/test_config.yaml" import litellm
save_worker_config( from litellm import embedding, completion, completion_cost, Timeout
config=config_fp, from litellm import RateLimitError
model=None, from github import Github
alias=None, import subprocess
api_base=None,
api_version=None,
debug=True,
temperature=None,
max_tokens=None,
request_timeout=600,
max_budget=None,
telemetry=False,
drop_params=True,
add_function_to_prompt=False,
headers=None,
save=False,
use_queue=False,
)
app = FastAPI()
app.include_router(router) # Include your router in the test app
@app.on_event("startup") # Function to execute a command and return the output
async def wrapper_startup_event(): def run_command(command):
await startup_event() process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
output, _ = process.communicate()
return output.decode().strip()
@app.on_event("shutdown") # Retrieve the current branch name
async def wrapper_shutdown_event(): branch_name = run_command("git rev-parse --abbrev-ref HEAD")
await shutdown_event()
# GitHub personal access token (with repo scope) or use username and password
access_token = os.getenv("GITHUB_ACCESS_TOKEN")
# Instantiate the PyGithub library's Github object
g = Github(access_token)
# Provide the owner and name of the repository where the pull request is located
repository_owner = "BerriAI"
repository_name = "litellm"
# Get the repository object
repo = g.get_repo(f"{repository_owner}/{repository_name}")
# Iterate through the pull requests to find the one related to your branch
for pr in repo.get_pulls():
print(f"in here! {pr.head.ref}")
if pr.head.ref == branch_name:
pr_number = pr.number
break
print(f"The pull request number for branch {branch_name} is: {pr_number}")
# Here you create a fixture that will be used by your tests def test_add_new_key():
# Make sure the fixture returns TestClient(app) max_retries = 3
@pytest.fixture(autouse=True) retry_delay = 1 # seconds
def client():
from litellm.proxy.proxy_server import cleanup_router_config_variables
cleanup_router_config_variables() for retry in range(max_retries + 1):
with TestClient(app) as client:
yield client
def test_add_new_key(client):
try: try:
# Your test data # Your test data
test_data = { test_data = {
@ -86,40 +83,37 @@ def test_add_new_key(client):
"duration": "20m", "duration": "20m",
} }
print("testing proxy server") print("testing proxy server")
# Your bearer token # Your bearer token
token = os.getenv("PROXY_MASTER_KEY") token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
response = client.post("/key/generate", json=test_data, headers=headers)
print(f"response: {response.text}")
assert response.status_code == 200
result = response.json()
assert result["key"].startswith("sk-")
def _post_data(): endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
json_data = {
"model": "azure-model", # Make a request to the staging endpoint
"messages": [ response = requests.post(
{ endpoint + "/key/generate", json=test_data, headers=headers
"role": "user",
"content": f"this is a test request, write a short poem {time.time()}",
}
],
}
response = client.post(
"/chat/completions",
json=json_data,
headers={"Authorization": f"Bearer {result['key']}"},
) )
return response
_post_data() print(f"response: {response.text}")
print(f"Received response: {result}")
if response.status_code == 200:
result = response.json()
break # Successful response, exit the loop
elif response.status_code == 503 and retry < max_retries:
print(
f"Retrying in {retry_delay} seconds... (Retry {retry + 1}/{max_retries})"
)
time.sleep(retry_delay)
else:
assert False, f"Unexpected response status code: {response.status_code}"
except Exception as e: except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}") print(traceback.format_exc())
pytest.fail(f"An error occurred {e}")
def test_update_new_key(client): def test_update_new_key():
try: try:
# Your test data # Your test data
test_data = { test_data = {
@ -130,17 +124,23 @@ def test_update_new_key(client):
print("testing proxy server") print("testing proxy server")
# Your bearer token # Your bearer token
token = os.getenv("PROXY_MASTER_KEY") token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
response = client.post("/key/generate", json=test_data, headers=headers)
print(f"response: {response.text}") endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
# Make a request to the staging endpoint
response = requests.post(
endpoint + "/key/generate", json=test_data, headers=headers
)
assert response.status_code == 200 assert response.status_code == 200
result = response.json() result = response.json()
assert result["key"].startswith("sk-") assert result["key"].startswith("sk-")
def _post_data(): def _post_data():
json_data = {"models": ["bedrock-models"], "key": result["key"]} json_data = {"models": ["bedrock-models"], "key": result["key"]}
response = client.post("/key/update", json=json_data, headers=headers) response = requests.post(
endpoint + "/key/generate", json=json_data, headers=headers
)
print(f"response text: {response.text}") print(f"response text: {response.text}")
assert response.status_code == 200 assert response.status_code == 200
return response return response
@ -151,101 +151,120 @@ def test_update_new_key(client):
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}") pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
# # Run the test - only runs via pytest # def test_add_new_key_max_parallel_limit():
# try:
# # Your test data
# test_data = {"duration": "20m", "max_parallel_requests": 1}
# # Your bearer token
# token = os.getenv("PROXY_MASTER_KEY")
# headers = {"Authorization": f"Bearer {token}"}
# endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
# print(f"endpoint: {endpoint}")
# # Make a request to the staging endpoint
# response = requests.post(
# endpoint + "/key/generate", json=test_data, headers=headers
# )
# assert response.status_code == 200
# result = response.json()
# # load endpoint with model
# model_data = {
# "model_name": "azure-model",
# "litellm_params": {
# "model": "azure/chatgpt-v-2",
# "api_key": os.getenv("AZURE_API_KEY"),
# "api_base": os.getenv("AZURE_API_BASE"),
# "api_version": os.getenv("AZURE_API_VERSION")
# }
# }
# response = requests.post(endpoint + "/model/new", json=model_data, headers=headers)
# assert response.status_code == 200
# print(f"response text: {response.text}")
def test_add_new_key_max_parallel_limit(client): # def _post_data():
try: # json_data = {
# Your test data # "model": "azure-model",
test_data = {"duration": "20m", "max_parallel_requests": 1} # "messages": [
# Your bearer token # {
token = os.getenv("PROXY_MASTER_KEY") # "role": "user",
# "content": f"this is a test request, write a short poem {time.time()}",
# }
# ],
# }
# # Your bearer token
# response = requests.post(
# endpoint + "/chat/completions", json=json_data, headers={"Authorization": f"Bearer {result['key']}"}
# )
# return response
headers = {"Authorization": f"Bearer {token}"} # def _run_in_parallel():
response = client.post("/key/generate", json=test_data, headers=headers) # with ThreadPoolExecutor(max_workers=2) as executor:
print(f"response: {response.text}") # future1 = executor.submit(_post_data)
assert response.status_code == 200 # future2 = executor.submit(_post_data)
result = response.json()
def _post_data(): # # Obtain the results from the futures
json_data = { # response1 = future1.result()
"model": "azure-model", # print(f"response1 text: {response1.text}")
"messages": [ # response2 = future2.result()
{ # print(f"response2 text: {response2.text}")
"role": "user", # if response1.status_code == 429 or response2.status_code == 429:
"content": f"this is a test request, write a short poem {time.time()}", # pass
} # else:
], # raise Exception()
}
response = client.post(
"/chat/completions",
json=json_data,
headers={"Authorization": f"Bearer {result['key']}"},
)
return response
def _run_in_parallel(): # _run_in_parallel()
with ThreadPoolExecutor(max_workers=2) as executor: # except Exception as e:
future1 = executor.submit(_post_data) # pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
future2 = executor.submit(_post_data)
# Obtain the results from the futures # def test_add_new_key_max_parallel_limit_streaming():
response1 = future1.result() # try:
response2 = future2.result() # # Your test data
if response1.status_code == 429 or response2.status_code == 429: # test_data = {"duration": "20m", "max_parallel_requests": 1}
pass # # Your bearer token
else: # token = os.getenv("PROXY_MASTER_KEY")
raise Exception() # headers = {"Authorization": f"Bearer {token}"}
_run_in_parallel() # endpoint = f"https://litellm-litellm-pr-{pr_number}.up.railway.app"
except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
# # Make a request to the staging endpoint
# response = requests.post(
# endpoint + "/key/generate", json=test_data, headers=headers
# )
# print(f"response: {response.text}")
# assert response.status_code == 200
# result = response.json()
def test_add_new_key_max_parallel_limit_streaming(client): # def _post_data():
try: # json_data = {
# Your test data # "model": "azure-model",
test_data = {"duration": "20m", "max_parallel_requests": 1} # "messages": [
# Your bearer token # {
token = os.getenv("PROXY_MASTER_KEY") # "role": "user",
# "content": f"this is a test request, write a short poem {time.time()}",
# }
# ],
# "stream": True,
# }
# response = requests.post(
# endpoint + "/chat/completions", json=json_data, headers={"Authorization": f"Bearer {result['key']}"}
# )
# return response
headers = {"Authorization": f"Bearer {token}"} # def _run_in_parallel():
response = client.post("/key/generate", json=test_data, headers=headers) # with ThreadPoolExecutor(max_workers=2) as executor:
print(f"response: {response.text}") # future1 = executor.submit(_post_data)
assert response.status_code == 200 # future2 = executor.submit(_post_data)
result = response.json()
def _post_data(): # # Obtain the results from the futures
json_data = { # response1 = future1.result()
"model": "azure-model", # response2 = future2.result()
"messages": [ # if response1.status_code == 429 or response2.status_code == 429:
{ # pass
"role": "user", # else:
"content": f"this is a test request, write a short poem {time.time()}", # raise Exception()
}
],
"stream": True,
}
response = client.post(
"/chat/completions",
json=json_data,
headers={"Authorization": f"Bearer {result['key']}"},
)
return response
def _run_in_parallel(): # _run_in_parallel()
with ThreadPoolExecutor(max_workers=2) as executor: # except Exception as e:
future1 = executor.submit(_post_data) # pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
future2 = executor.submit(_post_data)
# Obtain the results from the futures
response1 = future1.result()
response2 = future2.result()
if response1.status_code == 429 or response2.status_code == 429:
pass
else:
raise Exception()
_run_in_parallel()
except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")