#### What this tests ####
# This tests litellm router
import asyncio
import os
import sys
import time
import traceback
import openai
import pytest
import litellm.types
import litellm.types.router
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from dotenv import load_dotenv
from pydantic import BaseModel
import litellm
from litellm import Router
from litellm.router import Deployment, LiteLLM_Params, ModelInfo
from litellm.router_utils.cooldown_handlers import (
_async_get_cooldown_deployments,
_get_cooldown_deployments,
)
from litellm.types.router import DeploymentTypedDict
load_dotenv()
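# sanity check: DeploymentTypedDict values should be plain dicts/strings, not pydantic BaseModel instances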
def test_router_deployment_typing():
deployment_typed_dict = DeploymentTypedDict(
model_name="hi", litellm_params={"model": "hello-world"}
)
for value in deployment_typed_dict.values():
assert not isinstance(value, BaseModel)
def test_router_multi_org_list():
"""
Pass list of orgs in 1 model definition,
expect a unique deployment for each to be created
"""
router = litellm.Router(
model_list=[
{
"model_name": "*",
"litellm_params": {
"model": "openai/*",
"api_key": "my-key",
"api_base": "https://api.openai.com/v1",
"organization": ["org-1", "org-2", "org-3"],
},
}
]
)
assert len(router.get_model_list()) == 3
@pytest.mark.asyncio()
async def test_router_provider_wildcard_routing():
"""
Pass provider wildcard deployments (openai/*, anthropic/*, groq/*),
expect requests for models under those providers to route to the matching deployment
"""
litellm.set_verbose = True
router = litellm.Router(
model_list=[
{
"model_name": "openai/*",
"litellm_params": {
"model": "openai/*",
"api_key": os.environ["OPENAI_API_KEY"],
"api_base": "https://api.openai.com/v1",
},
},
{
"model_name": "anthropic/*",
"litellm_params": {
"model": "anthropic/*",
"api_key": os.environ["ANTHROPIC_API_KEY"],
},
},
{
"model_name": "groq/*",
"litellm_params": {
"model": "groq/*",
"api_key": os.environ["GROQ_API_KEY"],
},
},
]
)
print("router model list = ", router.get_model_list())
response1 = await router.acompletion(
model="anthropic/claude-3-sonnet-20240229",
messages=[{"role": "user", "content": "hello"}],
)
print("response 1 = ", response1)
response2 = await router.acompletion(
model="openai/gpt-3.5-turbo",
messages=[{"role": "user", "content": "hello"}],
)
print("response 2 = ", response2)
response3 = await router.acompletion(
model="groq/llama3-8b-8192",
messages=[{"role": "user", "content": "hello"}],
)
print("response 3 = ", response3)
response4 = await router.acompletion(
model="claude-3-5-sonnet-20240620",
messages=[{"role": "user", "content": "hello"}],
)
@pytest.mark.asyncio()
async def test_router_provider_wildcard_routing_regex():
"""
Pass wildcard model patterns with literal separators (e.g. 'openai/fo::*:static::*'),
expect requests whose model names match the pattern to route to that deployment
"""
router = litellm.Router(
model_list=[
{
"model_name": "openai/fo::*:static::*",
"litellm_params": {
"model": "openai/fo::*:static::*",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
},
},
{
"model_name": "openai/foo3::hello::*",
"litellm_params": {
"model": "openai/foo3::hello::*",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
},
},
]
)
print("router model list = ", router.get_model_list())
response1 = await router.acompletion(
model="openai/fo::anything-can-be-here::static::anything-can-be-here",
messages=[{"role": "user", "content": "hello"}],
)
print("response 1 = ", response1)
response2 = await router.acompletion(
model="openai/foo3::hello::static::anything-can-be-here",
messages=[{"role": "user", "content": "hello"}],
)
print("response 2 = ", response2)
def test_router_specific_model_via_id():
"""
Call a specific deployment by its id
"""
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-fake-key",
"mock_response": "Hello world",
},
"model_info": {"id": "1234"},
}
]
)
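# calling with the deployment's model_info id ("1234") should route directly to that deployment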
router.completion(model="1234", messages=[{"role": "user", "content": "Hey!"}])
def test_router_azure_ai_client_init():
_deployment = {
"model_name": "meta-llama-3-70b",
"litellm_params": {
"model": "azure_ai/Meta-Llama-3-70B-instruct",
"api_base": "my-fake-route",
"api_key": "my-fake-key",
},
"model_info": {"id": "1234"},
}
router = Router(model_list=[_deployment])
_client = router._get_client(
deployment=_deployment,
client_type="async",
kwargs={"stream": False},
)
print(_client)
from openai import AsyncAzureOpenAI, AsyncOpenAI
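# azure_ai/* deployments are OpenAI-compatible, so the router should return an AsyncOpenAI client, not an AsyncAzureOpenAI client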
assert isinstance(_client, AsyncOpenAI)
assert not isinstance(_client, AsyncAzureOpenAI)
def test_router_sensitive_keys():
try:
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "special-key",
},
"model_info": {"id": 12345},
},
],
)
except Exception as e:
print(f"error msg - {str(e)}")
assert "special-key" not in str(e)
def test_router_order():
"""
Asserts for 2 models in a model group, model with order=1 always called first
"""
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-4o",
"api_key": os.getenv("OPENAI_API_KEY"),
"mock_response": "Hello world",
"order": 1,
},
"model_info": {"id": "1"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-4o",
"api_key": "bad-key",
"mock_response": Exception("this is a bad key"),
"order": 2,
},
"model_info": {"id": "2"},
},
],
num_retries=0,
allowed_fails=0,
enable_pre_call_checks=True,
)
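# with 'order' set, the healthy deployment with order=1 (id="1") should be picked on every call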
for _ in range(100):
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
assert isinstance(response, litellm.ModelResponse)
assert response._hidden_params["model_id"] == "1"
@pytest.mark.parametrize("num_retries", [None, 2])
@pytest.mark.parametrize("max_retries", [None, 4])
def test_router_num_retries_init(num_retries, max_retries):
"""
- test when num_retries set v/s not
- test client value when max retries set v/s not
"""
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"max_retries": max_retries,
},
"model_info": {"id": 12345},
},
],
num_retries=num_retries,
)
if num_retries is not None:
assert router.num_retries == num_retries
else:
assert router.num_retries == openai.DEFAULT_MAX_RETRIES
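# when the deployment doesn't set max_retries, the router is expected to create the client with max_retries=0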
model_client = router._get_client(
{"model_info": {"id": 12345}}, client_type="async", kwargs={}
)
if max_retries is not None:
assert getattr(model_client, "max_retries") == max_retries
else:
assert getattr(model_client, "max_retries") == 0
@pytest.mark.parametrize(
"timeout", [10, 1.0, httpx.Timeout(timeout=300.0, connect=20.0)]
)
@pytest.mark.parametrize("ssl_verify", [True, False])
def test_router_timeout_init(timeout, ssl_verify):
"""
Allow user to pass httpx.Timeout
related issue - https://github.com/BerriAI/litellm/issues/3162
"""
litellm.ssl_verify = ssl_verify
router = Router(
model_list=[
{
"model_name": "test-model",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
"timeout": timeout,
},
"model_info": {"id": 1234},
}
]
)
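# the cached sync client should carry the deployment-level timeout and respect litellm.ssl_verify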
model_client = router._get_client(
deployment={"model_info": {"id": 1234}}, client_type="sync_client", kwargs={}
)
assert getattr(model_client, "timeout") == timeout
print(f"vars model_client: {vars(model_client)}")
http_client = getattr(model_client, "_client")
print(f"http client: {vars(http_client)}, ssl_Verify={ssl_verify}")
if ssl_verify is False:
assert http_client._transport._pool._ssl_context.verify_mode.name == "CERT_NONE"
else:
assert (
http_client._transport._pool._ssl_context.verify_mode.name
== "CERT_REQUIRED"
)
@pytest.mark.parametrize("sync_mode", [False, True])
@pytest.mark.asyncio
async def test_router_retries(sync_mode):
"""
- make sure retries work as expected
"""
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-3.5-turbo", "api_key": "bad-key"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
},
},
]
router = Router(model_list=model_list, num_retries=2)
if sync_mode:
router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
else:
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
print(response.choices[0].message)
@pytest.mark.parametrize(
"mistral_api_base",
[
"os.environ/AZURE_MISTRAL_API_BASE",
"https://Mistral-large-nmefg-serverless.eastus2.inference.ai.azure.com/v1/",
"https://Mistral-large-nmefg-serverless.eastus2.inference.ai.azure.com/v1",
"https://Mistral-large-nmefg-serverless.eastus2.inference.ai.azure.com/",
"https://Mistral-large-nmefg-serverless.eastus2.inference.ai.azure.com",
],
)
def test_router_azure_ai_studio_init(mistral_api_base):
router = Router(
model_list=[
{
"model_name": "test-model",
"litellm_params": {
"model": "azure/mistral-large-latest",
"api_key": "os.environ/AZURE_MISTRAL_API_KEY",
"api_base": mistral_api_base,
},
"model_info": {"id": 1234},
}
]
)
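# whichever form the api_base takes, the resulting client base url should contain exactly one '/v1/' segment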
model_client = router._get_client(
deployment={"model_info": {"id": 1234}}, client_type="sync_client", kwargs={}
)
url = getattr(model_client, "_base_url")
uri_reference = str(getattr(url, "_uri_reference"))
print(f"uri_reference: {uri_reference}")
assert "/v1/" in uri_reference
assert uri_reference.count("v1") == 1
def test_exception_raising():
# this tests if the router raises an exception when invalid params are set
# in this test both deployments have bad keys - Keep this test. It validates if the router raises the most recent exception
litellm.set_verbose = True
import openai
try:
print("testing if router raises an exception")
old_api_key = os.environ["AZURE_API_KEY"]
os.environ["AZURE_API_KEY"] = ""
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
"tpm": 240000,
"rpm": 1800,
},
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { #
"model": "gpt-3.5-turbo",
"api_key": "bad-key",
},
"tpm": 240000,
"rpm": 1800,
},
]
router = Router(
model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=int(os.getenv("REDIS_PORT")),
routing_strategy="simple-shuffle",
set_verbose=False,
num_retries=1,
) # type: ignore
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "hello this request will fail"}],
)
os.environ["AZURE_API_KEY"] = old_api_key
pytest.fail(f"Should have raised an Auth Error")
except openai.AuthenticationError:
print(
"Test Passed: Caught an OPENAI AUTH Error, Good job. This is what we needed!"
)
os.environ["AZURE_API_KEY"] = old_api_key
router.reset()
except Exception as e:
os.environ["AZURE_API_KEY"] = old_api_key
print("Got unexpected exception on router!", e)
# test_exception_raising()
def test_reading_key_from_model_list():
# [PROD TEST CASE]
# this tests if the router can read the api key from the model list and make a completion call and a completion + stream call. This is 90% of the router use case
# DO NOT REMOVE THIS TEST. It's an IMP ONE. Speak to Ishaan if you are trying to remove this
litellm.set_verbose = False
import openai
try:
print("testing if router raises an exception")
old_api_key = os.environ["AZURE_API_KEY"]
os.environ.pop("AZURE_API_KEY", None)
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": old_api_key,
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
"tpm": 240000,
"rpm": 1800,
}
]
router = Router(
model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=int(os.getenv("REDIS_PORT")),
routing_strategy="simple-shuffle",
set_verbose=True,
num_retries=1,
) # type: ignore
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "hello this request will fail"}],
)
print("\n response", response)
str_response = response.choices[0].message.content
print("\n str_response", str_response)
assert len(str_response) > 0
print("\n Testing streaming response")
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "hello this request will fail"}],
stream=True,
)
completed_response = ""
for chunk in response:
if chunk is not None:
print(chunk)
completed_response += chunk.choices[0].delta.content or ""
print("\n completed_response", completed_response)
assert len(completed_response) > 0
print("\n Passed Streaming")
os.environ["AZURE_API_KEY"] = old_api_key
router.reset()
except Exception as e:
os.environ["AZURE_API_KEY"] = old_api_key
print(f"FAILED TEST")
pytest.fail(f"Got unexpected exception on router! - {e}")
# test_reading_key_from_model_list()
def test_call_one_endpoint():
# [PROD TEST CASE]
# user passes one deployment they want to call on the router, we call the specified one
# this test makes a completion call to azure/chatgpt-v-2, it should work
try:
print("Testing calling a specific deployment")
old_api_key = os.environ["AZURE_API_KEY"]
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": old_api_key,
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
"tpm": 240000,
"rpm": 1800,
},
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "azure/azure-embedding-model",
"api_key": os.environ["AZURE_API_KEY"],
"api_base": os.environ["AZURE_API_BASE"],
},
"tpm": 100000,
"rpm": 10000,
},
]
litellm.set_verbose = True
router = Router(
model_list=model_list,
routing_strategy="simple-shuffle",
set_verbose=True,
num_retries=1,
) # type: ignore
old_api_base = os.environ.pop("AZURE_API_BASE", None)
async def call_azure_completion():
response = await router.acompletion(
model="azure/chatgpt-v-2",
messages=[{"role": "user", "content": "hello this request will pass"}],
specific_deployment=True,
)
print("\n response", response)
async def call_azure_embedding():
response = await router.aembedding(
model="azure/azure-embedding-model",
input=["good morning from litellm"],
specific_deployment=True,
)
print("\n response", response)
asyncio.run(call_azure_completion())
asyncio.run(call_azure_embedding())
os.environ["AZURE_API_BASE"] = old_api_base
os.environ["AZURE_API_KEY"] = old_api_key
except Exception as e:
print(f"FAILED TEST")
pytest.fail(f"Got unexpected exception on router! - {e}")
# test_call_one_endpoint()
def test_router_azure_acompletion():
# [PROD TEST CASE]
# This is 90% of the router use case: makes an acompletion call, an acompletion + stream call, and verifies it got a response
# DO NOT REMOVE THIS TEST. It's an IMP ONE. Speak to Ishaan if you are trying to remove this
litellm.set_verbose = False
import openai
try:
print("Router Test Azure - Acompletion, Acompletion with stream")
# remove api key from env to repro how proxy passes key to router
old_api_key = os.environ["AZURE_API_KEY"]
os.environ.pop("AZURE_API_KEY", None)
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": old_api_key,
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
"rpm": 1800,
},
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/gpt-turbo",
"api_key": os.getenv("AZURE_FRANCE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": "https://openai-france-1234.openai.azure.com",
},
"rpm": 1800,
},
]
router = Router(
model_list=model_list, routing_strategy="simple-shuffle", set_verbose=True
) # type: ignore
async def test1():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "hello this request will pass"}],
)
str_response = response.choices[0].message.content
print("\n str_response", str_response)
assert len(str_response) > 0
print("\n response", response)
asyncio.run(test1())
print("\n Testing streaming response")
async def test2():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "hello this request will fail"}],
stream=True,
)
completed_response = ""
async for chunk in response:
if chunk is not None:
print(chunk)
completed_response += chunk.choices[0].delta.content or ""
print("\n completed_response", completed_response)
assert len(completed_response) > 0
asyncio.run(test2())
print("\n Passed Streaming")
os.environ["AZURE_API_KEY"] = old_api_key
router.reset()
except Exception as e:
os.environ["AZURE_API_KEY"] = old_api_key
print(f"FAILED TEST")
pytest.fail(f"Got unexpected exception on router! - {e}")
# test_router_azure_acompletion()
def test_router_context_window_fallback():
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
import os
from large_text import text
litellm.set_verbose = False
print(f"len(text): {len(text)}")
try:
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
},
{
"model_name": "gpt-3.5-turbo-large", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, set_verbose=True, context_window_fallbacks=[{"gpt-3.5-turbo": ["gpt-3.5-turbo-large"]}], num_retries=0) # type: ignore
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
assert response.model == "gpt-3.5-turbo-1106"
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
@pytest.mark.asyncio
async def test_async_router_context_window_fallback():
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
import os
from large_text import text
litellm.set_verbose = False
print(f"len(text): {len(text)}")
try:
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
},
{
"model_name": "gpt-3.5-turbo-large", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, set_verbose=True, context_window_fallbacks=[{"gpt-3.5-turbo": ["gpt-3.5-turbo-large"]}], num_retries=0) # type: ignore
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
assert response.model == "gpt-3.5-turbo-1106"
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
def test_router_rpm_pre_call_check():
"""
- for a given model not in model cost map
- with rpm set
- check if rpm check is run
"""
try:
model_list = [
{
"model_name": "fake-openai-endpoint", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "openai/my-fake-model",
"api_key": "my-fake-key",
"api_base": "https://openai-function-calling-workers.tasslexyz.workers.dev/",
"rpm": 0,
},
},
]
router = Router(model_list=model_list, set_verbose=True, enable_pre_call_checks=True, num_retries=0) # type: ignore
try:
router._pre_call_checks(
model="fake-openai-endpoint",
healthy_deployments=model_list,
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
pytest.fail("Expected this to fail")
except Exception:
pass
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
def test_router_context_window_check_pre_call_check_in_group_custom_model_info():
"""
- Give a gpt-3.5-turbo model group with custom 'max_input_tokens' values (100 vs. 0)
- Send a short prompt
- Assert the deployment whose custom context window can fit the prompt is picked
"""
import os
from large_text import text
litellm.set_verbose = False
print(f"len(text): {len(text)}")
try:
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
"mock_response": "Hello world 1!",
},
"model_info": {"max_input_tokens": 100},
},
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
"mock_response": "Hello world 2!",
},
"model_info": {"max_input_tokens": 0},
},
]
router = Router(model_list=model_list, set_verbose=True, enable_pre_call_checks=True, num_retries=0) # type: ignore
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
assert response.choices[0].message.content == "Hello world 1!"
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
def test_router_context_window_check_pre_call_check():
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
import os
from large_text import text
litellm.set_verbose = False
print(f"len(text): {len(text)}")
try:
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
"mock_response": "Hello world 1!",
},
"model_info": {"base_model": "azure/gpt-35-turbo"},
},
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
"mock_response": "Hello world 2!",
},
},
]
router = Router(model_list=model_list, set_verbose=True, enable_pre_call_checks=True, num_retries=0) # type: ignore
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
assert response.choices[0].message.content == "Hello world 2!"
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
def test_router_context_window_check_pre_call_check_out_group():
"""
- Give 2 gpt-3.5-turbo model groups with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
import os
from large_text import text
litellm.set_verbose = False
print(f"len(text): {len(text)}")
try:
model_list = [
{
"model_name": "gpt-3.5-turbo-small", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
},
{
"model_name": "gpt-3.5-turbo-large", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, set_verbose=True, enable_pre_call_checks=True, num_retries=0, context_window_fallbacks=[{"gpt-3.5-turbo-small": ["gpt-3.5-turbo-large"]}]) # type: ignore
response = router.completion(
model="gpt-3.5-turbo-small",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
def test_filter_invalid_params_pre_call_check():
"""
- gpt-3.5-turbo supports 'response_format'
- gpt-3.5-turbo-16k doesn't support 'response_format'
run pre-call check -> assert returned list doesn't include gpt-3.5-turbo-16k
"""
try:
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-16k",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, set_verbose=True, enable_pre_call_checks=True, num_retries=0) # type: ignore
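# with pre-call checks on, the deployment that can't honor the requested response_format should be filtered out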
filtered_deployments = router._pre_call_checks(
model="gpt-3.5-turbo",
healthy_deployments=model_list,
messages=[{"role": "user", "content": "Hey, how's it going?"}],
request_kwargs={"response_format": {"type": "json_object"}},
)
assert len(filtered_deployments) == 1
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
@pytest.mark.parametrize("allowed_model_region", ["eu", None, "us"])
def test_router_region_pre_call_check(allowed_model_region):
"""
If region based routing set
- check if only model in allowed region is allowed by '_pre_call_checks'
"""
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
"region_name": allowed_model_region,
},
"model_info": {"id": "1"},
},
{
"model_name": "gpt-3.5-turbo-large", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"model_info": {"id": "2"},
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
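# only deployments whose region_name matches allowed_model_region should pass the pre-call check; with no region set, both should pass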
_healthy_deployments = router._pre_call_checks(
model="gpt-3.5-turbo",
healthy_deployments=model_list,
messages=[{"role": "user", "content": "Hey!"}],
request_kwargs={"allowed_model_region": allowed_model_region},
)
if allowed_model_region is None:
assert len(_healthy_deployments) == 2
else:
assert len(_healthy_deployments) == 1, "{} models selected as healthy".format(
len(_healthy_deployments)
)
assert (
_healthy_deployments[0]["model_info"]["id"] == "1"
), "Incorrect model id picked. Got id={}, expected id=1".format(
_healthy_deployments[0]["model_info"]["id"]
)
### FUNCTION CALLING
def test_function_calling():
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 10000,
},
]
messages = [{"role": "user", "content": "What is the weather like in Boston?"}]
functions = [
{
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
}
]
router = Router(model_list=model_list)
response = router.completion(
model="gpt-3.5-turbo", messages=messages, functions=functions
)
router.reset()
print(response)
# test_function_calling()
def test_function_calling_on_router():
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
function1 = [
{
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
}
]
router = Router(
model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=os.getenv("REDIS_PORT"),
)
messages = [{"role": "user", "content": "what's the weather in boston"}]
response = router.completion(
model="gpt-3.5-turbo", messages=messages, functions=function1
)
print(f"final returned response: {response}")
router.reset()
assert isinstance(response["choices"][0]["message"]["function_call"], dict)
except Exception as e:
print(f"An exception occurred: {e}")
# test_function_calling_on_router()
### IMAGE GENERATION
@pytest.mark.asyncio
async def test_aimg_gen_on_router():
litellm.set_verbose = True
try:
model_list = [
{
"model_name": "dall-e-3",
"litellm_params": {
"model": "dall-e-3",
},
},
{
"model_name": "dall-e-3",
"litellm_params": {
"model": "azure/dall-e-3-test",
"api_version": "2023-12-01-preview",
"api_base": os.getenv("AZURE_SWEDEN_API_BASE"),
"api_key": os.getenv("AZURE_SWEDEN_API_KEY"),
},
},
{
"model_name": "dall-e-2",
"litellm_params": {
"model": "azure/",
"api_version": "2023-06-01-preview",
"api_base": os.getenv("AZURE_API_BASE"),
"api_key": os.getenv("AZURE_API_KEY"),
},
},
]
router = Router(model_list=model_list, num_retries=3)
response = await router.aimage_generation(
model="dall-e-3", prompt="A cute baby sea otter"
)
print(response)
assert len(response.data) > 0
response = await router.aimage_generation(
model="dall-e-2", prompt="A cute baby sea otter"
)
print(response)
assert len(response.data) > 0
router.reset()
except litellm.InternalServerError as e:
pass
except Exception as e:
if "Your task failed as a result of our safety system." in str(e):
pass
elif "Operation polling timed out" in str(e):
pass
elif "Connection error" in str(e):
pass
else:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# asyncio.run(test_aimg_gen_on_router())
def test_img_gen_on_router():
litellm.set_verbose = True
try:
model_list = [
{
"model_name": "dall-e-3",
"litellm_params": {
"model": "dall-e-3",
},
},
{
"model_name": "dall-e-3",
"litellm_params": {
"model": "azure/dall-e-3-test",
"api_version": "2023-12-01-preview",
"api_base": os.getenv("AZURE_SWEDEN_API_BASE"),
"api_key": os.getenv("AZURE_SWEDEN_API_KEY"),
},
},
]
router = Router(model_list=model_list)
response = router.image_generation(
model="dall-e-3", prompt="A cute baby sea otter"
)
print(response)
assert len(response.data) > 0
router.reset()
except litellm.RateLimitError as e:
pass
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_img_gen_on_router()
###
def test_aembedding_on_router():
litellm.set_verbose = True
try:
model_list = [
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "text-embedding-ada-002",
},
"tpm": 100000,
"rpm": 10000,
},
]
router = Router(model_list=model_list)
async def embedding_call():
## Test 1: user facing function
response = await router.aembedding(
model="text-embedding-ada-002",
input=["good morning from litellm", "this is another item"],
)
print(response)
## Test 2: underlying function
response = await router._aembedding(
model="text-embedding-ada-002",
input=["good morning from litellm 2"],
)
print(response)
router.reset()
asyncio.run(embedding_call())
print("\n Making sync Embedding call\n")
## Test 1: user facing function
response = router.embedding(
model="text-embedding-ada-002",
input=["good morning from litellm 2"],
)
print(response)
router.reset()
## Test 2: underlying function
response = router._embedding(
model="text-embedding-ada-002",
input=["good morning from litellm 2"],
)
print(response)
router.reset()
except Exception as e:
if "Your task failed as a result of our safety system." in str(e):
pass
elif "Operation polling timed out" in str(e):
pass
elif "Connection error" in str(e):
pass
else:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_aembedding_on_router()
def test_azure_embedding_on_router():
"""
[PROD Use Case] - Makes an aembedding call + embedding call
"""
litellm.set_verbose = True
try:
model_list = [
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "azure/azure-embedding-model",
"api_key": os.environ["AZURE_API_KEY"],
"api_base": os.environ["AZURE_API_BASE"],
},
"tpm": 100000,
"rpm": 10000,
},
]
router = Router(model_list=model_list)
async def embedding_call():
response = await router.aembedding(
model="text-embedding-ada-002", input=["good morning from litellm"]
)
print(response)
asyncio.run(embedding_call())
print("\n Making sync Azure Embedding call\n")
response = router.embedding(
model="text-embedding-ada-002",
input=["test 2 from litellm. async embedding"],
)
print(response)
router.reset()
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_azure_embedding_on_router()
def test_bedrock_on_router():
litellm.set_verbose = True
print("\n Testing bedrock on router\n")
try:
model_list = [
{
"model_name": "claude-v1",
"litellm_params": {
"model": "bedrock/anthropic.claude-instant-v1",
},
"tpm": 100000,
"rpm": 10000,
},
]
async def test():
router = Router(model_list=model_list)
response = await router.acompletion(
model="claude-v1",
messages=[
{
"role": "user",
"content": "hello from litellm test",
}
],
)
print(response)
router.reset()
asyncio.run(test())
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_bedrock_on_router()
# test openai-compatible endpoint
@pytest.mark.asyncio
async def test_mistral_on_router():
litellm.set_verbose = True
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "mistral/mistral-small-latest",
},
},
]
router = Router(model_list=model_list)
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello from litellm test",
}
],
)
print(response)
# asyncio.run(test_mistral_on_router())
def test_openai_completion_on_router():
# [PROD Use Case] - Makes an async acompletion call, an async acompletion + stream call, a sync completion call, and a sync completion + stream call
# 4 LLM API calls made here. If it fails, add retries. Do not remove this test.
litellm.set_verbose = True
print("\n Testing OpenAI on router\n")
try:
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
},
},
]
router = Router(model_list=model_list)
async def test():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello from litellm test",
}
],
)
print(response)
assert len(response.choices[0].message.content) > 0
print("\n streaming + acompletion test")
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"hello from litellm test {time.time()}",
}
],
stream=True,
)
complete_response = ""
print(response)
# if you want to see all the attributes and methods
async for chunk in response:
print(chunk)
complete_response += chunk.choices[0].delta.content or ""
print("\n complete response: ", complete_response)
assert len(complete_response) > 0
asyncio.run(test())
print("\n Testing Sync completion calls \n")
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello from litellm test2",
}
],
)
print(response)
assert len(response.choices[0].message.content) > 0
print("\n streaming + completion test")
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "hello from litellm test3",
}
],
stream=True,
)
complete_response = ""
print(response)
for chunk in response:
print(chunk)
complete_response += chunk.choices[0].delta.content or ""
print("\n complete response: ", complete_response)
assert len(complete_response) > 0
router.reset()
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_openai_completion_on_router()
def test_model_group_info():
router = Router(
model_list=[
{
"model_name": "command-r-plus",
"litellm_params": {"model": "cohere.command-r-plus-v1:0"},
}
]
)
response = router.get_model_group_info(model_group="command-r-plus")
assert response is not None
def test_consistent_model_id():
"""
- For a given model group + litellm params, assert the model id is always the same
Test on `_generate_model_id`
Test on `set_model_list`
Test on `_add_deployment`
"""
model_group = "gpt-3.5-turbo"
litellm_params = {
"model": "openai/my-fake-model",
"api_key": "my-fake-key",
"api_base": "https://openai-function-calling-workers.tasslexyz.workers.dev/",
"stream_timeout": 0.001,
}
id1 = Router()._generate_model_id(
model_group=model_group, litellm_params=litellm_params
)
id2 = Router()._generate_model_id(
model_group=model_group, litellm_params=litellm_params
)
assert id1 == id2
@pytest.mark.skip(reason="local test")
def test_reading_keys_os_environ():
import openai
try:
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "os.environ/AZURE_API_KEY",
"api_base": "os.environ/AZURE_API_BASE",
"api_version": "os.environ/AZURE_API_VERSION",
"timeout": "os.environ/AZURE_TIMEOUT",
"stream_timeout": "os.environ/AZURE_STREAM_TIMEOUT",
"max_retries": "os.environ/AZURE_MAX_RETRIES",
},
},
]
router = Router(model_list=model_list)
for model in router.model_list:
assert (
model["litellm_params"]["api_key"] == os.environ["AZURE_API_KEY"]
), f"{model['litellm_params']['api_key']} vs {os.environ['AZURE_API_KEY']}"
assert (
model["litellm_params"]["api_base"] == os.environ["AZURE_API_BASE"]
), f"{model['litellm_params']['api_base']} vs {os.environ['AZURE_API_BASE']}"
assert (
model["litellm_params"]["api_version"]
== os.environ["AZURE_API_VERSION"]
), f"{model['litellm_params']['api_version']} vs {os.environ['AZURE_API_VERSION']}"
assert float(model["litellm_params"]["timeout"]) == float(
os.environ["AZURE_TIMEOUT"]
), f"{model['litellm_params']['timeout']} vs {os.environ['AZURE_TIMEOUT']}"
assert float(model["litellm_params"]["stream_timeout"]) == float(
os.environ["AZURE_STREAM_TIMEOUT"]
), f"{model['litellm_params']['stream_timeout']} vs {os.environ['AZURE_STREAM_TIMEOUT']}"
assert int(model["litellm_params"]["max_retries"]) == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{model['litellm_params']['max_retries']} vs {os.environ['AZURE_MAX_RETRIES']}"
print("passed testing of reading keys from os.environ")
model_id = model["model_info"]["id"]
async_client: openai.AsyncAzureOpenAI = router.cache.get_cache(f"{model_id}_async_client") # type: ignore
assert async_client.api_key == os.environ["AZURE_API_KEY"]
assert async_client.base_url == os.environ["AZURE_API_BASE"]
assert async_client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{async_client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert async_client.timeout == int(
os.environ["AZURE_TIMEOUT"]
), f"{async_client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("async client set correctly!")
print("\n Testing async streaming client")
stream_async_client: openai.AsyncAzureOpenAI = router.cache.get_cache(f"{model_id}_stream_async_client") # type: ignore
assert stream_async_client.api_key == os.environ["AZURE_API_KEY"]
assert stream_async_client.base_url == os.environ["AZURE_API_BASE"]
assert stream_async_client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{stream_async_client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert stream_async_client.timeout == int(
os.environ["AZURE_STREAM_TIMEOUT"]
), f"{stream_async_client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("async stream client set correctly!")
print("\n Testing sync client")
client: openai.AzureOpenAI = router.cache.get_cache(f"{model_id}_client") # type: ignore
assert client.api_key == os.environ["AZURE_API_KEY"]
assert client.base_url == os.environ["AZURE_API_BASE"]
assert client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert client.timeout == int(
os.environ["AZURE_TIMEOUT"]
), f"{client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("sync client set correctly!")
print("\n Testing sync stream client")
stream_client: openai.AzureOpenAI = router.cache.get_cache(f"{model_id}_stream_client") # type: ignore
assert stream_client.api_key == os.environ["AZURE_API_KEY"]
assert stream_client.base_url == os.environ["AZURE_API_BASE"]
assert stream_client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{stream_client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert stream_client.timeout == int(
os.environ["AZURE_STREAM_TIMEOUT"]
), f"{stream_client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("sync stream client set correctly!")
router.reset()
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_reading_keys_os_environ()
@pytest.mark.skip(reason="local test")
def test_reading_openai_keys_os_environ():
import openai
try:
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "os.environ/OPENAI_API_KEY",
"timeout": "os.environ/AZURE_TIMEOUT",
"stream_timeout": "os.environ/AZURE_STREAM_TIMEOUT",
"max_retries": "os.environ/AZURE_MAX_RETRIES",
},
},
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "text-embedding-ada-002",
"api_key": "os.environ/OPENAI_API_KEY",
"timeout": "os.environ/AZURE_TIMEOUT",
"stream_timeout": "os.environ/AZURE_STREAM_TIMEOUT",
"max_retries": "os.environ/AZURE_MAX_RETRIES",
},
},
]
router = Router(model_list=model_list)
for model in router.model_list:
assert (
model["litellm_params"]["api_key"] == os.environ["OPENAI_API_KEY"]
), f"{model['litellm_params']['api_key']} vs {os.environ['AZURE_API_KEY']}"
assert float(model["litellm_params"]["timeout"]) == float(
os.environ["AZURE_TIMEOUT"]
), f"{model['litellm_params']['timeout']} vs {os.environ['AZURE_TIMEOUT']}"
assert float(model["litellm_params"]["stream_timeout"]) == float(
os.environ["AZURE_STREAM_TIMEOUT"]
), f"{model['litellm_params']['stream_timeout']} vs {os.environ['AZURE_STREAM_TIMEOUT']}"
assert int(model["litellm_params"]["max_retries"]) == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{model['litellm_params']['max_retries']} vs {os.environ['AZURE_MAX_RETRIES']}"
print("passed testing of reading keys from os.environ")
model_id = model["model_info"]["id"]
async_client: openai.AsyncOpenAI = router.cache.get_cache(key=f"{model_id}_async_client") # type: ignore
assert async_client.api_key == os.environ["OPENAI_API_KEY"]
assert async_client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{async_client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert async_client.timeout == int(
os.environ["AZURE_TIMEOUT"]
), f"{async_client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("async client set correctly!")
print("\n Testing async streaming client")
stream_async_client: openai.AsyncOpenAI = router.cache.get_cache(key=f"{model_id}_stream_async_client") # type: ignore
assert stream_async_client.api_key == os.environ["OPENAI_API_KEY"]
assert stream_async_client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{stream_async_client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert stream_async_client.timeout == int(
os.environ["AZURE_STREAM_TIMEOUT"]
), f"{stream_async_client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("async stream client set correctly!")
print("\n Testing sync client")
client: openai.OpenAI = router.cache.get_cache(key=f"{model_id}_client")  # type: ignore
assert client.api_key == os.environ["OPENAI_API_KEY"]
assert client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert client.timeout == int(
os.environ["AZURE_TIMEOUT"]
), f"{client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("sync client set correctly!")
print("\n Testing sync stream client")
stream_client: openai.OpenAI = router.cache.get_cache(key=f"{model_id}_stream_client")  # type: ignore
assert stream_client.api_key == os.environ["OPENAI_API_KEY"]
assert stream_client.max_retries == int(
os.environ["AZURE_MAX_RETRIES"]
), f"{stream_client.max_retries} vs {os.environ['AZURE_MAX_RETRIES']}"
assert stream_client.timeout == int(
os.environ["AZURE_STREAM_TIMEOUT"]
), f"{stream_client.timeout} vs {os.environ['AZURE_TIMEOUT']}"
print("sync stream client set correctly!")
router.reset()
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")
# test_reading_openai_keys_os_environ()
def test_router_anthropic_key_dynamic():
anthropic_api_key = os.environ.pop("ANTHROPIC_API_KEY")
model_list = [
{
"model_name": "anthropic-claude",
"litellm_params": {
"model": "claude-3-5-haiku-20241022",
"api_key": anthropic_api_key,
},
}
]
router = Router(model_list=model_list)
messages = [{"role": "user", "content": "Hey, how's it going?"}]
router.completion(model="anthropic-claude", messages=messages)
os.environ["ANTHROPIC_API_KEY"] = anthropic_api_key
def test_router_timeout():
litellm.set_verbose = True
import logging
from litellm._logging import verbose_logger
verbose_logger.setLevel(logging.DEBUG)
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "os.environ/OPENAI_API_KEY",
},
}
]
router = Router(model_list=model_list)
messages = [{"role": "user", "content": "Hey, how's it going?"}]
start_time = time.time()
try:
res = router.completion(
model="gpt-3.5-turbo", messages=messages, timeout=0.0001
)
print(res)
pytest.fail("this should have timed out")
except litellm.exceptions.Timeout as e:
print("got timeout exception")
print(e)
print(vars(e))
pass
@pytest.mark.asyncio
async def test_router_amoderation():
model_list = [
{
"model_name": "openai-moderations",
"litellm_params": {
"model": "text-moderation-stable",
"api_key": os.getenv("OPENAI_API_KEY", None),
},
}
]
router = Router(model_list=model_list)
## Test 1: user facing function
result = await router.amoderation(
model="text-moderation-stable", input="this is valid good text"
)
def test_router_add_deployment():
initial_model_list = [
{
"model_name": "fake-openai-endpoint",
"litellm_params": {
"model": "openai/my-fake-model",
"api_key": "my-fake-key",
"api_base": "https://openai-function-calling-workers.tasslexyz.workers.dev/",
},
},
]
router = Router(model_list=initial_model_list)
init_model_id_list = router.get_model_ids()
print(f"init_model_id_list: {init_model_id_list}")
router.add_deployment(
deployment=Deployment(
model_name="gpt-instruct",
litellm_params=LiteLLM_Params(model="gpt-3.5-turbo-instruct"),
model_info=ModelInfo(),
)
)
new_model_id_list = router.get_model_ids()
print(f"new_model_id_list: {new_model_id_list}")
assert len(new_model_id_list) > len(init_model_id_list)
assert new_model_id_list[1] != new_model_id_list[0]
@pytest.mark.asyncio
async def test_router_text_completion_client():
# This tests if we re-use the Async OpenAI client
# This test fails when we create a new Async OpenAI client per request
try:
model_list = [
{
"model_name": "fake-openai-endpoint",
"litellm_params": {
"model": "text-completion-openai/gpt-3.5-turbo-instruct",
"api_key": os.getenv("OPENAI_API_KEY", None),
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
},
}
]
router = Router(model_list=model_list, debug_level="DEBUG", set_verbose=True)
tasks = []
for _ in range(300):
tasks.append(
router.atext_completion(
model="fake-openai-endpoint",
prompt="hello from litellm test",
)
)
# Execute all coroutines concurrently
responses = await asyncio.gather(*tasks)
print(responses)
except Exception as e:
pytest.fail(f"Error occurred: {e}")
@pytest.fixture
def mock_response() -> litellm.ModelResponse:
return litellm.ModelResponse(
**{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1699896916,
"model": "gpt-3.5-turbo-0125",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": "call_abc123",
"type": "function",
"function": {
"name": "get_current_weather",
"arguments": '{\n"location": "Boston, MA"\n}',
},
}
],
},
"logprobs": None,
"finish_reason": "tool_calls",
}
],
"usage": {"prompt_tokens": 5, "completion_tokens": 5, "total_tokens": 10},
}
)
@pytest.mark.asyncio
async def test_router_model_usage(mock_response):
"""
Test if tracking used model tpm works as expected
"""
model = "my-fake-model"
model_tpm = 100
setattr(
mock_response,
"usage",
litellm.Usage(prompt_tokens=5, completion_tokens=5, total_tokens=10),
)
print(f"mock_response: {mock_response}")
model_tpm = 100
llm_router = Router(
model_list=[
{
"model_name": model,
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-key",
"api_base": "my-base",
"tpm": model_tpm,
"mock_response": mock_response,
},
}
]
)
allowed_fails = 1 # allow for changing b/w minutes
for _ in range(2):
try:
_ = await llm_router.acompletion(
model=model, messages=[{"role": "user", "content": "Hey!"}]
)
await asyncio.sleep(3)
initial_usage_tuple = await llm_router.get_model_group_usage(
model_group=model
)
initial_usage = initial_usage_tuple[0]
# completion call - 10 tokens
_ = await llm_router.acompletion(
model=model, messages=[{"role": "user", "content": "Hey!"}]
)
await asyncio.sleep(3)
updated_usage_tuple = await llm_router.get_model_group_usage(
model_group=model
)
updated_usage = updated_usage_tuple[0]
assert updated_usage == initial_usage + 10 # type: ignore
break
except Exception as e:
if allowed_fails > 0:
print(
f"Decrementing allowed_fails: {allowed_fails}.\nReceived error - {str(e)}"
)
allowed_fails -= 1
else:
print(f"allowed_fails: {allowed_fails}")
raise e
@pytest.mark.skip(reason="Check if this is causing ci/cd issues.")
@pytest.mark.asyncio
async def test_is_proxy_set():
"""
Assert if proxy is set
"""
from httpx import AsyncHTTPTransport
os.environ["HTTPS_PROXY"] = "https://proxy.example.com:8080"
from openai import AsyncAzureOpenAI
# Function to check if a proxy is set on the client
def check_proxy(client: httpx.AsyncClient) -> bool:
print(f"client._mounts: {client._mounts}")
assert len(client._mounts) == 1
for k, v in client._mounts.items():
assert isinstance(v, AsyncHTTPTransport)
return True
llm_router = Router(
model_list=[
{
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-3.5-turbo",
"api_key": "my-key",
"api_base": "my-base",
"mock_response": "hello world",
},
"model_info": {"id": "1"},
}
]
)
_deployment = llm_router.get_deployment(model_id="1")
model_client: AsyncAzureOpenAI = llm_router._get_client(
deployment=_deployment, kwargs={}, client_type="async"
) # type: ignore
assert check_proxy(client=model_client._client)
@pytest.mark.parametrize(
"model, base_model, llm_provider",
[
("azure/gpt-4", None, "azure"),
("azure/gpt-4", "azure/gpt-4-0125-preview", "azure"),
("gpt-4", None, "openai"),
],
)
def test_router_get_model_info(model, base_model, llm_provider):
"""
Test if router get model info works based on provider
For azure -> only if base model set
For openai -> use the 'model' name directly
"""
router = Router(
model_list=[
{
"model_name": "gpt-4",
"litellm_params": {
"model": model,
"api_key": "my-fake-key",
"api_base": "my-fake-base",
},
"model_info": {"base_model": base_model, "id": "1"},
}
]
)
deployment = router.get_deployment(model_id="1")
assert deployment is not None
if llm_provider == "openai" or (base_model is not None and llm_provider == "azure"):
router.get_router_model_info(
deployment=deployment.to_json(), received_model_name=model
)
else:
try:
router.get_router_model_info(
deployment=deployment.to_json(), received_model_name=model
)
pytest.fail("Expected this to raise model not mapped error")
except Exception as e:
if "This model isn't mapped yet" in str(e):
pass
@pytest.mark.parametrize(
"model, base_model, llm_provider",
[
("azure/gpt-4", None, "azure"),
("azure/gpt-4", "azure/gpt-4-0125-preview", "azure"),
("gpt-4", None, "openai"),
],
)
def test_router_context_window_pre_call_check(model, base_model, llm_provider):
"""
- For an azure model
- if no base model set
- don't enforce context window limits
"""
try:
model_list = [
{
"model_name": "gpt-4",
"litellm_params": {
"model": model,
"api_key": "my-fake-key",
"api_base": "my-fake-base",
},
"model_info": {"base_model": base_model, "id": "1"},
}
]
router = Router(
model_list=model_list,
set_verbose=True,
enable_pre_call_checks=True,
num_retries=0,
)
litellm.token_counter = MagicMock()
def token_counter_side_effect(*args, **kwargs):
# Process args and kwargs if needed
return 1000000
litellm.token_counter.side_effect = token_counter_side_effect
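# force an enormous token count so any deployment with a known context window fails the pre-call check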
try:
updated_list = router._pre_call_checks(
model="gpt-4",
healthy_deployments=model_list,
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
if llm_provider == "azure" and base_model is None:
assert len(updated_list) == 1
else:
pytest.fail("Expected to raise an error. Got={}".format(updated_list))
except Exception as e:
if (
llm_provider == "azure" and base_model is not None
) or llm_provider == "openai":
pass
except Exception as e:
pytest.fail(f"Got unexpected exception on router! - {str(e)}")
def test_router_cooldown_api_connection_error():
try:
_ = litellm.completion(
model="vertex_ai/gemini-1.5-pro",
messages=[{"role": "admin", "content": "Fail on this!"}],
)
except litellm.APIConnectionError as e:
assert (
Router()._is_cooldown_required(
model_id="", exception_status=e.code, exception_str=str(e)
)
is False
)
router = Router(
model_list=[
{
"model_name": "gemini-1.5-pro",
"litellm_params": {"model": "vertex_ai/gemini-1.5-pro"},
}
]
)
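# an APIConnectionError with no HTTP status should not put the deployment into cooldown, so repeated calls are expected to keep raising APIConnectionError rather than a 'no deployments available' error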
try:
router.completion(
model="gemini-1.5-pro",
messages=[{"role": "admin", "content": "Fail on this!"}],
)
except litellm.APIConnectionError:
pass
try:
router.completion(
model="gemini-1.5-pro",
messages=[{"role": "admin", "content": "Fail on this!"}],
)
except litellm.APIConnectionError:
pass
try:
router.completion(
model="gemini-1.5-pro",
messages=[{"role": "admin", "content": "Fail on this!"}],
)
except litellm.APIConnectionError:
pass
def test_router_correctly_reraise_error():
"""
User feedback: There is a problem with my messages array, but the error exception thrown is a Rate Limit error.
```
Rate Limit: Error code: 429 - {'error': {'message': 'No deployments available for selected model, Try again in 60 seconds. Passed model=gemini-1.5-flash..
```
What they want? Propagation of the real error.
"""
router = Router(
model_list=[
{
"model_name": "gemini-1.5-pro",
"litellm_params": {
"model": "vertex_ai/gemini-1.5-pro",
"mock_response": "litellm.RateLimitError",
},
}
]
)
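    # mock_response="litellm.RateLimitError" makes the deployment raise a real
    # RateLimitError; the router should surface that error instead of masking it
    # behind a generic "No deployments available" message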
try:
router.completion(
model="gemini-1.5-pro",
messages=[{"role": "admin", "content": "Fail on this!"}],
)
except litellm.RateLimitError:
pass


def test_router_dynamic_cooldown_correct_retry_after_time():
"""
User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds"
but Azure says to retry in at most 9s
```
{"message": "litellm.proxy.proxy_server.embeddings(): Exception occured - No deployments available for selected model, Try again in 60 seconds. Passed model=text-embedding-ada-002. pre-call-checks=False, allowed_model_region=n/a, cooldown_list=[('b49cbc9314273db7181fe69b1b19993f04efb88f2c1819947c538bac08097e4c', {'Exception Received': 'litellm.RateLimitError: AzureException RateLimitError - Requests to the Embeddings_Create Operation under Azure OpenAI API version 2023-09-01-preview have exceeded call rate limit of your current OpenAI S0 pricing tier. Please retry after 9 seconds. Please go here: https://aka.ms/oai/quotaincrease if you would like to further increase the default rate limit.', 'Status Code': '429'})]", "level": "ERROR", "timestamp": "2024-08-22T03:25:36.900476"}
```
"""
router = Router(
model_list=[
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "openai/text-embedding-ada-002",
},
}
]
)
openai_client = openai.OpenAI(api_key="")
cooldown_time = 30
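    # the mocked 429 response below advertises this value in its `retry-after`
    # header; the router should read it instead of defaulting to 60s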
def _return_exception(*args, **kwargs):
from httpx import Headers, Request, Response
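        # build an exception shaped like an openai RateLimitError: status 429 and a
        # response whose retry-after header carries `cooldown_time`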
kwargs = {
"request": Request("POST", "https://www.google.com"),
"message": "Error code: 429 - Rate Limit Error!",
"body": {"detail": "Rate Limit Error!"},
"code": None,
"param": None,
"type": None,
"response": Response(
status_code=429,
headers=Headers(
{
"date": "Sat, 21 Sep 2024 22:56:53 GMT",
"server": "uvicorn",
"retry-after": f"{cooldown_time}",
"content-length": "30",
"content-type": "application/json",
}
),
request=Request("POST", "http://0.0.0.0:9000/chat/completions"),
),
"status_code": 429,
"request_id": None,
}
exception = Exception()
for k, v in kwargs.items():
setattr(exception, k, v)
raise exception
with patch.object(
openai_client.embeddings.with_raw_response,
"create",
side_effect=_return_exception,
):
new_retry_after_mock_client = MagicMock(return_value=-1)
litellm.utils._get_retry_after_from_exception_header = (
new_retry_after_mock_client
)
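        # the patched header parser should receive the mocked response headers;
        # this is asserted on below via call_args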
try:
router.embedding(
model="text-embedding-ada-002",
input="Hello world!",
client=openai_client,
)
except litellm.RateLimitError:
pass
new_retry_after_mock_client.assert_called()
print(
f"new_retry_after_mock_client.call_args.kwargs: {new_retry_after_mock_client.call_args.kwargs}"
)
print(
f"new_retry_after_mock_client.call_args: {new_retry_after_mock_client.call_args[0][0]}"
)
response_headers: httpx.Headers = new_retry_after_mock_client.call_args[0][0]
assert int(response_headers["retry-after"]) == cooldown_time
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_aaarouter_dynamic_cooldown_message_retry_time(sync_mode):
"""
User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds"
but Azure says to retry in at most 9s
```
{"message": "litellm.proxy.proxy_server.embeddings(): Exception occured - No deployments available for selected model, Try again in 60 seconds. Passed model=text-embedding-ada-002. pre-call-checks=False, allowed_model_region=n/a, cooldown_list=[('b49cbc9314273db7181fe69b1b19993f04efb88f2c1819947c538bac08097e4c', {'Exception Received': 'litellm.RateLimitError: AzureException RateLimitError - Requests to the Embeddings_Create Operation under Azure OpenAI API version 2023-09-01-preview have exceeded call rate limit of your current OpenAI S0 pricing tier. Please retry after 9 seconds. Please go here: https://aka.ms/oai/quotaincrease if you would like to further increase the default rate limit.', 'Status Code': '429'})]", "level": "ERROR", "timestamp": "2024-08-22T03:25:36.900476"}
```
"""
litellm.set_verbose = True
cooldown_time = 30.0
router = Router(
model_list=[
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "openai/text-embedding-ada-002",
},
},
{
"model_name": "text-embedding-ada-002",
"litellm_params": {
"model": "openai/text-embedding-ada-002",
},
},
],
set_verbose=True,
debug_level="DEBUG",
cooldown_time=cooldown_time,
)
openai_client = openai.OpenAI(api_key="")
def _return_exception(*args, **kwargs):
from httpx import Headers, Request, Response
kwargs = {
"request": Request("POST", "https://www.google.com"),
"message": "Error code: 429 - Rate Limit Error!",
"body": {"detail": "Rate Limit Error!"},
"code": None,
"param": None,
"type": None,
"response": Response(
status_code=429,
headers=Headers(
{
"date": "Sat, 21 Sep 2024 22:56:53 GMT",
"server": "uvicorn",
"retry-after": f"{cooldown_time}",
"content-length": "30",
"content-type": "application/json",
}
),
request=Request("POST", "http://0.0.0.0:9000/chat/completions"),
),
"status_code": 429,
"request_id": None,
}
exception = Exception()
for k, v in kwargs.items():
setattr(exception, k, v)
raise exception
with patch.object(
openai_client.embeddings.with_raw_response,
"create",
side_effect=_return_exception,
):
for _ in range(1):
try:
if sync_mode:
router.embedding(
model="text-embedding-ada-002",
input="Hello world!",
client=openai_client,
)
else:
await router.aembedding(
model="text-embedding-ada-002",
input="Hello world!",
client=openai_client,
)
except litellm.RateLimitError:
pass
await asyncio.sleep(2)
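        # by now the failure handler should have placed the rate-limited
        # deployment(s) on cooldown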
if sync_mode:
cooldown_deployments = _get_cooldown_deployments(
litellm_router_instance=router, parent_otel_span=None
)
else:
cooldown_deployments = await _async_get_cooldown_deployments(
litellm_router_instance=router, parent_otel_span=None
)
print(
"Cooldown deployments - {}\n{}".format(
cooldown_deployments, len(cooldown_deployments)
)
)
assert len(cooldown_deployments) > 0
exception_raised = False
try:
if sync_mode:
router.embedding(
model="text-embedding-ada-002",
input="Hello world!",
client=openai_client,
)
else:
await router.aembedding(
model="text-embedding-ada-002",
input="Hello world!",
client=openai_client,
)
except litellm.types.router.RouterRateLimitError as e:
print(e)
exception_raised = True
assert e.cooldown_time == cooldown_time
assert exception_raised
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio()
@pytest.mark.flaky(retries=6, delay=1)
async def test_router_weighted_pick(sync_mode):
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"weight": 2,
"mock_response": "Hello world 1!",
},
"model_info": {"id": "1"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"weight": 1,
"mock_response": "Hello world 2!",
},
"model_info": {"id": "2"},
},
]
)
model_id_1_count = 0
model_id_2_count = 0
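    # with weights 2:1 on otherwise identical deployments, id "1" should be
    # picked more often than id "2" over 50 calls (hence the flaky-retry marker)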
for _ in range(50):
# make 50 calls. expect model id 1 to be picked more than model id 2
if sync_mode:
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello world!"}],
)
else:
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello world!"}],
)
model_id = int(response._hidden_params["model_id"])
if model_id == 1:
model_id_1_count += 1
elif model_id == 2:
model_id_2_count += 1
else:
raise Exception("invalid model id returned!")
assert model_id_1_count > model_id_2_count


@pytest.mark.skip(reason="Hit azure batch quota limits")
@pytest.mark.parametrize("provider", ["azure"])
@pytest.mark.asyncio
async def test_router_batch_endpoints(provider):
"""
1. Create File for Batch completion
2. Create Batch Request
3. Retrieve the specific batch
"""
print("Testing async create batch")
router = Router(
model_list=[
{
"model_name": "my-custom-name",
"litellm_params": {
"model": "azure/gpt-4o-mini",
"api_base": os.getenv("AZURE_API_BASE"),
"api_key": os.getenv("AZURE_API_KEY"),
},
},
]
)
file_name = "openai_batch_completions_router.jsonl"
_current_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(_current_dir, file_name)
file_obj = await router.acreate_file(
model="my-custom-name",
file=open(file_path, "rb"),
purpose="batch",
custom_llm_provider=provider,
)
print("Response from creating file=", file_obj)
## TEST 2 - test underlying create_file function
file_obj = await router._acreate_file(
model="my-custom-name",
file=open(file_path, "rb"),
purpose="batch",
custom_llm_provider=provider,
)
print("Response from creating file=", file_obj)
await asyncio.sleep(10)
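    # brief wait so the uploaded file is processed before it is referenced in a
    # batch request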
batch_input_file_id = file_obj.id
assert (
batch_input_file_id is not None
), "Failed to create file, expected a non null file_id but got {batch_input_file_id}"
create_batch_response = await router.acreate_batch(
model="my-custom-name",
completion_window="24h",
endpoint="/v1/chat/completions",
input_file_id=batch_input_file_id,
custom_llm_provider=provider,
metadata={"key1": "value1", "key2": "value2"},
)
## TEST 2 - test underlying create_batch function
create_batch_response = await router._acreate_batch(
model="my-custom-name",
completion_window="24h",
endpoint="/v1/chat/completions",
input_file_id=batch_input_file_id,
custom_llm_provider=provider,
metadata={"key1": "value1", "key2": "value2"},
)
print("response from router.create_batch=", create_batch_response)
assert (
create_batch_response.id is not None
), f"Failed to create batch, expected a non null batch_id but got {create_batch_response.id}"
assert (
create_batch_response.endpoint == "/v1/chat/completions"
or create_batch_response.endpoint == "/chat/completions"
), f"Failed to create batch, expected endpoint to be /v1/chat/completions but got {create_batch_response.endpoint}"
assert (
create_batch_response.input_file_id == batch_input_file_id
), f"Failed to create batch, expected input_file_id to be {batch_input_file_id} but got {create_batch_response.input_file_id}"
await asyncio.sleep(1)
retrieved_batch = await router.aretrieve_batch(
batch_id=create_batch_response.id,
custom_llm_provider=provider,
)
print("retrieved batch=", retrieved_batch)
# just assert that we retrieved a non None batch
assert retrieved_batch.id == create_batch_response.id
# list all batches
list_batches = await router.alist_batches(
model="my-custom-name", custom_llm_provider=provider, limit=2
)
print("list_batches=", list_batches)
@pytest.mark.parametrize("hidden", [True, False])
def test_model_group_alias(hidden):
_model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-3.5-turbo"},
},
{"model_name": "gpt-4", "litellm_params": {"model": "gpt-4"}},
]
router = Router(
model_list=_model_list,
model_group_alias={
"gpt-4.5-turbo": {"model": "gpt-3.5-turbo", "hidden": hidden}
},
)
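    # "gpt-4.5-turbo" is an alias for the "gpt-3.5-turbo" model group; when
    # hidden=True it should be excluded from the public model list and names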
models = router.get_model_list()
model_names = router.get_model_names()
if hidden:
assert len(models) == len(_model_list)
assert len(model_names) == len(_model_list)
else:
assert len(models) == len(_model_list) + 1
assert len(model_names) == len(_model_list) + 1


# @pytest.mark.parametrize("on_error", [True, False])
# @pytest.mark.asyncio
# async def test_router_response_headers(on_error):
# router = Router(
# model_list=[
# {
# "model_name": "gpt-3.5-turbo",
# "litellm_params": {
# "model": "azure/chatgpt-v-2",
# "api_key": os.getenv("AZURE_API_KEY"),
# "api_base": os.getenv("AZURE_API_BASE"),
# "tpm": 100000,
# "rpm": 100000,
# },
# },
# {
# "model_name": "gpt-3.5-turbo",
# "litellm_params": {
# "model": "azure/chatgpt-v-2",
# "api_key": os.getenv("AZURE_API_KEY"),
# "api_base": os.getenv("AZURE_API_BASE"),
# "tpm": 500,
# "rpm": 500,
# },
# },
# ]
# )
# response = await router.acompletion(
# model="gpt-3.5-turbo",
# messages=[{"role": "user", "content": "Hello world!"}],
# mock_testing_rate_limit_error=on_error,
# )
# response_headers = response._hidden_params["additional_headers"]
# print(response_headers)
# assert response_headers["x-ratelimit-limit-requests"] == 100500
# assert int(response_headers["x-ratelimit-remaining-requests"]) > 0
# assert response_headers["x-ratelimit-limit-tokens"] == 100500
# assert int(response_headers["x-ratelimit-remaining-tokens"]) > 0