forked from phoenix/litellm-mirror
Merge pull request #2798 from CLARKBENHAM/main
add test for rate limits - Router isn't coroutine safe
This commit is contained in:
commit
9119858f4a
3 changed files with 236 additions and 25 deletions
|
@ -67,8 +67,8 @@ class LowestLatencyLoggingHandler(CustomLogger):
|
|||
{
|
||||
{model_group}_map: {
|
||||
id: {
|
||||
"latency": [..]
|
||||
f"{date:hour:minute}" : {"tpm": 34, "rpm": 3}
|
||||
"latency": [..]
|
||||
f"{date:hour:minute}" : {"tpm": 34, "rpm": 3}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -151,8 +151,8 @@ class LowestLatencyLoggingHandler(CustomLogger):
|
|||
{
|
||||
{model_group}_map: {
|
||||
id: {
|
||||
"latency": [..]
|
||||
f"{date:hour:minute}" : {"tpm": 34, "rpm": 3}
|
||||
"latency": [..]
|
||||
f"{date:hour:minute}" : {"tpm": 34, "rpm": 3}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -298,6 +298,4 @@ class LowestLatencyLoggingHandler(CustomLogger):
|
|||
elif item_latency < lowest_latency:
|
||||
lowest_latency = item_latency
|
||||
deployment = _deployment
|
||||
if deployment is None:
|
||||
deployment = random.choice(healthy_deployments)
|
||||
return deployment
|
||||
|
|
|
@ -14,7 +14,6 @@ sys.path.insert(
|
|||
) # Adds the parent directory to the system path
|
||||
import pytest
|
||||
from litellm import Router
|
||||
import litellm
|
||||
from litellm.router_strategy.lowest_latency import LowestLatencyLoggingHandler
|
||||
from litellm.caching import DualCache
|
||||
|
||||
|
@ -173,10 +172,36 @@ def test_get_available_deployments():
|
|||
)
|
||||
|
||||
|
||||
# test_get_available_deployments()
|
||||
async def _deploy(lowest_latency_logger, deployment_id, tokens_used, duration):
|
||||
kwargs = {
|
||||
"litellm_params": {
|
||||
"metadata": {
|
||||
"model_group": "gpt-3.5-turbo",
|
||||
"deployment": "azure/chatgpt-v-2",
|
||||
},
|
||||
"model_info": {"id": deployment_id},
|
||||
}
|
||||
}
|
||||
start_time = time.time()
|
||||
response_obj = {"usage": {"total_tokens": tokens_used}}
|
||||
time.sleep(duration)
|
||||
end_time = time.time()
|
||||
lowest_latency_logger.log_success_event(
|
||||
response_obj=response_obj,
|
||||
kwargs=kwargs,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
)
|
||||
|
||||
|
||||
def test_get_available_endpoints_tpm_rpm_check():
|
||||
async def _gather_deploy(all_deploys):
|
||||
return await asyncio.gather(*[_deploy(*t) for t in all_deploys])
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ans_rpm", [1, 5]
|
||||
) # 1 should produce nothing, 10 should select first
|
||||
def test_get_available_endpoints_tpm_rpm_check_async(ans_rpm):
|
||||
"""
|
||||
Pass in list of 2 valid models
|
||||
|
||||
|
@ -185,16 +210,68 @@ def test_get_available_endpoints_tpm_rpm_check():
|
|||
assert that only the valid model is returned
|
||||
"""
|
||||
test_cache = DualCache()
|
||||
ans = "1234"
|
||||
non_ans_rpm = 3
|
||||
assert ans_rpm != non_ans_rpm, "invalid test"
|
||||
if ans_rpm < non_ans_rpm:
|
||||
ans = None
|
||||
model_list = [
|
||||
{
|
||||
"model_name": "gpt-3.5-turbo",
|
||||
"litellm_params": {"model": "azure/chatgpt-v-2"},
|
||||
"model_info": {"id": "1234", "rpm": 10},
|
||||
"model_info": {"id": "1234", "rpm": ans_rpm},
|
||||
},
|
||||
{
|
||||
"model_name": "gpt-3.5-turbo",
|
||||
"litellm_params": {"model": "azure/chatgpt-v-2"},
|
||||
"model_info": {"id": "5678", "rpm": 3},
|
||||
"model_info": {"id": "5678", "rpm": non_ans_rpm},
|
||||
},
|
||||
]
|
||||
lowest_latency_logger = LowestLatencyLoggingHandler(
|
||||
router_cache=test_cache, model_list=model_list
|
||||
)
|
||||
model_group = "gpt-3.5-turbo"
|
||||
d1 = [(lowest_latency_logger, "1234", 50, 0.01)] * non_ans_rpm
|
||||
d2 = [(lowest_latency_logger, "5678", 50, 0.01)] * non_ans_rpm
|
||||
asyncio.run(_gather_deploy([*d1, *d2]))
|
||||
## CHECK WHAT'S SELECTED ##
|
||||
d_ans = lowest_latency_logger.get_available_deployments(
|
||||
model_group=model_group, healthy_deployments=model_list
|
||||
)
|
||||
print(d_ans)
|
||||
assert (d_ans and d_ans["model_info"]["id"]) == ans
|
||||
|
||||
|
||||
# test_get_available_endpoints_tpm_rpm_check_async()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ans_rpm", [1, 5]
|
||||
) # 1 should produce nothing, 10 should select first
|
||||
def test_get_available_endpoints_tpm_rpm_check(ans_rpm):
|
||||
"""
|
||||
Pass in list of 2 valid models
|
||||
|
||||
Update cache with 1 model clearly being at tpm/rpm limit
|
||||
|
||||
assert that only the valid model is returned
|
||||
"""
|
||||
test_cache = DualCache()
|
||||
ans = "1234"
|
||||
non_ans_rpm = 3
|
||||
assert ans_rpm != non_ans_rpm, "invalid test"
|
||||
if ans_rpm < non_ans_rpm:
|
||||
ans = None
|
||||
model_list = [
|
||||
{
|
||||
"model_name": "gpt-3.5-turbo",
|
||||
"litellm_params": {"model": "azure/chatgpt-v-2"},
|
||||
"model_info": {"id": "1234", "rpm": ans_rpm},
|
||||
},
|
||||
{
|
||||
"model_name": "gpt-3.5-turbo",
|
||||
"litellm_params": {"model": "azure/chatgpt-v-2"},
|
||||
"model_info": {"id": "5678", "rpm": non_ans_rpm},
|
||||
},
|
||||
]
|
||||
lowest_latency_logger = LowestLatencyLoggingHandler(
|
||||
|
@ -212,10 +289,10 @@ def test_get_available_endpoints_tpm_rpm_check():
|
|||
"model_info": {"id": deployment_id},
|
||||
}
|
||||
}
|
||||
for _ in range(3):
|
||||
for _ in range(non_ans_rpm):
|
||||
start_time = time.time()
|
||||
response_obj = {"usage": {"total_tokens": 50}}
|
||||
time.sleep(0.05)
|
||||
time.sleep(0.01)
|
||||
end_time = time.time()
|
||||
lowest_latency_logger.log_success_event(
|
||||
response_obj=response_obj,
|
||||
|
@ -234,10 +311,10 @@ def test_get_available_endpoints_tpm_rpm_check():
|
|||
"model_info": {"id": deployment_id},
|
||||
}
|
||||
}
|
||||
for _ in range(3):
|
||||
for _ in range(non_ans_rpm):
|
||||
start_time = time.time()
|
||||
response_obj = {"usage": {"total_tokens": 20}}
|
||||
time.sleep(2)
|
||||
time.sleep(0.5)
|
||||
end_time = time.time()
|
||||
lowest_latency_logger.log_success_event(
|
||||
response_obj=response_obj,
|
||||
|
@ -247,17 +324,11 @@ def test_get_available_endpoints_tpm_rpm_check():
|
|||
)
|
||||
|
||||
## CHECK WHAT'S SELECTED ##
|
||||
print(
|
||||
lowest_latency_logger.get_available_deployments(
|
||||
model_group=model_group, healthy_deployments=model_list
|
||||
)
|
||||
)
|
||||
assert (
|
||||
lowest_latency_logger.get_available_deployments(
|
||||
model_group=model_group, healthy_deployments=model_list
|
||||
)["model_info"]["id"]
|
||||
== "1234"
|
||||
d_ans = lowest_latency_logger.get_available_deployments(
|
||||
model_group=model_group, healthy_deployments=model_list
|
||||
)
|
||||
print(d_ans)
|
||||
assert (d_ans and d_ans["model_info"]["id"]) == ans
|
||||
|
||||
|
||||
def test_router_get_available_deployments():
|
||||
|
|
142
tests/test_ratelimit.py
Normal file
142
tests/test_ratelimit.py
Normal file
|
@ -0,0 +1,142 @@
|
|||
# %%
|
||||
import asyncio
|
||||
import os
|
||||
import pytest
|
||||
import random
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
from litellm import utils, Router
|
||||
|
||||
COMPLETION_TOKENS = 5
|
||||
base_model_list = [
|
||||
{
|
||||
"model_name": "gpt-3.5-turbo",
|
||||
"litellm_params": {
|
||||
"model": "gpt-3.5-turbo",
|
||||
"api_key": os.getenv("OPENAI_API_KEY"),
|
||||
"max_tokens": COMPLETION_TOKENS,
|
||||
},
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
class RouterConfig(BaseModel):
|
||||
rpm: int
|
||||
tpm: int
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def router_factory():
|
||||
def create_router(rpm, tpm, routing_strategy):
|
||||
model_list = base_model_list.copy()
|
||||
model_list[0]["rpm"] = rpm
|
||||
model_list[0]["tpm"] = tpm
|
||||
return Router(
|
||||
model_list=model_list,
|
||||
routing_strategy=routing_strategy,
|
||||
debug_level="DEBUG",
|
||||
)
|
||||
|
||||
return create_router
|
||||
|
||||
|
||||
def generate_list_of_messages(num_messages):
|
||||
"""
|
||||
create num_messages new chat conversations
|
||||
"""
|
||||
return [
|
||||
[{"role": "user", "content": f"{i}. Hey, how's it going? {random.random()}"}]
|
||||
for i in range(num_messages)
|
||||
]
|
||||
|
||||
|
||||
def calculate_limits(list_of_messages):
|
||||
"""
|
||||
Return the min rpm and tpm level that would let all messages in list_of_messages be sent this minute
|
||||
"""
|
||||
rpm = len(list_of_messages)
|
||||
tpm = sum(
|
||||
(utils.token_counter(messages=m) + COMPLETION_TOKENS for m in list_of_messages)
|
||||
)
|
||||
return rpm, tpm
|
||||
|
||||
|
||||
async def async_call(router: Router, list_of_messages) -> Any:
|
||||
tasks = [
|
||||
router.acompletion(model="gpt-3.5-turbo", messages=m) for m in list_of_messages
|
||||
]
|
||||
return await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
def sync_call(router: Router, list_of_messages) -> Any:
|
||||
return [
|
||||
router.completion(model="gpt-3.5-turbo", messages=m) for m in list_of_messages
|
||||
]
|
||||
|
||||
|
||||
class ExpectNoException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"num_try_send, num_allowed_send",
|
||||
[
|
||||
(2, 2), # sending as many as allowed, ExpectNoException
|
||||
# (10, 10), # sending as many as allowed, ExpectNoException
|
||||
(3, 2), # Sending more than allowed, ValueError
|
||||
# (10, 9), # Sending more than allowed, ValueError
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"sync_mode", [True, False]
|
||||
) # Use parametrization for sync/async
|
||||
@pytest.mark.parametrize(
|
||||
"routing_strategy",
|
||||
[
|
||||
"usage-based-routing",
|
||||
# "simple-shuffle", # dont expect to rate limit
|
||||
# "least-busy", # dont expect to rate limit
|
||||
# "latency-based-routing",
|
||||
],
|
||||
)
|
||||
def test_rate_limit(
|
||||
router_factory, num_try_send, num_allowed_send, sync_mode, routing_strategy
|
||||
):
|
||||
"""
|
||||
Check if router.completion and router.acompletion can send more messages than they've been limited to.
|
||||
Args:
|
||||
router_factory: makes new router object, without any shared Global state
|
||||
num_try_send (int): number of messages to try to send
|
||||
num_allowed_send (int): max number of messages allowed to be sent in 1 minute
|
||||
sync_mode (bool): if making sync (router.completion) or async (router.acompletion)
|
||||
Raises:
|
||||
ValueError: Error router throws when it hits rate limits
|
||||
ExpectNoException: Signfies that no other error has happened. A NOP
|
||||
"""
|
||||
# Can send more messages then we're going to; so don't expect a rate limit error
|
||||
expected_exception = (
|
||||
ExpectNoException if num_try_send <= num_allowed_send else ValueError
|
||||
)
|
||||
|
||||
list_of_messages = generate_list_of_messages(max(num_try_send, num_allowed_send))
|
||||
rpm, tpm = calculate_limits(list_of_messages[:num_allowed_send])
|
||||
list_of_messages = list_of_messages[:num_try_send]
|
||||
router = router_factory(rpm, tpm, routing_strategy)
|
||||
|
||||
with pytest.raises(expected_exception) as excinfo: # asserts correct type raised
|
||||
if sync_mode:
|
||||
results = sync_call(router, list_of_messages)
|
||||
else:
|
||||
results = asyncio.run(async_call(router, list_of_messages))
|
||||
print(results)
|
||||
if len([i for i in results if i is not None]) != num_try_send:
|
||||
# since not all results got returned, raise rate limit error
|
||||
raise ValueError("No deployments available for selected model")
|
||||
raise ExpectNoException
|
||||
|
||||
print(expected_exception, excinfo)
|
||||
if expected_exception is ValueError:
|
||||
assert "No deployments available for selected model" in str(excinfo.value)
|
||||
else:
|
||||
assert len([i for i in results if i is not None]) == num_try_send
|
Loading…
Add table
Add a link
Reference in a new issue