Merge pull request #4318 from BerriAI/litellm_use_custom_routing_strat

[Feat] allow using custom router strategy
Ishaan Jaff 2024-06-20 15:11:51 -07:00 committed by GitHub
commit aa8f0637d1
4 changed files with 343 additions and 4 deletions

View file

@@ -95,7 +95,7 @@ print(response)
- `router.image_generation()` - completion calls in OpenAI `/v1/images/generations` endpoint format
- `router.aimage_generation()` - async image generation calls
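For example, an image generation call through the router might look like this (a minimal sketch, assuming a `dall-e-3` deployment is configured in the router's `model_list`; the prompt is a placeholder):

```python
# a minimal sketch: image generation through the router, OpenAI `/v1/images/generations` format
# (assumes a "dall-e-3" deployment exists in the router's model_list)
response = router.image_generation(
    model="dall-e-3",
    prompt="A cute baby sea otter",
)
print(response)
```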
-## Advanced - Routing Strategies
+## Advanced - Routing Strategies ⭐️
#### Routing Strategies - Weighted Pick, Rate Limit Aware, Least Busy, Latency Based, Cost Based
Router provides several strategies for routing your calls across multiple deployments:
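A built-in strategy can be selected via the `routing_strategy` parameter at init time (a minimal sketch; the deployment values are placeholders):

```python
from litellm import Router

# a minimal sketch: pick a built-in strategy at init time
# (deployment values are placeholders)
router = Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "openai/gpt-3.5-turbo",
                "api_key": "fake-key",
            },
        }
    ],
    routing_strategy="latency-based-routing",  # or "least-busy", "usage-based-routing", ...
)
```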
@@ -262,7 +262,7 @@ if response is not None:
)
```
-### Set Time Window
+#### Set Time Window
Set the time window for how far back to consider when averaging latency for a deployment.
@@ -278,7 +278,7 @@ router_settings:
routing_strategy_args: {"ttl": 10}
```
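The same argument can be passed in the Python SDK (a sketch; the deployment entry is a placeholder):

```python
from litellm import Router

# a sketch of the SDK equivalent of the proxy config above
router = Router(
    model_list=[{"model_name": "gpt-3.5-turbo", "litellm_params": {"model": "openai/gpt-3.5-turbo", "api_key": "fake-key"}}],
    routing_strategy="latency-based-routing",
    routing_strategy_args={"ttl": 10},  # only the last 10 seconds count toward the average
)
```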
-### Set Lowest Latency Buffer
+#### Set Lowest Latency Buffer
Set a buffer around the lowest latency; any deployment within that buffer is a candidate for the call.
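For example (a sketch, assuming the `lowest_latency_buffer` argument; with a buffer of 0.5, any deployment within 50% of the lowest observed latency remains a candidate):

```python
from litellm import Router

# a sketch: keep deployments within 50% of the lowest latency as candidates
router = Router(
    model_list=[{"model_name": "gpt-3.5-turbo", "litellm_params": {"model": "openai/gpt-3.5-turbo", "api_key": "fake-key"}}],
    routing_strategy="latency-based-routing",
    routing_strategy_args={"lowest_latency_buffer": 0.5},
)
```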
@@ -468,6 +468,122 @@ asyncio.run(router_acompletion())
```
</TabItem>
<TabItem value="custom" label="Custom Routing Strategy">
**Plug in a custom routing strategy to select deployments**
Step 1. Define your custom routing strategy
```python
from typing import Dict, List, Optional, Union

from litellm.router import CustomRoutingStrategyBase
class CustomRoutingStrategy(CustomRoutingStrategyBase):
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Asynchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
An element from litellm.router.model_list (the selected deployment)
"""
print("In CUSTOM async get available deployment")
model_list = router.model_list  # "router" is the Router instance created in Step 2
print("router model list=", model_list)
for model in model_list:
if isinstance(model, dict):
if model["litellm_params"]["model"] == "openai/very-special-endpoint":
return model
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Synchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
An element from litellm.router.model_list (the selected deployment)
"""
pass
```
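The strategy object can also hold its own state. As a hedged sketch (not part of this PR), a round-robin variant could cycle through the router's deployments:

```python
import itertools
from typing import Dict, List, Optional, Union

from litellm.router import CustomRoutingStrategyBase


class RoundRobinStrategy(CustomRoutingStrategyBase):
    """Hypothetical strategy: hand out deployments in a fixed rotation."""

    def __init__(self, deployments: List[Dict]):
        self._cycle = itertools.cycle(deployments)  # endless iterator over deployments

    async def async_get_available_deployment(
        self,
        model: str,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        return next(self._cycle)

    def get_available_deployment(
        self,
        model: str,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        return next(self._cycle)


# usage: router.set_custom_routing_strategy(RoundRobinStrategy(router.model_list))
```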
Step 2. Initialize Router with custom routing strategy
```python
from litellm import Router
router = Router(
model_list=[
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/very-special-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/", # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
"api_key": "fake-key",
},
"model_info": {"id": "very-special-endpoint"},
},
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/fast-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "fast-endpoint"},
},
],
set_verbose=True,
debug_level="DEBUG",
timeout=1,
) # type: ignore
router.set_custom_routing_strategy(CustomRoutingStrategy()) # 👈 Set your routing strategy here
```
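Note (see the `router.py` change later in this diff): `set_custom_routing_strategy` uses `setattr` to put the strategy's bound methods onto the Router instance, so inside your methods `self` refers to the strategy object, not the router. Reference the router through a module-level variable, as the Step 1 example does.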
Step 3. Test your routing strategy. Your custom strategy should be called on every `router.acompletion` request.
```python
# note: run this inside an async function (e.g. via asyncio.run)
for _ in range(10):
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
_picked_model_id = response._hidden_params["model_id"]
print("picked model=", _picked_model_id)
```
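With the strategy above, every request should resolve to the `very-special-endpoint` deployment, so `_picked_model_id` should print `very-special-endpoint` on each iteration.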
</TabItem>
<TabItem value="lowest-cost" label="Lowest Cost Routing (Async)">
Picks a deployment based on the lowest cost
@@ -563,7 +679,6 @@ asyncio.run(router_acompletion())
```
</TabItem>
</Tabs>
## Basic Reliability

View file

@@ -69,6 +69,7 @@ from litellm.types.router import (
AlertingConfig,
AllowedFailsPolicy,
AssistantsTypedDict,
CustomRoutingStrategyBase,
Deployment,
DeploymentTypedDict,
LiteLLM_Params,
@@ -4814,6 +4815,29 @@ class Router:
except Exception as e:
pass
def set_custom_routing_strategy(
self, CustomRoutingStrategy: CustomRoutingStrategyBase
):
"""
Sets get_available_deployment and async_get_available_deployment on an instance of litellm.Router
Use this to set your custom routing strategy
Args:
CustomRoutingStrategy: litellm.router.CustomRoutingStrategyBase
"""
setattr(
self,
"get_available_deployment",
CustomRoutingStrategy.get_available_deployment,
)
setattr(
self,
"async_get_available_deployment",
CustomRoutingStrategy.async_get_available_deployment,
)
def flush_cache(self):
litellm.cache = None
self.cache.flush_cache()

View file

@@ -0,0 +1,150 @@
import asyncio
import os
import random
import sys
import time
import traceback
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
import copy
import os
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
from typing import Dict, List, Optional, Union
import pytest
import litellm
from litellm import Router
router = Router(
model_list=[
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/very-special-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/", # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
"api_key": "fake-key",
},
"model_info": {"id": "very-special-endpoint"},
},
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/fast-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "fast-endpoint"},
},
],
set_verbose=True,
debug_level="DEBUG",
)
from litellm.router import CustomRoutingStrategyBase
class CustomRoutingStrategy(CustomRoutingStrategyBase):
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Asynchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
An element from litellm.router.model_list (the selected deployment)
"""
print("In CUSTOM async get available deployment")
model_list = router.model_list
print("router model list=", model_list)
for model in model_list:
if isinstance(model, dict):
if model["litellm_params"]["model"] == "openai/very-special-endpoint":
return model
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Synchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
An element from litellm.router.model_list (the selected deployment)
"""
pass
@pytest.mark.asyncio
async def test_custom_routing():
import litellm
litellm.set_verbose = True
router.set_custom_routing_strategy(CustomRoutingStrategy())
# make 4 requests
for _ in range(4):
try:
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
except Exception as e:
print("got exception", e)
await asyncio.sleep(1)
print("done sending initial requests to collect latency")
"""
Note: for debugging
- By this point: slow-endpoint should have timed out 3-4 times and should be heavily penalized :)
- The next 10 requests should all be routed to the fast-endpoint
"""
deployments = {}
# make 10 requests
for _ in range(10):
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
_picked_model_id = response._hidden_params["model_id"]
if _picked_model_id not in deployments:
deployments[_picked_model_id] = 1
else:
deployments[_picked_model_id] += 1
print("deployments", deployments)
# ALL the requests should have been routed to the very-special-endpoint
# assert deployments["very-special-endpoint"] == 10

View file

@@ -451,3 +451,53 @@ class ModelGroupInfo(BaseModel):
class AssistantsTypedDict(TypedDict):
custom_llm_provider: Literal["azure", "openai"]
litellm_params: LiteLLMParamsTypedDict
class CustomRoutingStrategyBase:
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Asynchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
An element from litellm.router.model_list (the selected deployment)
"""
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Synchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
An element from litellm.router.model_list (the selected deployment)
"""
pass