feat(router.py): adding latency-based routing strategy

This commit is contained in:
Krrish Dholakia 2023-11-21 21:19:14 -08:00
parent 2f3e13e43b
commit 76f46902ed
2 changed files with 156 additions and 90 deletions

View file

@ -46,11 +46,14 @@ class Router:
num_retries: int = 0,
timeout: float = 600,
default_litellm_params = {}, # default params for Router.chat.completion.create
routing_strategy: Literal["simple-shuffle", "least-busy", "usage-based-routing"] = "simple-shuffle") -> None:
routing_strategy: Literal["simple-shuffle", "least-busy", "usage-based-routing", "latency-based-routing"] = "simple-shuffle") -> None:
if model_list:
self.set_model_list(model_list)
self.healthy_deployments: List = self.model_list
self.deployment_latency_map = {}
for m in model_list:
self.deployment_latency_map[m["litellm_params"]["model"]] = 0
self.num_retries = num_retries
@ -122,6 +125,22 @@ class Router:
except Exception as e:
pass
return healthy_deployments
def weighted_shuffle_by_latency(self, items):
# Sort the items by latency
sorted_items = sorted(items, key=lambda x: x[1])
# Get only the latencies
latencies = [i[1] for i in sorted_items]
# Calculate the sum of all latencies
total_latency = sum(latencies)
# Calculate the weight for each latency (lower latency = higher weight)
weights = [total_latency-latency for latency in latencies]
# Get a weighted random item
if sum(weights) == 0:
chosen_item = random.choice(sorted_items)[0]
else:
chosen_item = random.choices(sorted_items, weights=weights, k=1)[0][0]
return chosen_item
def set_model_list(self, model_list: list):
self.model_list = model_list
@ -155,6 +174,21 @@ class Router:
potential_deployments.append(item)
item = random.choice(potential_deployments)
return item or item[0]
elif self.routing_strategy == "latency-based-routing":
returned_item = None
lowest_latency = float('inf')
### get potential deployments
potential_deployments = []
for item in self.model_list:
if item["model_name"] == model:
potential_deployments.append(item)
### shuffles with priority for lowest latency
# items_with_latencies = [('A', 10), ('B', 20), ('C', 30), ('D', 40)]
items_with_latencies = []
for item in potential_deployments:
items_with_latencies.append((item, self.deployment_latency_map[item["litellm_params"]["model"]]))
returned_item = self.weighted_shuffle_by_latency(items_with_latencies)
return returned_item
elif self.routing_strategy == "usage-based-routing":
return self.get_usage_based_available_deployment(model=model, messages=messages, input=input)
@ -238,14 +272,23 @@ class Router:
Example usage:
response = router.completion(model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hey, how's it going?"}]
"""
# pick the one that is available (lowest TPM/RPM)
deployment = self.get_available_deployment(model=model, messages=messages)
data = deployment["litellm_params"]
for k, v in self.default_litellm_params.items():
if k not in data: # prioritize model-specific params > default router params
data[k] = v
return litellm.completion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
try:
# pick the one that is available (lowest TPM/RPM)
deployment = self.get_available_deployment(model=model, messages=messages)
data = deployment["litellm_params"]
for k, v in self.default_litellm_params.items():
if k not in data: # prioritize model-specific params > default router params
data[k] = v
return litellm.completion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
except Exception as e:
if self.num_retries > 0:
kwargs["model"] = model
kwargs["messages"] = messages
kwargs["original_exception"] = e
kwargs["original_function"] = self.completion
return self.function_with_retries(**kwargs)
else:
raise e
async def acompletion(self,
@ -261,9 +304,6 @@ class Router:
if k not in data: # prioritize model-specific params > default router params
data[k] = v
response = await litellm.acompletion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
# client = AsyncOpenAI()
# print(f"MAKING OPENAI CALL")
# response = await client.chat.completions.create(model=model, messages=messages)
return response
except Exception as e:
if self.num_retries > 0:
@ -282,17 +322,26 @@ class Router:
is_fallback: Optional[bool] = False,
is_async: Optional[bool] = False,
**kwargs):
try:
messages=[{"role": "user", "content": prompt}]
# pick the one that is available (lowest TPM/RPM)
deployment = self.get_available_deployment(model=model, messages=messages)
messages=[{"role": "user", "content": prompt}]
# pick the one that is available (lowest TPM/RPM)
deployment = self.get_available_deployment(model=model, messages=messages)
data = deployment["litellm_params"]
for k, v in self.default_litellm_params.items():
if k not in data: # prioritize model-specific params > default router params
data[k] = v
# call via litellm.completion()
return litellm.text_completion(**{**data, "prompt": prompt, "caching": self.cache_responses, **kwargs}) # type: ignore
data = deployment["litellm_params"]
for k, v in self.default_litellm_params.items():
if k not in data: # prioritize model-specific params > default router params
data[k] = v
# call via litellm.completion()
return litellm.text_completion(**{**data, "prompt": prompt, "caching": self.cache_responses, **kwargs}) # type: ignore
except Exception as e:
if self.num_retries > 0:
kwargs["model"] = model
kwargs["messages"] = messages
kwargs["original_exception"] = e
kwargs["original_function"] = self.completion
return self.function_with_retries(**kwargs)
else:
raise e
def embedding(self,
model: str,
@ -344,6 +393,20 @@ class Router:
else:
total_tokens = completion_response['usage']['total_tokens']
self._set_deployment_usage(model_name, total_tokens)
self.deployment_latency_map[model_name] = (end_time - start_time).total_seconds()
def deployment_callback_on_failure(
self,
kwargs, # kwargs to completion
completion_response, # response from completion
start_time, end_time # start/end time
):
model_name = kwargs.get('model', None) # i.e. gpt35turbo
custom_llm_provider = kwargs.get("litellm_params", {}).get('custom_llm_provider', None) # i.e. azure
if custom_llm_provider:
model_name = f"{custom_llm_provider}/{model_name}"
self.deployment_latency_map[model_name] = float('inf')
def get_usage_based_available_deployment(self,
model: str,