fix(lowest_latency.py): allow setting a buffer for getting values within a certain latency threshold

If an endpoint is slow, its completion time might not be updated until the call completes. Allowing a buffer around the lowest recorded latency prevents us from overloading those endpoints, in a simple way.
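The new selection logic filters deployments to those under their tpm/rpm limits, finds the lowest recorded latency, then picks at random among every deployment whose latency is within lowest_latency_buffer * lowest_latency of that minimum. A minimal standalone sketch of the idea (pick_deployment and its arguments are illustrative, not part of this diff):

import random


def pick_deployment(latencies: dict, lowest_latency_buffer: float = 0.0):
    """Pick a deployment id, shuffling among those within the latency buffer.

    latencies maps deployment id -> recorded latency; lowest_latency_buffer is
    a fraction of the lowest latency, e.g. 0.5 keeps anything within 50% of it.
    """
    if len(latencies) == 0:
        return None
    # sort by latency, fastest first
    sorted_deployments = sorted(latencies.items(), key=lambda x: x[1])
    lowest_latency = sorted_deployments[0][1]
    # keep every deployment within the buffer of the lowest latency
    buffer = lowest_latency_buffer * lowest_latency
    valid = [d for d, latency in sorted_deployments if latency <= lowest_latency + buffer]
    # a random choice spreads load instead of hammering the single fastest endpoint
    return random.choice(valid)


# with buffer=0 only the 1.0s deployment is eligible; with buffer=0.5 the
# 1.4s deployment also qualifies, since 1.4 <= 1.0 + 0.5 * 1.0 = 1.5
print(pick_deployment({"deploy-1": 1.0, "deploy-2": 1.4}, lowest_latency_buffer=0.5))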
Krrish Dholakia 2024-04-30 12:00:26 -07:00
parent 398d503590
commit 90cdfef1c1
2 changed files with 121 additions and 7 deletions


@@ -4,6 +4,7 @@ from pydantic import BaseModel, Extra, Field, root_validator
 import dotenv, os, requests, random
 from typing import Optional, Union, List, Dict
 from datetime import datetime, timedelta
+import random
 dotenv.load_dotenv()  # Loading env variables using dotenv
 import traceback
@@ -29,6 +30,7 @@ class LiteLLMBase(BaseModel):
 class RoutingArgs(LiteLLMBase):
     ttl: int = 1 * 60 * 60  # 1 hour
+    lowest_latency_buffer: float = 0


 class LowestLatencyLoggingHandler(CustomLogger):
@@ -314,8 +316,12 @@ class LowestLatencyLoggingHandler(CustomLogger):
         # randomly sample from all_deployments, incase all deployments have latency=0.0
         _items = all_deployments.items()
         all_deployments = random.sample(list(_items), len(_items))
         all_deployments = dict(all_deployments)
+        ### GET AVAILABLE DEPLOYMENTS ### filter out any deployments > tpm/rpm limits
+        potential_deployments = []
         for item, item_map in all_deployments.items():
             ## get the item from model list
             _deployment = None
@@ -364,17 +370,33 @@ class LowestLatencyLoggingHandler(CustomLogger):
             # End of Debugging Logic
             # -------------- #
-            if item_latency == 0:
-                deployment = _deployment
-                break
-            elif (
+            if (
                 item_tpm + input_tokens > _deployment_tpm
                 or item_rpm + 1 > _deployment_rpm
             ):  # if user passed in tpm / rpm in the model_list
                 continue
-            elif item_latency < lowest_latency:
-                lowest_latency = item_latency
-                deployment = _deployment
+            else:
+                potential_deployments.append((_deployment, item_latency))
+
+        if len(potential_deployments) == 0:
+            return None
+
+        # Sort potential deployments by latency
+        sorted_deployments = sorted(potential_deployments, key=lambda x: x[1])
+
+        # Find lowest latency deployment
+        lowest_latency = sorted_deployments[0][1]
+
+        # Find deployments within buffer of lowest latency
+        buffer = self.routing_args.lowest_latency_buffer * lowest_latency
+        valid_deployments = [
+            x for x in sorted_deployments if x[1] <= lowest_latency + buffer
+        ]
+
+        # Pick a random deployment from valid deployments
+        random_valid_deployment = random.choice(valid_deployments)
+        deployment = random_valid_deployment[0]
+
         if request_kwargs is not None and "metadata" in request_kwargs:
             request_kwargs["metadata"][
                 "_latency_per_deployment"


@@ -631,3 +631,95 @@ async def test_lowest_latency_routing_first_pick():
     # assert that len(deployments) >1
     assert len(deployments) > 1


+@pytest.mark.parametrize("buffer", [0, 1])
+@pytest.mark.asyncio
+async def test_lowest_latency_routing_buffer(buffer):
+    """
+    Allow shuffling calls within a certain latency buffer
+    """
+    model_list = [
+        {
+            "model_name": "azure-model",
+            "litellm_params": {
+                "model": "azure/gpt-turbo",
+                "api_key": "os.environ/AZURE_FRANCE_API_KEY",
+                "api_base": "https://openai-france-1234.openai.azure.com",
+                "rpm": 1440,
+            },
+            "model_info": {"id": 1},
+        },
+        {
+            "model_name": "azure-model",
+            "litellm_params": {
+                "model": "azure/gpt-35-turbo",
+                "api_key": "os.environ/AZURE_EUROPE_API_KEY",
+                "api_base": "https://my-endpoint-europe-berri-992.openai.azure.com",
+                "rpm": 6,
+            },
+            "model_info": {"id": 2},
+        },
+    ]
+    router = Router(
+        model_list=model_list,
+        routing_strategy="latency-based-routing",
+        set_verbose=False,
+        num_retries=3,
+        routing_strategy_args={"lowest_latency_buffer": buffer},
+    )  # type: ignore
+
+    ## DEPLOYMENT 1 ##
+    deployment_id = 1
+    kwargs = {
+        "litellm_params": {
+            "metadata": {
+                "model_group": "azure-model",
+            },
+            "model_info": {"id": 1},
+        }
+    }
+    start_time = time.time()
+    response_obj = {"usage": {"total_tokens": 50}}
+    time.sleep(3)
+    end_time = time.time()
+    router.lowestlatency_logger.log_success_event(
+        response_obj=response_obj,
+        kwargs=kwargs,
+        start_time=start_time,
+        end_time=end_time,
+    )
+
+    ## DEPLOYMENT 2 ##
+    deployment_id = 2
+    kwargs = {
+        "litellm_params": {
+            "metadata": {
+                "model_group": "azure-model",
+            },
+            "model_info": {"id": 2},
+        }
+    }
+    start_time = time.time()
+    response_obj = {"usage": {"total_tokens": 20}}
+    time.sleep(2)
+    end_time = time.time()
+    router.lowestlatency_logger.log_success_event(
+        response_obj=response_obj,
+        kwargs=kwargs,
+        start_time=start_time,
+        end_time=end_time,
+    )
+
+    ## CHECK WHAT'S SELECTED ##
+    # print(router.lowesttpm_logger.get_available_deployments(model_group="azure-model"))
+    selected_deployments = {}
+    for _ in range(50):
+        print(router.get_available_deployment(model="azure-model"))
+        selected_deployments[
+            router.get_available_deployment(model="azure-model")["model_info"]["id"]
+        ] = 1
+
+    if buffer == 0:
+        assert len(selected_deployments.keys()) == 1
+    else:
+        assert len(selected_deployments.keys()) == 2
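Why the assertions should hold: with buffer=0 the valid set collapses to the single lowest-latency deployment, so all 50 picks return the same id; with buffer=1 the cutoff widens to lowest_latency + 1 * lowest_latency, i.e. double the lowest recorded latency, which is wide enough here for both deployments to stay eligible.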