litellm-mirror/litellm/tests/test_proxy_server_keys.py
2024-01-06 12:30:43 +05:30

290 lines
9.1 KiB
Python

import sys, os, time
import traceback
from dotenv import load_dotenv
load_dotenv()
import os, io
# this file is to test litellm/proxy
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import pytest, logging
import litellm
from litellm import embedding, completion, completion_cost, Timeout
from litellm import RateLimitError
from httpx import AsyncClient
# Configure logging
logging.basicConfig(
level=logging.DEBUG, # Set the desired logging level
format="%(asctime)s - %(levelname)s - %(message)s",
)
from concurrent.futures import ThreadPoolExecutor
# test /chat/completion request to the proxy
from fastapi.testclient import TestClient
from fastapi import FastAPI
from litellm.proxy.proxy_server import (
router,
save_worker_config,
startup_event,
asyncio,
) # Replace with the actual module where your FastAPI router is defined
filepath = os.path.dirname(os.path.abspath(__file__))
config_fp = f"{filepath}/test_configs/test_config.yaml"
save_worker_config(
config=config_fp,
model=None,
alias=None,
api_base=None,
api_version=None,
debug=True,
temperature=None,
max_tokens=None,
request_timeout=600,
max_budget=None,
telemetry=False,
drop_params=True,
add_function_to_prompt=False,
headers=None,
save=False,
use_queue=False,
)
import asyncio
# @pytest.fixture
# def event_loop():
# """Create an instance of the default event loop for each test case."""
# policy = asyncio.WindowsSelectorEventLoopPolicy()
# res = policy.new_event_loop()
# asyncio.set_event_loop(res)
# res._close = res.close
# res.close = lambda: None
# yield res
# res._close()
# Here you create a fixture that will be used by your tests
# Make sure the fixture returns TestClient(app)
@pytest.fixture(scope="function")
async def client():
from litellm.proxy.proxy_server import (
cleanup_router_config_variables,
initialize,
ProxyLogging,
proxy_logging_obj,
)
cleanup_router_config_variables() # rest proxy before test
proxy_logging_obj = ProxyLogging(user_api_key_cache={})
proxy_logging_obj._init_litellm_callbacks() # INITIALIZE LITELLM CALLBACKS ON SERVER STARTUP <- do this to catch any logging errors on startup, not when calls are being made
await initialize(config=config_fp, debug=True)
app = FastAPI()
app.include_router(router) # Include your router in the test app
async with AsyncClient(app=app, base_url="http://testserver") as client:
yield client
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
@pytest.mark.anyio
async def test_add_new_key(client):
try:
# Your test data
test_data = {
"models": ["gpt-3.5-turbo", "gpt-4", "claude-2", "azure-model"],
"aliases": {"mistral-7b": "gpt-3.5-turbo"},
"duration": "20m",
}
print("testing proxy server - test_add_new_key")
# Your bearer token
token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"}
response = await client.post("/key/generate", json=test_data, headers=headers)
print(f"response: {response.text}")
assert response.status_code == 200
result = response.json()
assert result["key"].startswith("sk-")
async def _post_data():
json_data = {
"model": "azure-model",
"messages": [
{
"role": "user",
"content": f"this is a test request, write a short poem {time.time()}",
}
],
}
response = await client.post(
"/chat/completions",
json=json_data,
headers={"Authorization": f"Bearer {result['key']}"},
)
return response
await _post_data()
print(f"Received response: {result}")
except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
@pytest.mark.anyio
async def test_update_new_key(client):
try:
# Your test data
test_data = {
"models": ["gpt-3.5-turbo", "gpt-4", "claude-2", "azure-model"],
"aliases": {"mistral-7b": "gpt-3.5-turbo"},
"duration": "20m",
}
print("testing proxy server-test_update_new_key")
# Your bearer token
token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"}
response = await client.post("/key/generate", json=test_data, headers=headers)
print(f"response: {response.text}")
assert response.status_code == 200
result = response.json()
assert result["key"].startswith("sk-")
async def _post_data():
json_data = {"models": ["bedrock-models"], "key": result["key"]}
response = await client.post("/key/update", json=json_data, headers=headers)
print(f"response text: {response.text}")
assert response.status_code == 200
return response
await _post_data()
print(f"Received response: {result}")
except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
# Run the test - only runs via pytest
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
@pytest.mark.anyio
async def test_add_new_key_max_parallel_limit(client):
try:
import anyio
print("ANY IO BACKENDS")
print(anyio.get_all_backends())
# Your test data
test_data = {
"duration": "20m",
"max_parallel_requests": 1,
"metadata": {"type": "ishaan-test"},
}
# Your bearer token
token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"}
response = await client.post("/key/generate", json=test_data, headers=headers)
print(f"response: {response.text}")
assert response.status_code == 200
result = response.json()
async def _post_data():
json_data = {
"model": "azure-model",
"messages": [
{
"role": "user",
"content": f"this is a test request, write a short poem {time.time()}",
}
],
}
response = await client.post(
"/chat/completions",
json=json_data,
headers={"Authorization": f"Bearer {result['key']}"},
)
return response
async def _run_in_parallel():
try:
futures = [_post_data() for _ in range(2)]
responses = await asyncio.gather(*futures)
print("response1 status: ", responses[0].status_code)
print("response2 status: ", responses[1].status_code)
if any(response.status_code == 429 for response in responses):
pass
else:
raise Exception()
except Exception as e:
pass
await _run_in_parallel()
# assert responses[0].status_code == 200 or responses[1].status_code == 200
except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
@pytest.mark.anyio
async def test_add_new_key_max_parallel_limit_streaming(client):
try:
# Your test data
test_data = {"duration": "20m", "max_parallel_requests": 1}
# Your bearer token
token = os.getenv("PROXY_MASTER_KEY")
headers = {"Authorization": f"Bearer {token}"}
response = await client.post("/key/generate", json=test_data, headers=headers)
print(f"response: {response.text}")
assert response.status_code == 200
result = response.json()
async def _post_data():
json_data = {
"model": "azure-model",
"messages": [
{
"role": "user",
"content": f"this is a test request, write a short poem {time.time()}",
}
],
"stream": True,
}
response = await client.post(
"/chat/completions",
json=json_data,
headers={"Authorization": f"Bearer {result['key']}"},
)
return response
async def _run_in_parallel():
try:
futures = [_post_data() for _ in range(2)]
responses = await asyncio.gather(*futures)
print("response1 status: ", responses[0].status_code)
print("response2 status: ", responses[1].status_code)
if any(response.status_code == 429 for response in responses):
pass
else:
raise Exception()
except Exception as e:
pass
_run_in_parallel()
except Exception as e:
pytest.fail(f"LiteLLM Proxy test failed. Exception: {str(e)}")