litellm/litellm/proxy/pass_through_endpoints/success_handler.py
Krish Dholakia 7e9d8b58f6
LiteLLM Minor Fixes & Improvements (11/23/2024) (#6870)
* feat(pass_through_endpoints/): support logging anthropic/gemini pass through calls to langfuse/s3/etc.

* fix(utils.py): allow disabling end user cost tracking with new param

Allows proxy admin to disable cost tracking for end user - keeps prometheus metrics small

* docs(configs.md): add disable_end_user_cost_tracking reference to docs

* feat(key_management_endpoints.py): add support for restricting access to `/key/generate` by team/proxy level role

Enables admin to restrict key creation, and assign team admins to handle distributing keys

* test(test_key_management.py): add unit testing for personal / team key restriction checks

* docs: add docs on restricting key creation

* docs(finetuned_models.md): add new guide on calling finetuned models

* docs(input.md): cleanup anthropic supported params

Closes https://github.com/BerriAI/litellm/issues/6856

* test(test_embedding.py): add test for passing extra headers via embedding

* feat(cohere/embed): pass client to async embedding

* feat(rerank.py): add `/v1/rerank` if missing for cohere base url

Closes https://github.com/BerriAI/litellm/issues/6844

* fix(main.py): pass extra_headers param to openai

Fixes https://github.com/BerriAI/litellm/issues/6836

* fix(litellm_logging.py): don't disable global callbacks when dynamic callbacks are set

Fixes issue where global callbacks - e.g. prometheus were overriden when langfuse was set dynamically

* fix(handler.py): fix linting error

* fix: fix typing

* build: add conftest to proxy_admin_ui_tests/

* test: fix test

* fix: fix linting errors

* test: fix test

* fix: fix pass through testing
2024-11-23 15:17:40 +05:30

127 lines
4.3 KiB
Python

import json
import re
import threading
from datetime import datetime
from typing import Optional, Union
import httpx
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.litellm_core_utils.litellm_logging import (
get_standard_logging_object_payload,
)
from litellm.llms.vertex_ai_and_google_ai_studio.gemini.vertex_and_google_ai_studio_gemini import (
VertexLLM,
)
from litellm.proxy._types import PassThroughEndpointLoggingResultValues
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.types.utils import StandardPassThroughResponseObject
from .llm_provider_handlers.anthropic_passthrough_logging_handler import (
AnthropicPassthroughLoggingHandler,
)
from .llm_provider_handlers.vertex_passthrough_logging_handler import (
VertexPassthroughLoggingHandler,
)
class PassThroughEndpointLogging:
def __init__(self):
self.TRACKED_VERTEX_ROUTES = [
"generateContent",
"streamGenerateContent",
"predict",
]
# Anthropic
self.TRACKED_ANTHROPIC_ROUTES = ["/messages"]
async def pass_through_async_success_handler(
self,
httpx_response: httpx.Response,
response_body: Optional[dict],
logging_obj: LiteLLMLoggingObj,
url_route: str,
result: str,
start_time: datetime,
end_time: datetime,
cache_hit: bool,
**kwargs,
):
standard_logging_response_object: Optional[
PassThroughEndpointLoggingResultValues
] = None
if self.is_vertex_route(url_route):
vertex_passthrough_logging_handler_result = (
VertexPassthroughLoggingHandler.vertex_passthrough_handler(
httpx_response=httpx_response,
logging_obj=logging_obj,
url_route=url_route,
result=result,
start_time=start_time,
end_time=end_time,
cache_hit=cache_hit,
**kwargs,
)
)
standard_logging_response_object = (
vertex_passthrough_logging_handler_result["result"]
)
kwargs = vertex_passthrough_logging_handler_result["kwargs"]
elif self.is_anthropic_route(url_route):
anthropic_passthrough_logging_handler_result = (
AnthropicPassthroughLoggingHandler.anthropic_passthrough_handler(
httpx_response=httpx_response,
response_body=response_body or {},
logging_obj=logging_obj,
url_route=url_route,
result=result,
start_time=start_time,
end_time=end_time,
cache_hit=cache_hit,
**kwargs,
)
)
standard_logging_response_object = (
anthropic_passthrough_logging_handler_result["result"]
)
kwargs = anthropic_passthrough_logging_handler_result["kwargs"]
if standard_logging_response_object is None:
standard_logging_response_object = StandardPassThroughResponseObject(
response=httpx_response.text
)
threading.Thread(
target=logging_obj.success_handler,
args=(
standard_logging_response_object,
start_time,
end_time,
cache_hit,
),
).start()
await logging_obj.async_success_handler(
result=(
json.dumps(result)
if isinstance(result, dict)
else standard_logging_response_object
),
start_time=start_time,
end_time=end_time,
cache_hit=False,
**kwargs,
)
def is_vertex_route(self, url_route: str):
for route in self.TRACKED_VERTEX_ROUTES:
if route in url_route:
return True
return False
def is_anthropic_route(self, url_route: str):
for route in self.TRACKED_ANTHROPIC_ROUTES:
if route in url_route:
return True
return False