Merge pull request #3283 from BerriAI/litellm_debug_lowest_latency

[Fix] Add better observability for debugging lowest latency routing
Ishaan Jaff 2024-04-24 20:42:52 -07:00 committed by GitHub
commit 2c7f4695d9
4 changed files with 198 additions and 1 deletion


@@ -139,6 +139,28 @@ class SlackAlerting:
        except Exception as e:
            raise e

    def _get_deployment_latencies_to_alert(self, metadata=None):
        if metadata is None:
            return None
        if "_latency_per_deployment" in metadata:
            # Translate model_id to -> api_base
            # _latency_per_deployment is a dictionary that looks like this:
            """
            _latency_per_deployment: {
                api_base: 0.01336697916666667
            }
            """
            _message_to_send = ""
            _deployment_latencies = metadata["_latency_per_deployment"]
            if len(_deployment_latencies) == 0:
                return None
            for api_base, latency in _deployment_latencies.items():
                _message_to_send += f"\n{api_base}: {round(latency,2)}s"
            _message_to_send = "```" + _message_to_send + "```"
            return _message_to_send

    async def response_taking_too_long_callback(
        self,
        kwargs,  # kwargs to completion
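
For reference, a minimal standalone sketch of the formatting the new helper performs; the metadata payload below is made up, only its shape follows the docstring above:

# Hypothetical metadata payload; the dict shape mirrors the docstring above,
# the api_base values and latencies are made up.
metadata = {
    "_latency_per_deployment": {
        "https://endpoint-1.example.com/": 0.013,
        "https://endpoint-2.example.com/": 2.871,
    }
}

# Same formatting logic as _get_deployment_latencies_to_alert.
message = ""
for api_base, latency in metadata["_latency_per_deployment"].items():
    message += f"\n{api_base}: {round(latency, 2)}s"
message = "```" + message + "```"
print(message)
# Prints a Slack-style code block, one api_base per line with its latency in seconds.
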
@@ -165,6 +187,21 @@ class SlackAlerting:
                request_info = self._add_langfuse_trace_id_to_alert(
                    request_info=request_info, kwargs=kwargs
                )

            # add deployment latencies to alert
            if (
                kwargs is not None
                and "litellm_params" in kwargs
                and "metadata" in kwargs["litellm_params"]
            ):
                _metadata = kwargs["litellm_params"]["metadata"]
                _deployment_latency_map = self._get_deployment_latencies_to_alert(
                    metadata=_metadata
                )
                if _deployment_latency_map is not None:
                    request_info += (
                        f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
                    )
            await self.send_alert(
                message=slow_message + request_info,
                level="Low",
@@ -243,6 +280,14 @@ class SlackAlerting:
                alerting_message = (
                    f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
                )

                # add deployment latencies to alert
                _deployment_latency_map = self._get_deployment_latencies_to_alert(
                    metadata=request_data.get("metadata", {})
                )
                if _deployment_latency_map is not None:
                    request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"

                await self.send_alert(
                    message=alerting_message + request_info,
                    level="Medium",


@@ -454,6 +454,7 @@ class Router:
                model=model,
                messages=messages,
                specific_deployment=kwargs.pop("specific_deployment", None),
                request_kwargs=kwargs,
            )

            # debug how often this deployment picked
@@ -2831,6 +2832,7 @@ class Router:
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Async implementation of 'get_available_deployments'.
@@ -2846,6 +2848,7 @@ class Router:
                messages=messages,
                input=input,
                specific_deployment=specific_deployment,
                request_kwargs=request_kwargs,
            )

        model, healthy_deployments = self._common_checks_available_deployment(
@@ -2949,6 +2952,7 @@ class Router:
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Returns the deployment based on routing strategy
@@ -3035,7 +3039,9 @@ class Router:
            and self.lowestlatency_logger is not None
        ):
            deployment = self.lowestlatency_logger.get_available_deployments(
                model_group=model,
                healthy_deployments=healthy_deployments,
                request_kwargs=request_kwargs,
            )
        elif (
            self.routing_strategy == "usage-based-routing"


@@ -11,6 +11,7 @@ from litellm.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
from litellm import ModelResponse
from litellm import token_counter
import litellm


class LiteLLMBase(BaseModel):
@@ -126,6 +127,61 @@ class LowestLatencyLoggingHandler(CustomLogger):
            traceback.print_exc()
            pass

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """
        Check if Timeout Error, if timeout set deployment latency -> 100
        """
        try:
            _exception = kwargs.get("exception", None)
            if isinstance(_exception, litellm.Timeout):
                if kwargs["litellm_params"].get("metadata") is None:
                    pass
                else:
                    model_group = kwargs["litellm_params"]["metadata"].get(
                        "model_group", None
                    )
                    id = kwargs["litellm_params"].get("model_info", {}).get("id", None)
                    if model_group is None or id is None:
                        return
                    elif isinstance(id, int):
                        id = str(id)

                    # ------------
                    # Setup values
                    # ------------
                    """
                    {
                        {model_group}_map: {
                            id: {
                                "latency": [..]
                                f"{date:hour:minute}" : {"tpm": 34, "rpm": 3}
                            }
                        }
                    }
                    """
                    latency_key = f"{model_group}_map"
                    request_count_dict = (
                        self.router_cache.get_cache(key=latency_key) or {}
                    )

                    if id not in request_count_dict:
                        request_count_dict[id] = {}

                    ## Latency
                    request_count_dict[id].setdefault("latency", []).append(100.0)

                    self.router_cache.set_cache(
                        key=latency_key,
                        value=request_count_dict,
                        ttl=self.routing_args.ttl,
                    )  # reset map within window
            else:
                # do nothing if it's not a timeout error
                return
        except Exception as e:
            traceback.print_exc()
            pass

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        try:
            """
@@ -216,12 +272,14 @@ class LowestLatencyLoggingHandler(CustomLogger):
        healthy_deployments: list,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Returns a deployment with the lowest latency
        """
        # get list of potential deployments
        latency_key = f"{model_group}_map"
        _latency_per_deployment = {}

        request_count_dict = self.router_cache.get_cache(key=latency_key) or {}
@@ -298,4 +356,14 @@ class LowestLatencyLoggingHandler(CustomLogger):
            elif item_latency < lowest_latency:
                lowest_latency = item_latency
                deployment = _deployment

            # _latency_per_deployment is used for debugging
            _deployment_api_base = _deployment.get("litellm_params", {}).get(
                "api_base", ""
            )
            _latency_per_deployment[_deployment_api_base] = item_latency
        if request_kwargs is not None and "metadata" in request_kwargs:
            request_kwargs["metadata"][
                "_latency_per_deployment"
            ] = _latency_per_deployment
        return deployment


@@ -477,3 +477,81 @@ async def test_router_completion_streaming():
# asyncio.run(test_router_completion_streaming())
@pytest.mark.asyncio
async def test_lowest_latency_routing_with_timeouts():
    """
    PROD Test:
    - Endpoint 1: triggers timeout errors (it takes 10+ seconds to respond)
    - Endpoint 2: responds in under 1s
    - Run 4 requests to collect latency data
    - Wait till the cache is filled with latency data
    - Run 10 more requests
    - All requests should have been routed to endpoint 2
    """
    import litellm

    litellm.set_verbose = True

    router = Router(
        model_list=[
            {
                "model_name": "azure-model",
                "litellm_params": {
                    "model": "openai/slow-endpoint",
                    "api_base": "https://exampleopenaiendpoint-production-c715.up.railway.app/",  # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
                    "api_key": "fake-key",
                },
                "model_info": {"id": "slow-endpoint"},
            },
            {
                "model_name": "azure-model",
                "litellm_params": {
                    "model": "openai/fast-endpoint",
                    "api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
                    "api_key": "fake-key",
                },
                "model_info": {"id": "fast-endpoint"},
            },
        ],
        routing_strategy="latency-based-routing",
        set_verbose=True,
        debug_level="DEBUG",
        timeout=1,
    )  # type: ignore

    # make 4 requests
    for _ in range(4):
        try:
            response = await router.acompletion(
                model="azure-model", messages=[{"role": "user", "content": "hello"}]
            )
            print(response)
        except Exception as e:
            print("got exception", e)

    await asyncio.sleep(1)
    print("done sending initial requests to collect latency")
    """
    Note: for debugging
    - By this point: slow-endpoint should have timed out 3-4 times and should be heavily penalized :)
    - The next 10 requests should all be routed to the fast-endpoint
    """

    deployments = {}
    # make 10 requests
    for _ in range(10):
        response = await router.acompletion(
            model="azure-model", messages=[{"role": "user", "content": "hello"}]
        )
        print(response)

        _picked_model_id = response._hidden_params["model_id"]
        if _picked_model_id not in deployments:
            deployments[_picked_model_id] = 1
        else:
            deployments[_picked_model_id] += 1

    print("deployments", deployments)

    # ALL the Requests should have been routed to the fast-endpoint
    assert deployments["fast-endpoint"] == 10
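
For intuition, after the first batch of requests the cached latency map for the model group should look roughly like the sketch below (numbers made up, tpm/rpm buckets omitted); since lower recorded latency wins, every one of the next 10 picks should land on fast-endpoint:

# Rough picture of the cached latency map (cache key: "azure-model_map") after
# the first 4 requests; the numbers are made up and tpm/rpm buckets omitted.
latency_map = {
    "slow-endpoint": {"latency": [100.0, 100.0, 100.0, 100.0]},  # timeout penalties
    "fast-endpoint": {"latency": [0.42, 0.38]},
}

# Roughly, the deployment with the lower recorded latency wins the next picks.
for _id, stats in latency_map.items():
    print(_id, sum(stats["latency"]) / len(stats["latency"]))
# slow-endpoint 100.0
# fast-endpoint ~0.4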