forked from phoenix/litellm-mirror
* feat(router.py): add check for max fallback depth Prevent infinite loop for fallbacks Closes https://github.com/BerriAI/litellm/issues/6498 * test: update test * (fix) Prometheus - Log Postgres DB latency, status on prometheus (#6484) * fix logging DB fails on prometheus * unit testing log to otel wrapper * unit testing for service logger + prometheus * use LATENCY buckets for service logging * fix service logging * docs clarify vertex vs gemini * (router_strategy/) ensure all async functions use async cache methods (#6489) * fix router strat * use async set / get cache in router_strategy * add coverage for router strategy * fix imports * fix batch_get_cache * use async methods for least busy * fix least busy use async methods * fix test_dual_cache_increment * test async_get_available_deployment when routing_strategy="least-busy" * (fix) proxy - fix when `STORE_MODEL_IN_DB` should be set (#6492) * set store_model_in_db at the top * correctly use store_model_in_db global * (fix) `PrometheusServicesLogger` `_get_metric` should return metric in Registry (#6486) * fix logging DB fails on prometheus * unit testing log to otel wrapper * unit testing for service logger + prometheus * use LATENCY buckets for service logging * fix service logging * fix _get_metric in prom services logger * add clear doc string * unit testing for prom service logger * bump: version 1.51.0 → 1.51.1 * Add `azure/gpt-4o-mini-2024-07-18` to model_prices_and_context_window.json (#6477) * Update utils.py (#6468) Fixed missing keys * (perf) Litellm redis router fix - ~100ms improvement (#6483) * docs(exception_mapping.md): add missing exception types Fixes https://github.com/Aider-AI/aider/issues/2120#issuecomment-2438971183 * fix(main.py): register custom model pricing with specific key Ensure custom model pricing is registered to the specific model+provider key combination * test: make testing more robust for custom pricing * fix(redis_cache.py): instrument otel logging for sync redis calls ensures complete coverage for all redis cache calls * refactor: pass parent_otel_span for redis caching calls in router allows for more observability into what calls are causing latency issues * test: update tests with new params * refactor: ensure e2e otel tracing for router * refactor(router.py): add more otel tracing acrosss router catch all latency issues for router requests * fix: fix linting error * fix(router.py): fix linting error * fix: fix test * test: fix tests * fix(dual_cache.py): pass ttl to redis cache * fix: fix param * perf(cooldown_cache.py): improve cooldown cache, to store cache results in memory for 5s, prevents redis call from being made on each request reduces 100ms latency per call with caching enabled on router * fix: fix test * fix(cooldown_cache.py): handle if a result is None * fix(cooldown_cache.py): add debug statements * refactor(dual_cache.py): move to using an in-memory check for batch get cache, to prevent redis from being hit for every call * fix(cooldown_cache.py): fix linting erropr * build: merge main --------- Co-authored-by: Ishaan Jaff <ishaanjaffer0324@gmail.com> Co-authored-by: Xingyao Wang <xingyao@all-hands.dev> Co-authored-by: vibhanshu-ob <115142120+vibhanshu-ob@users.noreply.github.com>
357 lines
11 KiB
Python
357 lines
11 KiB
Python
import asyncio
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(
|
|
0, os.path.abspath("../..")
|
|
) # Adds the parent directory to the system path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import litellm
|
|
from litellm import Router
|
|
from litellm.integrations.custom_logger import CustomLogger
|
|
from typing import Any, Dict
|
|
|
|
|
|
import sys
|
|
import os
|
|
from typing import List, Dict
|
|
|
|
sys.path.insert(0, os.path.abspath("../.."))
|
|
|
|
from litellm.router_utils.fallback_event_handlers import (
|
|
run_async_fallback,
|
|
run_sync_fallback,
|
|
log_success_fallback_event,
|
|
log_failure_fallback_event,
|
|
)
|
|
|
|
|
|
# Helper function to create a Router instance
|
|
def create_test_router():
|
|
return Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-3.5-turbo",
|
|
"litellm_params": {
|
|
"model": "gpt-3.5-turbo",
|
|
"api_key": os.getenv("OPENAI_API_KEY"),
|
|
},
|
|
},
|
|
{
|
|
"model_name": "gpt-4",
|
|
"litellm_params": {
|
|
"model": "gpt-4",
|
|
"api_key": os.getenv("OPENAI_API_KEY"),
|
|
},
|
|
},
|
|
],
|
|
fallbacks=[{"gpt-3.5-turbo": ["gpt-4"]}],
|
|
)
|
|
|
|
|
|
router: Router = create_test_router()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"original_function",
|
|
[router._acompletion, router._atext_completion, router._aembedding],
|
|
)
|
|
@pytest.mark.asyncio
|
|
async def test_run_async_fallback(original_function):
|
|
"""
|
|
Basic test - given a list of fallback models, run the original function with the fallback models
|
|
"""
|
|
litellm.set_verbose = True
|
|
fallback_model_group = ["gpt-4"]
|
|
original_model_group = "gpt-3.5-turbo"
|
|
original_exception = litellm.exceptions.InternalServerError(
|
|
message="Simulated error",
|
|
llm_provider="openai",
|
|
model="gpt-3.5-turbo",
|
|
)
|
|
|
|
request_kwargs = {
|
|
"mock_response": "hello this is a test for run_async_fallback",
|
|
"metadata": {"previous_models": ["gpt-3.5-turbo"]},
|
|
}
|
|
|
|
if original_function == router._aembedding:
|
|
request_kwargs["input"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router._atext_completion:
|
|
request_kwargs["prompt"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router._acompletion:
|
|
request_kwargs["messages"] = [{"role": "user", "content": "Hello, world!"}]
|
|
|
|
result = await run_async_fallback(
|
|
litellm_router=router,
|
|
original_function=original_function,
|
|
num_retries=1,
|
|
fallback_model_group=fallback_model_group,
|
|
original_model_group=original_model_group,
|
|
original_exception=original_exception,
|
|
max_fallbacks=5,
|
|
fallback_depth=0,
|
|
**request_kwargs
|
|
)
|
|
|
|
assert result is not None
|
|
|
|
if original_function == router._acompletion:
|
|
assert isinstance(result, litellm.ModelResponse)
|
|
elif original_function == router._atext_completion:
|
|
assert isinstance(result, litellm.TextCompletionResponse)
|
|
elif original_function == router._aembedding:
|
|
assert isinstance(result, litellm.EmbeddingResponse)
|
|
|
|
|
|
@pytest.mark.parametrize("original_function", [router._completion, router._embedding])
|
|
def test_run_sync_fallback(original_function):
|
|
litellm.set_verbose = True
|
|
fallback_model_group = ["gpt-4"]
|
|
original_model_group = "gpt-3.5-turbo"
|
|
original_exception = litellm.exceptions.InternalServerError(
|
|
message="Simulated error",
|
|
llm_provider="openai",
|
|
model="gpt-3.5-turbo",
|
|
)
|
|
|
|
request_kwargs = {
|
|
"mock_response": "hello this is a test for run_async_fallback",
|
|
"metadata": {"previous_models": ["gpt-3.5-turbo"]},
|
|
}
|
|
|
|
if original_function == router._embedding:
|
|
request_kwargs["input"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router._completion:
|
|
request_kwargs["messages"] = [{"role": "user", "content": "Hello, world!"}]
|
|
result = run_sync_fallback(
|
|
router,
|
|
original_function=original_function,
|
|
num_retries=1,
|
|
fallback_model_group=fallback_model_group,
|
|
original_model_group=original_model_group,
|
|
original_exception=original_exception,
|
|
**request_kwargs
|
|
)
|
|
|
|
assert result is not None
|
|
|
|
if original_function == router._completion:
|
|
assert isinstance(result, litellm.ModelResponse)
|
|
elif original_function == router._embedding:
|
|
assert isinstance(result, litellm.EmbeddingResponse)
|
|
|
|
|
|
class CustomTestLogger(CustomLogger):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.success_fallback_events = []
|
|
self.failure_fallback_events = []
|
|
|
|
async def log_success_fallback_event(
|
|
self, original_model_group, kwargs, original_exception
|
|
):
|
|
print(
|
|
"in log_success_fallback_event for original_model_group: ",
|
|
original_model_group,
|
|
)
|
|
self.success_fallback_events.append(
|
|
(original_model_group, kwargs, original_exception)
|
|
)
|
|
|
|
async def log_failure_fallback_event(
|
|
self, original_model_group, kwargs, original_exception
|
|
):
|
|
print(
|
|
"in log_failure_fallback_event for original_model_group: ",
|
|
original_model_group,
|
|
)
|
|
self.failure_fallback_events.append(
|
|
(original_model_group, kwargs, original_exception)
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_log_success_fallback_event():
|
|
"""
|
|
Tests that successful fallback events are logged correctly
|
|
"""
|
|
original_model_group = "gpt-3.5-turbo"
|
|
kwargs = {"messages": [{"role": "user", "content": "Hello, world!"}]}
|
|
original_exception = litellm.exceptions.InternalServerError(
|
|
message="Simulated error",
|
|
llm_provider="openai",
|
|
model="gpt-3.5-turbo",
|
|
)
|
|
|
|
logger = CustomTestLogger()
|
|
litellm.callbacks = [logger]
|
|
|
|
# This test mainly checks if the function runs without errors
|
|
await log_success_fallback_event(original_model_group, kwargs, original_exception)
|
|
|
|
await asyncio.sleep(0.5)
|
|
assert len(logger.success_fallback_events) == 1
|
|
assert len(logger.failure_fallback_events) == 0
|
|
assert logger.success_fallback_events[0] == (
|
|
original_model_group,
|
|
kwargs,
|
|
original_exception,
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_log_failure_fallback_event():
|
|
"""
|
|
Tests that failed fallback events are logged correctly
|
|
"""
|
|
original_model_group = "gpt-3.5-turbo"
|
|
kwargs = {"messages": [{"role": "user", "content": "Hello, world!"}]}
|
|
original_exception = litellm.exceptions.InternalServerError(
|
|
message="Simulated error",
|
|
llm_provider="openai",
|
|
model="gpt-3.5-turbo",
|
|
)
|
|
|
|
logger = CustomTestLogger()
|
|
litellm.callbacks = [logger]
|
|
|
|
# This test mainly checks if the function runs without errors
|
|
await log_failure_fallback_event(original_model_group, kwargs, original_exception)
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
assert len(logger.failure_fallback_events) == 1
|
|
assert len(logger.success_fallback_events) == 0
|
|
assert logger.failure_fallback_events[0] == (
|
|
original_model_group,
|
|
kwargs,
|
|
original_exception,
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"original_function", [router._acompletion, router._atext_completion]
|
|
)
|
|
async def test_failed_fallbacks_raise_most_recent_exception(original_function):
|
|
"""
|
|
Tests that if all fallbacks fail, the most recent occuring exception is raised
|
|
|
|
meaning the exception from the last fallback model is raised
|
|
"""
|
|
fallback_model_group = ["gpt-4"]
|
|
original_model_group = "gpt-3.5-turbo"
|
|
original_exception = litellm.exceptions.InternalServerError(
|
|
message="Simulated error",
|
|
llm_provider="openai",
|
|
model="gpt-3.5-turbo",
|
|
)
|
|
|
|
request_kwargs: Dict[str, Any] = {
|
|
"metadata": {"previous_models": ["gpt-3.5-turbo"]}
|
|
}
|
|
|
|
if original_function == router._aembedding:
|
|
request_kwargs["input"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router._atext_completion:
|
|
request_kwargs["prompt"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router._acompletion:
|
|
request_kwargs["messages"] = [{"role": "user", "content": "Hello, world!"}]
|
|
|
|
with pytest.raises(litellm.exceptions.RateLimitError):
|
|
await run_async_fallback(
|
|
litellm_router=router,
|
|
original_function=original_function,
|
|
num_retries=1,
|
|
fallback_model_group=fallback_model_group,
|
|
original_model_group=original_model_group,
|
|
original_exception=original_exception,
|
|
mock_response="litellm.RateLimitError",
|
|
max_fallbacks=5,
|
|
fallback_depth=0,
|
|
**request_kwargs
|
|
)
|
|
|
|
|
|
router_2 = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-3.5-turbo",
|
|
"litellm_params": {
|
|
"model": "gpt-3.5-turbo",
|
|
"api_key": os.getenv("OPENAI_API_KEY"),
|
|
},
|
|
},
|
|
{
|
|
"model_name": "gpt-4",
|
|
"litellm_params": {
|
|
"model": "gpt-4",
|
|
"api_key": "very-fake-key",
|
|
},
|
|
},
|
|
{
|
|
"model_name": "fake-openai-endpoint-2",
|
|
"litellm_params": {
|
|
"model": "openai/fake-openai-endpoint-2",
|
|
"api_key": "working-key-since-this-is-fake-endpoint",
|
|
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
|
|
},
|
|
},
|
|
],
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"original_function", [router_2._acompletion, router_2._atext_completion]
|
|
)
|
|
async def test_multiple_fallbacks(original_function):
|
|
"""
|
|
Tests that if multiple fallbacks passed:
|
|
- fallback 1 = bad configured deployment / failing endpoint
|
|
- fallback 2 = working deployment / working endpoint
|
|
|
|
Assert that:
|
|
- a success response is received from the working endpoint (fallback 2)
|
|
"""
|
|
fallback_model_group = ["gpt-4", "fake-openai-endpoint-2"]
|
|
original_model_group = "gpt-3.5-turbo"
|
|
original_exception = Exception("Simulated error")
|
|
|
|
request_kwargs: Dict[str, Any] = {
|
|
"metadata": {"previous_models": ["gpt-3.5-turbo"]}
|
|
}
|
|
|
|
if original_function == router_2._aembedding:
|
|
request_kwargs["input"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router_2._atext_completion:
|
|
request_kwargs["prompt"] = "hello this is a test for run_async_fallback"
|
|
elif original_function == router_2._acompletion:
|
|
request_kwargs["messages"] = [{"role": "user", "content": "Hello, world!"}]
|
|
|
|
result = await run_async_fallback(
|
|
litellm_router=router_2,
|
|
original_function=original_function,
|
|
num_retries=1,
|
|
fallback_model_group=fallback_model_group,
|
|
original_model_group=original_model_group,
|
|
original_exception=original_exception,
|
|
max_fallbacks=5,
|
|
fallback_depth=0,
|
|
**request_kwargs
|
|
)
|
|
|
|
print(result)
|
|
|
|
print(result._hidden_params)
|
|
|
|
assert (
|
|
result._hidden_params["api_base"]
|
|
== "https://exampleopenaiendpoint-production.up.railway.app/"
|
|
)
|