mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
Litellm router max depth (#6501)
* feat(router.py): add check for max fallback depth Prevent infinite loop for fallbacks Closes https://github.com/BerriAI/litellm/issues/6498 * test: update test * (fix) Prometheus - Log Postgres DB latency, status on prometheus (#6484) * fix logging DB fails on prometheus * unit testing log to otel wrapper * unit testing for service logger + prometheus * use LATENCY buckets for service logging * fix service logging * docs clarify vertex vs gemini * (router_strategy/) ensure all async functions use async cache methods (#6489) * fix router strat * use async set / get cache in router_strategy * add coverage for router strategy * fix imports * fix batch_get_cache * use async methods for least busy * fix least busy use async methods * fix test_dual_cache_increment * test async_get_available_deployment when routing_strategy="least-busy" * (fix) proxy - fix when `STORE_MODEL_IN_DB` should be set (#6492) * set store_model_in_db at the top * correctly use store_model_in_db global * (fix) `PrometheusServicesLogger` `_get_metric` should return metric in Registry (#6486) * fix logging DB fails on prometheus * unit testing log to otel wrapper * unit testing for service logger + prometheus * use LATENCY buckets for service logging * fix service logging * fix _get_metric in prom services logger * add clear doc string * unit testing for prom service logger * bump: version 1.51.0 → 1.51.1 * Add `azure/gpt-4o-mini-2024-07-18` to model_prices_and_context_window.json (#6477) * Update utils.py (#6468) Fixed missing keys * (perf) Litellm redis router fix - ~100ms improvement (#6483) * docs(exception_mapping.md): add missing exception types Fixes https://github.com/Aider-AI/aider/issues/2120#issuecomment-2438971183 * fix(main.py): register custom model pricing with specific key Ensure custom model pricing is registered to the specific model+provider key combination * test: make testing more robust for custom pricing * fix(redis_cache.py): instrument otel logging for sync redis calls ensures complete coverage for all redis cache calls * refactor: pass parent_otel_span for redis caching calls in router allows for more observability into what calls are causing latency issues * test: update tests with new params * refactor: ensure e2e otel tracing for router * refactor(router.py): add more otel tracing acrosss router catch all latency issues for router requests * fix: fix linting error * fix(router.py): fix linting error * fix: fix test * test: fix tests * fix(dual_cache.py): pass ttl to redis cache * fix: fix param * perf(cooldown_cache.py): improve cooldown cache, to store cache results in memory for 5s, prevents redis call from being made on each request reduces 100ms latency per call with caching enabled on router * fix: fix test * fix(cooldown_cache.py): handle if a result is None * fix(cooldown_cache.py): add debug statements * refactor(dual_cache.py): move to using an in-memory check for batch get cache, to prevent redis from being hit for every call * fix(cooldown_cache.py): fix linting erropr * build: merge main --------- Co-authored-by: Ishaan Jaff <ishaanjaffer0324@gmail.com> Co-authored-by: Xingyao Wang <xingyao@all-hands.dev> Co-authored-by: vibhanshu-ob <115142120+vibhanshu-ob@users.noreply.github.com>
This commit is contained in:
parent
4bde45e7f2
commit
10e8d7aa45
11 changed files with 165 additions and 235 deletions
|
@ -189,6 +189,9 @@ class Router:
|
|||
default_priority: Optional[int] = None,
|
||||
## RELIABILITY ##
|
||||
num_retries: Optional[int] = None,
|
||||
max_fallbacks: Optional[
|
||||
int
|
||||
] = None, # max fallbacks to try before exiting the call. Defaults to 5.
|
||||
timeout: Optional[float] = None,
|
||||
default_litellm_params: Optional[
|
||||
dict
|
||||
|
@ -410,6 +413,13 @@ class Router:
|
|||
else:
|
||||
self.num_retries = openai.DEFAULT_MAX_RETRIES
|
||||
|
||||
if max_fallbacks is not None:
|
||||
self.max_fallbacks = max_fallbacks
|
||||
elif litellm.max_fallbacks is not None:
|
||||
self.max_fallbacks = litellm.max_fallbacks
|
||||
else:
|
||||
self.max_fallbacks = litellm.ROUTER_MAX_FALLBACKS
|
||||
|
||||
self.timeout = timeout or litellm.request_timeout
|
||||
|
||||
self.retry_after = retry_after
|
||||
|
@ -2672,8 +2682,19 @@ class Router:
|
|||
if original_model_group is None:
|
||||
raise e
|
||||
|
||||
input_kwargs = {
|
||||
"litellm_router": self,
|
||||
"original_exception": original_exception,
|
||||
**kwargs,
|
||||
}
|
||||
|
||||
if "max_fallbacks" not in input_kwargs:
|
||||
input_kwargs["max_fallbacks"] = self.max_fallbacks
|
||||
if "fallback_depth" not in input_kwargs:
|
||||
input_kwargs["fallback_depth"] = 0
|
||||
|
||||
try:
|
||||
verbose_router_logger.debug("Trying to fallback b/w models")
|
||||
verbose_router_logger.info("Trying to fallback b/w models")
|
||||
if isinstance(e, litellm.ContextWindowExceededError):
|
||||
if context_window_fallbacks is not None:
|
||||
fallback_model_group: Optional[List[str]] = (
|
||||
|
@ -2685,13 +2706,16 @@ class Router:
|
|||
if fallback_model_group is None:
|
||||
raise original_exception
|
||||
|
||||
input_kwargs.update(
|
||||
{
|
||||
"fallback_model_group": fallback_model_group,
|
||||
"original_model_group": original_model_group,
|
||||
}
|
||||
)
|
||||
|
||||
response = await run_async_fallback(
|
||||
*args,
|
||||
litellm_router=self,
|
||||
fallback_model_group=fallback_model_group,
|
||||
original_model_group=original_model_group,
|
||||
original_exception=original_exception,
|
||||
**kwargs,
|
||||
**input_kwargs,
|
||||
)
|
||||
return response
|
||||
|
||||
|
@ -2718,13 +2742,16 @@ class Router:
|
|||
if fallback_model_group is None:
|
||||
raise original_exception
|
||||
|
||||
input_kwargs.update(
|
||||
{
|
||||
"fallback_model_group": fallback_model_group,
|
||||
"original_model_group": original_model_group,
|
||||
}
|
||||
)
|
||||
|
||||
response = await run_async_fallback(
|
||||
*args,
|
||||
litellm_router=self,
|
||||
fallback_model_group=fallback_model_group,
|
||||
original_model_group=original_model_group,
|
||||
original_exception=original_exception,
|
||||
**kwargs,
|
||||
**input_kwargs,
|
||||
)
|
||||
return response
|
||||
else:
|
||||
|
@ -2767,13 +2794,16 @@ class Router:
|
|||
original_exception.message += f"No fallback model group found for original model_group={model_group}. Fallbacks={fallbacks}" # type: ignore
|
||||
raise original_exception
|
||||
|
||||
input_kwargs.update(
|
||||
{
|
||||
"fallback_model_group": fallback_model_group,
|
||||
"original_model_group": original_model_group,
|
||||
}
|
||||
)
|
||||
|
||||
response = await run_async_fallback(
|
||||
*args,
|
||||
litellm_router=self,
|
||||
fallback_model_group=fallback_model_group,
|
||||
original_model_group=original_model_group,
|
||||
original_exception=original_exception,
|
||||
**kwargs,
|
||||
**input_kwargs,
|
||||
)
|
||||
return response
|
||||
except Exception as new_exception:
|
||||
|
@ -2982,7 +3012,9 @@ class Router:
|
|||
Handler for making a call to the .completion()/.embeddings()/etc. functions.
|
||||
"""
|
||||
model_group = kwargs.get("model")
|
||||
response = await original_function(*args, **kwargs)
|
||||
response = original_function(*args, **kwargs)
|
||||
if inspect.iscoroutinefunction(response) or inspect.isawaitable(response):
|
||||
response = await response
|
||||
## PROCESS RESPONSE HEADERS
|
||||
await self.set_response_headers(response=response, model_group=model_group)
|
||||
|
||||
|
@ -3080,120 +3112,38 @@ class Router:
|
|||
|
||||
def function_with_fallbacks(self, *args, **kwargs):
|
||||
"""
|
||||
Try calling the function_with_retries
|
||||
If it fails after num_retries, fall back to another model group
|
||||
Sync wrapper for async_function_with_fallbacks
|
||||
|
||||
Wrapped to reduce code duplication and prevent bugs.
|
||||
"""
|
||||
model_group: Optional[str] = kwargs.get("model")
|
||||
fallbacks = kwargs.get("fallbacks", self.fallbacks)
|
||||
context_window_fallbacks = kwargs.get(
|
||||
"context_window_fallbacks", self.context_window_fallbacks
|
||||
)
|
||||
content_policy_fallbacks = kwargs.get(
|
||||
"content_policy_fallbacks", self.content_policy_fallbacks
|
||||
)
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
def run_in_new_loop():
|
||||
"""Run the coroutine in a new event loop within this thread."""
|
||||
new_loop = asyncio.new_event_loop()
|
||||
try:
|
||||
asyncio.set_event_loop(new_loop)
|
||||
return new_loop.run_until_complete(
|
||||
self.async_function_with_fallbacks(*args, **kwargs)
|
||||
)
|
||||
finally:
|
||||
new_loop.close()
|
||||
asyncio.set_event_loop(None)
|
||||
|
||||
try:
|
||||
self._handle_mock_testing_fallbacks(
|
||||
kwargs=kwargs,
|
||||
model_group=model_group,
|
||||
fallbacks=fallbacks,
|
||||
context_window_fallbacks=context_window_fallbacks,
|
||||
content_policy_fallbacks=content_policy_fallbacks,
|
||||
)
|
||||
response = self.function_with_retries(*args, **kwargs)
|
||||
return response
|
||||
except Exception as e:
|
||||
original_exception = e
|
||||
original_model_group: Optional[str] = kwargs.get("model")
|
||||
verbose_router_logger.debug(f"An exception occurs {original_exception}")
|
||||
# First, try to get the current event loop
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
if original_model_group is None:
|
||||
raise e
|
||||
# If we're already in an event loop, run in a separate thread
|
||||
# to avoid nested event loop issues
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(run_in_new_loop)
|
||||
return future.result()
|
||||
|
||||
try:
|
||||
verbose_router_logger.debug(
|
||||
f"Trying to fallback b/w models. Initial model group: {model_group}"
|
||||
)
|
||||
if (
|
||||
isinstance(e, litellm.ContextWindowExceededError)
|
||||
and context_window_fallbacks is not None
|
||||
):
|
||||
fallback_model_group = None
|
||||
|
||||
fallback_model_group: Optional[List[str]] = (
|
||||
self._get_fallback_model_group_from_fallbacks(
|
||||
fallbacks=context_window_fallbacks,
|
||||
model_group=model_group,
|
||||
)
|
||||
)
|
||||
|
||||
if fallback_model_group is None:
|
||||
raise original_exception
|
||||
|
||||
return run_sync_fallback(
|
||||
*args,
|
||||
litellm_router=self,
|
||||
fallback_model_group=fallback_model_group,
|
||||
original_model_group=original_model_group,
|
||||
original_exception=original_exception,
|
||||
**kwargs,
|
||||
)
|
||||
elif (
|
||||
isinstance(e, litellm.ContentPolicyViolationError)
|
||||
and content_policy_fallbacks is not None
|
||||
):
|
||||
fallback_model_group: Optional[List[str]] = (
|
||||
self._get_fallback_model_group_from_fallbacks(
|
||||
fallbacks=content_policy_fallbacks,
|
||||
model_group=model_group,
|
||||
)
|
||||
)
|
||||
|
||||
if fallback_model_group is None:
|
||||
raise original_exception
|
||||
|
||||
return run_sync_fallback(
|
||||
*args,
|
||||
litellm_router=self,
|
||||
fallback_model_group=fallback_model_group,
|
||||
original_model_group=original_model_group,
|
||||
original_exception=original_exception,
|
||||
**kwargs,
|
||||
)
|
||||
elif fallbacks is not None:
|
||||
verbose_router_logger.debug(f"inside model fallbacks: {fallbacks}")
|
||||
fallback_model_group = None
|
||||
generic_fallback_idx: Optional[int] = None
|
||||
for idx, item in enumerate(fallbacks):
|
||||
if isinstance(item, dict):
|
||||
if list(item.keys())[0] == model_group:
|
||||
fallback_model_group = item[model_group]
|
||||
break
|
||||
elif list(item.keys())[0] == "*":
|
||||
generic_fallback_idx = idx
|
||||
elif isinstance(item, str):
|
||||
fallback_model_group = [fallbacks.pop(idx)]
|
||||
## if none, check for generic fallback
|
||||
if (
|
||||
fallback_model_group is None
|
||||
and generic_fallback_idx is not None
|
||||
):
|
||||
fallback_model_group = fallbacks[generic_fallback_idx]["*"]
|
||||
|
||||
if fallback_model_group is None:
|
||||
raise original_exception
|
||||
|
||||
return run_sync_fallback(
|
||||
*args,
|
||||
litellm_router=self,
|
||||
fallback_model_group=fallback_model_group,
|
||||
original_model_group=original_model_group,
|
||||
original_exception=original_exception,
|
||||
**kwargs,
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
raise original_exception
|
||||
except RuntimeError:
|
||||
# No running event loop, we can safely run in this thread
|
||||
return run_in_new_loop()
|
||||
|
||||
def _get_fallback_model_group_from_fallbacks(
|
||||
self,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue