litellm-mirror/tests/local_testing/test_router_caching.py

#### What this tests ####
# This tests caching on the router
import asyncio
import os
import sys
import time
import traceback
from unittest.mock import patch
import pytest
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import litellm
from litellm import Router
from litellm.caching import RedisCache, RedisClusterCache
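
# These tests expect a reachable Redis instance, configured via env vars:
# REDIS_HOST, REDIS_PORT, REDIS_PASSWORD (and REDIS_SSL_URL for the SSL tests),
# plus OPENAI_API_KEY / AZURE_* credentials for the live completion calls.
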
## Scenarios
## 1. 2 models - openai + azure - 1 model group "gpt-3.5-turbo",
## 2. 2 models - openai, azure - 2 diff model groups, 1 caching group
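## Note: a caching group ties multiple model groups together so they share
## cache entries - a response cached under one group can be served for the other
## (see test_acompletion_caching_on_router_caching_groups below).

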
@pytest.mark.asyncio
async def test_router_async_caching_with_ssl_url():
"""
Tests when a redis url is passed to the router, if caching is correctly setup
"""
try:
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 10000,
},
],
redis_url=os.getenv("REDIS_SSL_URL"),
)
response = await router.cache.redis_cache.ping()
print(f"response: {response}")
        assert response is True
except Exception as e:
pytest.fail(f"An exception occurred - {str(e)}")


def test_router_sync_caching_with_ssl_url():
    """
    Tests that caching is set up correctly when a Redis SSL url is passed to the router,
    using the sync ping.
    """
try:
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 10000,
},
],
redis_url=os.getenv("REDIS_SSL_URL"),
)
response = router.cache.redis_cache.sync_ping()
print(f"response: {response}")
        assert response is True
except Exception as e:
pytest.fail(f"An exception occurred - {str(e)}")


@pytest.mark.asyncio
@pytest.mark.flaky(retries=3, delay=1)
async def test_acompletion_caching_on_router():
# tests acompletion + caching on router
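    # flow: the first call populates the Redis cache; the second, identical call
    # should be served from cache, so both responses share the same id and content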
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 10000,
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-3",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
},
"tpm": 100000,
"rpm": 10000,
},
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(
model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
cache_responses=True,
timeout=30,
routing_strategy="simple-shuffle",
)
response1 = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, temperature=1
)
print(f"response1: {response1}")
        await asyncio.sleep(5)  # cache writes are async; sleep so the entry is set before the second call
response2 = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, temperature=1
)
print(f"response2: {response2}")
assert response1.id == response2.id
assert len(response1.choices[0].message.content) > 0
assert (
response1.choices[0].message.content == response2.choices[0].message.content
)
router.reset()
    except litellm.Timeout:
        end_time = time.time()
        print(f"timeout error occurred: {end_time - start_time}")
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")


@pytest.mark.asyncio
@pytest.mark.flaky(retries=3, delay=1)
async def test_completion_caching_on_router():
    # tests acompletion + caching on router, with usage-based routing
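    # rpm is set to 1, so the single deployment is saturated after one request;
    # routing_strategy_args={"ttl": 10} plus the 10s sleep presumably let the
    # usage counters expire so the second request can be routed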
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 1000,
"rpm": 1,
},
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
router = Router(
model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
cache_responses=True,
timeout=30,
routing_strategy_args={"ttl": 10},
routing_strategy="usage-based-routing",
)
response1 = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, temperature=1
)
print(f"response1: {response1}")
await asyncio.sleep(10)
response2 = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, temperature=1
)
print(f"response2: {response2}")
assert len(response1.choices[0].message.content) > 0
assert len(response2.choices[0].message.content) > 0
router.reset()
    except litellm.Timeout:
        pass
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")


@pytest.mark.asyncio
async def test_acompletion_caching_with_ttl_on_router():
    # tests acompletion + caching on router with a per-request ttl
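    # ttl=0 expires the cache entry immediately, so the second call must be a
    # fresh completion: different id and (at temperature=1) different content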
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 10000,
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-3",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
},
"tpm": 100000,
"rpm": 10000,
},
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(
model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
cache_responses=True,
timeout=30,
routing_strategy="simple-shuffle",
)
response1 = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, temperature=1, ttl=0
)
print(f"response1: {response1}")
        await asyncio.sleep(1)  # cache writes are async; sleep so the entry is set before the second call
response2 = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, temperature=1, ttl=0
)
print(f"response2: {response2}")
assert response1.id != response2.id
assert len(response1.choices[0].message.content) > 0
assert (
response1.choices[0].message.content != response2.choices[0].message.content
)
router.reset()
    except litellm.Timeout:
        end_time = time.time()
        print(f"timeout error occurred: {end_time - start_time}")
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")


@pytest.mark.asyncio
async def test_acompletion_caching_on_router_caching_groups():
    # tests acompletion + caching across model groups joined via caching_groups
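    # the two deployments sit in different model groups, but caching_groups makes
    # them share cache entries: the call to azure-gpt-3.5-turbo should return the
    # response cached for openai-gpt-3.5-turbo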
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "openai-gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"mock_response": "Hello world",
},
"tpm": 100000,
"rpm": 10000,
},
{
"model_name": "azure-gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-3",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
},
"tpm": 100000,
"rpm": 10000,
},
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(
model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
cache_responses=True,
timeout=30,
routing_strategy="simple-shuffle",
caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")],
)
response1 = await router.acompletion(
model="openai-gpt-3.5-turbo", messages=messages, temperature=1
)
print(f"response1: {response1}")
        await asyncio.sleep(1)  # cache writes are async; sleep so the entry is set before the second call
response2 = await router.acompletion(
model="azure-gpt-3.5-turbo", messages=messages, temperature=1
)
print(f"response2: {response2}")
assert response1.id == response2.id
assert len(response1.choices[0].message.content) > 0
assert (
response1.choices[0].message.content == response2.choices[0].message.content
)
router.reset()
    except litellm.Timeout:
        end_time = time.time()
        print(f"timeout error occurred: {end_time - start_time}")
except Exception as e:
traceback.print_exc()
pytest.fail(f"Error occurred: {e}")


@pytest.mark.parametrize(
"startup_nodes, expected_cache_type",
[
pytest.param(
[dict(host="node1.localhost", port=6379)],
RedisClusterCache,
id="Expects a RedisClusterCache instance when startup_nodes provided",
),
pytest.param(
None,
RedisCache,
id="Expects a RedisCache instance when there is no startup nodes",
),
],
)
def test_create_correct_redis_cache_instance(
startup_nodes: list[dict] | None,
expected_cache_type: type[RedisClusterCache | RedisCache],
):
cache_config = dict(
host="mockhost",
port=6379,
password="mock-password",
startup_nodes=startup_nodes,
)
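
    # Stub out RedisCache.__init__ (presumably inherited by RedisClusterCache) so
    # no real Redis connection is opened; the test only checks which cache class
    # Router._create_redis_cache instantiates.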
def _mock_redis_cache_init(*args, **kwargs): ...
with patch.object(RedisCache, "__init__", _mock_redis_cache_init):
redis_cache = Router._create_redis_cache(cache_config)
assert isinstance(redis_cache, expected_cache_type)