(Refactor) /v1/messages to follow simpler logic for Anthropic API spec (#9013)

* anthropic_messages_handler v0

* fix /messages

* working messages with router methods

* test_anthropic_messages_handler_litellm_router_non_streaming

* test_anthropic_messages_litellm_router_non_streaming_with_logging

* AnthropicMessagesConfig

* _handle_anthropic_messages_response_logging

* working with /v1/messages endpoint

* working /v1/messages endpoint

* refactor to use router factory function

* use aanthropic_messages

* use BaseConfig for Anthropic /v1/messages

* track api key, team on /v1/messages endpoint

* fix get_logging_payload

* BaseAnthropicMessagesTest

* align test config

* test_anthropic_messages_with_thinking

* test_anthropic_streaming_with_thinking

* fix - display anthropic url for debugging

* test_bad_request_error_handling

* test_anthropic_messages_router_streaming_with_bad_request

* fix ProxyException

* test_bad_request_error_handling_streaming

* use provider_specific_header

* test_anthropic_messages_with_extra_headers

* test_anthropic_messages_to_wildcard_model

* fix gcs pub sub test

* standard_logging_payload

* fix unit testing for anthropic /v1/messages support

* fix pass through anthropic messages api

* delete dead code

* fix anthropic pass through response

* revert change to spend tracking utils

* fix get_litellm_metadata_from_kwargs

* fix spend logs payload json

* proxy_pass_through_endpoint_tests

* TestAnthropicPassthroughBasic

* fix pass through tests

* test_async_vertex_proxy_route_api_key_auth

* _handle_anthropic_messages_response_logging

* vertex_credentials

* test_set_default_vertex_config

* test_anthropic_messages_litellm_router_non_streaming_with_logging

* test_ageneric_api_call_with_fallbacks_basic

* test__aadapter_completion
Ishaan Jaff 2025-03-06 00:43:08 -08:00 committed by GitHub
parent 5ab29de9d1
commit 84a83f8c51
25 changed files with 1581 additions and 1027 deletions
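
Context for the diff below: the Router gains an aanthropic_messages method, built by factory_function from litellm.anthropic_messages, so /v1/messages-style calls can go through router retries/fallbacks. A minimal usage sketch, assuming a hypothetical model_list entry named "claude-3-5" and an ANTHROPIC_API_KEY environment variable (names and request shape are illustrative, not taken from this commit):

import asyncio
import os

from litellm import Router

# Hypothetical deployment name and credentials; not part of this commit.
router = Router(
    model_list=[
        {
            "model_name": "claude-3-5",
            "litellm_params": {
                "model": "anthropic/claude-3-5-sonnet-20241022",
                "api_key": os.environ.get("ANTHROPIC_API_KEY"),
            },
        }
    ]
)

async def main():
    # The new router method accepts Anthropic Messages API style kwargs and
    # forwards them to litellm.anthropic_messages with retries/fallbacks.
    response = await router.aanthropic_messages(
        model="claude-3-5",
        messages=[{"role": "user", "content": "Hello, world"}],
        max_tokens=100,
    )
    print(response)

asyncio.run(main())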


@@ -580,6 +580,9 @@ class Router:
         self.amoderation = self.factory_function(
             litellm.amoderation, call_type="moderation"
         )
+        self.aanthropic_messages = self.factory_function(
+            litellm.anthropic_messages, call_type="anthropic_messages"
+        )

     def discard(self):
         """
@@ -2349,6 +2352,89 @@ class Router:
                 self.fail_calls[model] += 1
             raise e

+    async def _ageneric_api_call_with_fallbacks(
+        self, model: str, original_function: Callable, **kwargs
+    ):
+        """
+        Make a generic LLM API call through the router, this allows you to use retries/fallbacks with litellm router
+
+        Args:
+            model: The model to use
+            handler_function: The handler function to call (e.g., litellm.anthropic_messages)
+            **kwargs: Additional arguments to pass to the handler function
+
+        Returns:
+            The response from the handler function
+        """
+        handler_name = original_function.__name__
+        try:
+            verbose_router_logger.debug(
+                f"Inside _ageneric_api_call() - handler: {handler_name}, model: {model}; kwargs: {kwargs}"
+            )
+            parent_otel_span = _get_parent_otel_span_from_kwargs(kwargs)
+            deployment = await self.async_get_available_deployment(
+                model=model,
+                request_kwargs=kwargs,
+                messages=kwargs.get("messages", None),
+                specific_deployment=kwargs.pop("specific_deployment", None),
+            )
+            self._update_kwargs_with_deployment(deployment=deployment, kwargs=kwargs)
+
+            data = deployment["litellm_params"].copy()
+            model_name = data["model"]
+
+            model_client = self._get_async_openai_model_client(
+                deployment=deployment,
+                kwargs=kwargs,
+            )
+            self.total_calls[model_name] += 1
+
+            response = original_function(
+                **{
+                    **data,
+                    "caching": self.cache_responses,
+                    "client": model_client,
+                    **kwargs,
+                }
+            )
+
+            rpm_semaphore = self._get_client(
+                deployment=deployment,
+                kwargs=kwargs,
+                client_type="max_parallel_requests",
+            )
+
+            if rpm_semaphore is not None and isinstance(
+                rpm_semaphore, asyncio.Semaphore
+            ):
+                async with rpm_semaphore:
+                    """
+                    - Check rpm limits before making the call
+                    - If allowed, increment the rpm limit (allows global value to be updated, concurrency-safe)
+                    """
+                    await self.async_routing_strategy_pre_call_checks(
+                        deployment=deployment, parent_otel_span=parent_otel_span
+                    )
+                    response = await response  # type: ignore
+            else:
+                await self.async_routing_strategy_pre_call_checks(
+                    deployment=deployment, parent_otel_span=parent_otel_span
+                )
+                response = await response  # type: ignore
+
+            self.success_calls[model_name] += 1
+            verbose_router_logger.info(
+                f"{handler_name}(model={model_name})\033[32m 200 OK\033[0m"
+            )
+            return response
+        except Exception as e:
+            verbose_router_logger.info(
+                f"{handler_name}(model={model})\033[31m Exception {str(e)}\033[0m"
+            )
+            if model is not None:
+                self.fail_calls[model] += 1
+            raise e
+
     def embedding(
         self,
         model: str,
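
Note on the hunk above: original_function(...) is called without await, so the coroutine is only awaited after async_routing_strategy_pre_call_checks, optionally while holding the deployment's max_parallel_requests semaphore. A stripped-down sketch of that pattern (call_llm and pre_call_checks are illustrative stand-ins, not code from this commit):

import asyncio

async def call_llm(**kwargs):
    # Stand-in for the handler passed as original_function (e.g. litellm.anthropic_messages).
    await asyncio.sleep(0.1)
    return {"ok": True, **kwargs}

async def pre_call_checks():
    # Stand-in for async_routing_strategy_pre_call_checks (rpm bookkeeping).
    pass

async def routed_call(rpm_semaphore, **kwargs):
    # Build the coroutine first, but do not await it yet.
    response = call_llm(**kwargs)
    if rpm_semaphore is not None and isinstance(rpm_semaphore, asyncio.Semaphore):
        async with rpm_semaphore:
            # Run the pre-call checks while holding the semaphore, then await the call.
            await pre_call_checks()
            response = await response
    else:
        await pre_call_checks()
        response = await response
    return response

async def main():
    sem = asyncio.Semaphore(2)  # caps concurrent in-flight calls per deployment
    print(await routed_call(sem, model="claude-3"))

asyncio.run(main())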
@@ -2869,10 +2955,14 @@ class Router:
     def factory_function(
         self,
         original_function: Callable,
-        call_type: Literal["assistants", "moderation"] = "assistants",
+        call_type: Literal[
+            "assistants", "moderation", "anthropic_messages"
+        ] = "assistants",
     ):
         async def new_function(
-            custom_llm_provider: Optional[Literal["openai", "azure"]] = None,
+            custom_llm_provider: Optional[
+                Literal["openai", "azure", "anthropic"]
+            ] = None,
             client: Optional["AsyncOpenAI"] = None,
             **kwargs,
         ):
@@ -2889,13 +2979,18 @@ class Router:
                     original_function=original_function,
                     **kwargs,
                 )
+            elif call_type == "anthropic_messages":
+                return await self._ageneric_api_call_with_fallbacks(  # type: ignore
+                    original_function=original_function,
+                    **kwargs,
+                )

         return new_function

     async def _pass_through_assistants_endpoint_factory(
         self,
         original_function: Callable,
-        custom_llm_provider: Optional[Literal["openai", "azure"]] = None,
+        custom_llm_provider: Optional[Literal["openai", "azure", "anthropic"]] = None,
         client: Optional[AsyncOpenAI] = None,
         **kwargs,
     ):
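
The last two hunks extend the factory_function closure so that call_type="anthropic_messages" dispatches to _ageneric_api_call_with_fallbacks. A condensed, standalone sketch of that dispatch pattern (simplified names; not the Router implementation itself):

from typing import Any, Awaitable, Callable, Literal

async def generic_api_call_with_fallbacks(
    original_function: Callable[..., Awaitable[Any]], **kwargs
) -> Any:
    # Stand-in for Router._ageneric_api_call_with_fallbacks: pick a deployment,
    # merge its litellm_params into kwargs, then await the handler.
    return await original_function(**kwargs)

def factory_function(
    original_function: Callable[..., Awaitable[Any]],
    call_type: Literal["assistants", "moderation", "anthropic_messages"] = "assistants",
) -> Callable[..., Awaitable[Any]]:
    async def new_function(**kwargs) -> Any:
        # Each call_type maps to a different helper; "anthropic_messages"
        # goes through the generic fallback path shown above.
        if call_type == "anthropic_messages":
            return await generic_api_call_with_fallbacks(
                original_function=original_function, **kwargs
            )
        raise NotImplementedError(f"call_type={call_type} not covered in this sketch")

    return new_function

# e.g. router.aanthropic_messages is roughly:
#   factory_function(litellm.anthropic_messages, call_type="anthropic_messages")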