diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/slack_alerting.py
index a71890021..64f9b5384 100644
--- a/litellm/integrations/slack_alerting.py
+++ b/litellm/integrations/slack_alerting.py
@@ -139,6 +139,28 @@ class SlackAlerting:
         except Exception as e:
             raise e
 
+    def _get_deployment_latencies_to_alert(self, metadata=None):
+
+        if metadata is None:
+            return None
+
+        if "_latency_per_deployment" in metadata:
+            # Translate model_id to -> api_base
+            # _latency_per_deployment is a dictionary that looks like this:
+            """
+            _latency_per_deployment: {
+                api_base: 0.01336697916666667
+            }
+            """
+            _message_to_send = ""
+            _deployment_latencies = metadata["_latency_per_deployment"]
+            if len(_deployment_latencies) == 0:
+                return None
+            for api_base, latency in _deployment_latencies.items():
+                _message_to_send += f"\n{api_base}: {round(latency,2)}s"
+            _message_to_send = "```" + _message_to_send + "```"
+            return _message_to_send
+
     async def response_taking_too_long_callback(
         self,
         kwargs,  # kwargs to completion
@@ -165,6 +187,21 @@ class SlackAlerting:
             request_info = self._add_langfuse_trace_id_to_alert(
                 request_info=request_info, kwargs=kwargs
             )
+            # add deployment latencies to alert
+            if (
+                kwargs is not None
+                and "litellm_params" in kwargs
+                and "metadata" in kwargs["litellm_params"]
+            ):
+                _metadata = kwargs["litellm_params"]["metadata"]
+
+                _deployment_latency_map = self._get_deployment_latencies_to_alert(
+                    metadata=_metadata
+                )
+                if _deployment_latency_map is not None:
+                    request_info += (
+                        f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
+                    )
             await self.send_alert(
                 message=slow_message + request_info,
                 level="Low",
@@ -243,6 +280,14 @@ class SlackAlerting:
             alerting_message = (
                 f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
             )
+
+            # add deployment latencies to alert
+            _deployment_latency_map = self._get_deployment_latencies_to_alert(
+                metadata=request_data.get("metadata", {})
+            )
+            if _deployment_latency_map is not None:
+                request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"
+
             await self.send_alert(
                 message=alerting_message + request_info,
                 level="Medium",
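For reference, here is a minimal standalone sketch of the Slack snippet that `_get_deployment_latencies_to_alert` builds once a routing strategy has attached `_latency_per_deployment` to the request metadata. The endpoints and latency values below are made up for illustration:

    # Illustration only: mirrors the formatting logic of the helper above.
    metadata = {
        "_latency_per_deployment": {
            "https://fast-endpoint.example.com": 0.42,
            "https://slow-endpoint.example.com": 100.0,  # penalized after a timeout
        }
    }

    _message_to_send = ""
    for api_base, latency in metadata["_latency_per_deployment"].items():
        _message_to_send += f"\n{api_base}: {round(latency, 2)}s"
    _message_to_send = "```" + _message_to_send + "```"  # Slack code-block markup
    print(_message_to_send)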
diff --git a/litellm/router.py b/litellm/router.py
index 672eb097d..371d8e8eb 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -454,6 +454,7 @@ class Router:
                 model=model,
                 messages=messages,
                 specific_deployment=kwargs.pop("specific_deployment", None),
+                request_kwargs=kwargs,
             )
 
             # debug how often this deployment picked
@@ -2831,6 +2832,7 @@ class Router:
         messages: Optional[List[Dict[str, str]]] = None,
         input: Optional[Union[str, List]] = None,
         specific_deployment: Optional[bool] = False,
+        request_kwargs: Optional[Dict] = None,
     ):
         """
         Async implementation of 'get_available_deployments'.
@@ -2846,6 +2848,7 @@ class Router:
             messages=messages,
             input=input,
             specific_deployment=specific_deployment,
+            request_kwargs=request_kwargs,
         )
 
         model, healthy_deployments = self._common_checks_available_deployment(
@@ -2949,6 +2952,7 @@ class Router:
         messages: Optional[List[Dict[str, str]]] = None,
         input: Optional[Union[str, List]] = None,
         specific_deployment: Optional[bool] = False,
+        request_kwargs: Optional[Dict] = None,
     ):
         """
         Returns the deployment based on routing strategy
@@ -3035,7 +3039,9 @@ class Router:
             and self.lowestlatency_logger is not None
        ):
            deployment = self.lowestlatency_logger.get_available_deployments(
-                model_group=model, healthy_deployments=healthy_deployments
+                model_group=model,
+                healthy_deployments=healthy_deployments,
+                request_kwargs=request_kwargs,
            )
        elif (
            self.routing_strategy == "usage-based-routing"
+ f"{date:hour:minute}" : {"tpm": 34, "rpm": 3} + } + } + } + """ + latency_key = f"{model_group}_map" + request_count_dict = ( + self.router_cache.get_cache(key=latency_key) or {} + ) + + if id not in request_count_dict: + request_count_dict[id] = {} + + ## Latency + request_count_dict[id].setdefault("latency", []).append(100.0) + self.router_cache.set_cache( + key=latency_key, + value=request_count_dict, + ttl=self.routing_args.ttl, + ) # reset map within window + else: + # do nothing if it's not a timeout error + return + except Exception as e: + traceback.print_exc() + pass + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): try: """ @@ -216,12 +272,14 @@ class LowestLatencyLoggingHandler(CustomLogger): healthy_deployments: list, messages: Optional[List[Dict[str, str]]] = None, input: Optional[Union[str, List]] = None, + request_kwargs: Optional[Dict] = None, ): """ Returns a deployment with the lowest latency """ # get list of potential deployments latency_key = f"{model_group}_map" + _latency_per_deployment = {} request_count_dict = self.router_cache.get_cache(key=latency_key) or {} @@ -298,4 +356,14 @@ class LowestLatencyLoggingHandler(CustomLogger): elif item_latency < lowest_latency: lowest_latency = item_latency deployment = _deployment + + # _latency_per_deployment is used for debuggig + _deployment_api_base = _deployment.get("litellm_params", {}).get( + "api_base", "" + ) + _latency_per_deployment[_deployment_api_base] = item_latency + if request_kwargs is not None and "metadata" in request_kwargs: + request_kwargs["metadata"][ + "_latency_per_deployment" + ] = _latency_per_deployment return deployment diff --git a/litellm/tests/test_lowest_latency_routing.py b/litellm/tests/test_lowest_latency_routing.py index 6af690a0d..4b93853f4 100644 --- a/litellm/tests/test_lowest_latency_routing.py +++ b/litellm/tests/test_lowest_latency_routing.py @@ -477,3 +477,81 @@ async def test_router_completion_streaming(): # asyncio.run(test_router_completion_streaming()) + + +@pytest.mark.asyncio +async def test_lowest_latency_routing_with_timeouts(): + """ + PROD Test: + - Endpoint 1: triggers timeout errors (it takes 10+ seconds to respond) + - Endpoint 2: Responds in under 1s + - Run 5 requests to collect data on latency + - Run Wait till cache is filled with data + - Run 10 more requests + - All requests should have been routed to endpoint 2 + """ + import litellm + + litellm.set_verbose = True + + router = Router( + model_list=[ + { + "model_name": "azure-model", + "litellm_params": { + "model": "openai/slow-endpoint", + "api_base": "https://exampleopenaiendpoint-production-c715.up.railway.app/", # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :) + "api_key": "fake-key", + }, + "model_info": {"id": "slow-endpoint"}, + }, + { + "model_name": "azure-model", + "litellm_params": { + "model": "openai/fast-endpoint", + "api_base": "https://exampleopenaiendpoint-production.up.railway.app/", + "api_key": "fake-key", + }, + "model_info": {"id": "fast-endpoint"}, + }, + ], + routing_strategy="latency-based-routing", + set_verbose=True, + debug_level="DEBUG", + timeout=1, + ) # type: ignore + + # make 4 requests + for _ in range(4): + try: + response = await router.acompletion( + model="azure-model", messages=[{"role": "user", "content": "hello"}] + ) + print(response) + except Exception as e: + print("got exception", e) + + await asyncio.sleep(1) + print("done sending initial requests to collect latency") + """ + Note: for debugging + - By this 
diff --git a/litellm/tests/test_lowest_latency_routing.py b/litellm/tests/test_lowest_latency_routing.py
index 6af690a0d..4b93853f4 100644
--- a/litellm/tests/test_lowest_latency_routing.py
+++ b/litellm/tests/test_lowest_latency_routing.py
@@ -477,3 +477,81 @@ async def test_router_completion_streaming():
 
 
 # asyncio.run(test_router_completion_streaming())
+
+
+@pytest.mark.asyncio
+async def test_lowest_latency_routing_with_timeouts():
+    """
+    PROD Test:
+    - Endpoint 1: triggers timeout errors (it takes 10+ seconds to respond)
+    - Endpoint 2: responds in under 1s
+    - Run 4 requests to collect latency data
+    - Wait till the cache is filled with data
+    - Run 10 more requests
+    - All requests should have been routed to endpoint 2
+    """
+    import litellm
+
+    litellm.set_verbose = True
+
+    router = Router(
+        model_list=[
+            {
+                "model_name": "azure-model",
+                "litellm_params": {
+                    "model": "openai/slow-endpoint",
+                    "api_base": "https://exampleopenaiendpoint-production-c715.up.railway.app/",  # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
+                    "api_key": "fake-key",
+                },
+                "model_info": {"id": "slow-endpoint"},
+            },
+            {
+                "model_name": "azure-model",
+                "litellm_params": {
+                    "model": "openai/fast-endpoint",
+                    "api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
+                    "api_key": "fake-key",
+                },
+                "model_info": {"id": "fast-endpoint"},
+            },
+        ],
+        routing_strategy="latency-based-routing",
+        set_verbose=True,
+        debug_level="DEBUG",
+        timeout=1,
+    )  # type: ignore
+
+    # make 4 requests
+    for _ in range(4):
+        try:
+            response = await router.acompletion(
+                model="azure-model", messages=[{"role": "user", "content": "hello"}]
+            )
+            print(response)
+        except Exception as e:
+            print("got exception", e)
+
+    await asyncio.sleep(1)
+    print("done sending initial requests to collect latency")
+    """
+    Note: for debugging
+    - By this point: slow-endpoint should have timed out 3-4 times and should be heavily penalized :)
+    - The next 10 requests should all be routed to the fast-endpoint
+    """
+
+    deployments = {}
+    # make 10 requests
+    for _ in range(10):
+        response = await router.acompletion(
+            model="azure-model", messages=[{"role": "user", "content": "hello"}]
+        )
+        print(response)
+        _picked_model_id = response._hidden_params["model_id"]
+        if _picked_model_id not in deployments:
+            deployments[_picked_model_id] = 1
+        else:
+            deployments[_picked_model_id] += 1
+    print("deployments", deployments)
+
+    # ALL the requests should have been routed to the fast-endpoint
+    assert deployments["fast-endpoint"] == 10
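Assuming the repo's standard pytest setup, the new test can be run on its own with:

    pytest litellm/tests/test_lowest_latency_routing.py::test_lowest_latency_routing_with_timeouts -x -s

Note that it depends on the two public Railway endpoints above being reachable, so it behaves more like an end-to-end check than a unit test.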