(fix) logging Auth errors on datadog (#6995)

* fix get_standard_logging_object_payload

* fix async_post_call_failure_hook

* fix post_call_failure_hook

* fix change

* fix _is_proxy_only_error

* fix async_post_call_failure_hook

* fix getting request body

* remove redundant code

* use a well named original function name for auth errors

* fix logging auth fails on DD

* fix using request body

* use helper for _handle_logging_proxy_only_error
This commit is contained in:
Ishaan Jaff 2024-12-02 23:01:21 -08:00 committed by GitHub
parent 05aa740a7d
commit 9617e7433d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 121 additions and 413 deletions

View file

@ -365,38 +365,6 @@ class DataDogLogger(CustomBatchLogger):
""" """
return return
async def async_post_call_failure_hook(
self,
request_data: dict,
original_exception: Exception,
user_api_key_dict: UserAPIKeyAuth,
):
"""
Handles Proxy Errors (not-related to LLM API), ex: Authentication Errors
"""
import json
_exception_payload = DatadogProxyFailureHookJsonMessage(
exception=str(original_exception),
error_class=str(original_exception.__class__.__name__),
status_code=getattr(original_exception, "status_code", None),
traceback=traceback.format_exc(),
user_api_key_dict=user_api_key_dict.model_dump(),
)
json_payload = json.dumps(_exception_payload)
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
dd_payload = DatadogPayload(
ddsource=self._get_datadog_source(),
ddtags=self._get_datadog_tags(),
hostname=self._get_datadog_hostname(),
message=json_payload,
service=self._get_datadog_service(),
status=DataDogStatus.ERROR,
)
self.log_queue.append(dd_payload)
def _create_v0_logging_payload( def _create_v0_logging_payload(
self, self,
kwargs: Union[dict, Any], kwargs: Union[dict, Any],

View file

@ -2772,7 +2772,6 @@ def get_standard_logging_object_payload(
if cache_hit is True: if cache_hit is True:
id = f"{id}_cache_hit{time.time()}" # do not duplicate the request id id = f"{id}_cache_hit{time.time()}" # do not duplicate the request id
saved_cache_cost = ( saved_cache_cost = (
logging_obj._response_cost_calculator( logging_obj._response_cost_calculator(
result=init_response_obj, cache_hit=False # type: ignore result=init_response_obj, cache_hit=False # type: ignore

View file

@ -270,8 +270,8 @@ async def user_api_key_auth( # noqa: PLR0915
parent_otel_span: Optional[Span] = None parent_otel_span: Optional[Span] = None
start_time = datetime.now() start_time = datetime.now()
route: str = get_request_route(request=request)
try: try:
route: str = get_request_route(request=request)
# get the request body # get the request body
request_data = await _read_request_body(request=request) request_data = await _read_request_body(request=request)
await pre_db_read_auth_checks( await pre_db_read_auth_checks(
@ -1251,12 +1251,18 @@ async def user_api_key_auth( # noqa: PLR0915
) )
# Log this exception to OTEL, Datadog etc # Log this exception to OTEL, Datadog etc
user_api_key_dict = UserAPIKeyAuth(
parent_otel_span=parent_otel_span,
api_key=api_key,
)
request_data = await _read_request_body(request=request)
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.async_log_proxy_authentication_errors( proxy_logging_obj.post_call_failure_hook(
request_data=request_data,
original_exception=e, original_exception=e,
request=request, user_api_key_dict=user_api_key_dict,
parent_otel_span=parent_otel_span, error_type=ProxyErrorTypes.auth_error,
api_key=api_key, route=route,
) )
) )

View file

@ -87,6 +87,7 @@ from litellm.proxy.hooks.max_budget_limiter import _PROXY_MaxBudgetLimiter
from litellm.proxy.hooks.parallel_request_limiter import ( from litellm.proxy.hooks.parallel_request_limiter import (
_PROXY_MaxParallelRequestsHandler, _PROXY_MaxParallelRequestsHandler,
) )
from litellm.proxy.litellm_pre_call_utils import LiteLLMProxyRequestSetup
from litellm.secret_managers.main import str_to_bool from litellm.secret_managers.main import str_to_bool
from litellm.types.integrations.slack_alerting import DEFAULT_ALERT_TYPES from litellm.types.integrations.slack_alerting import DEFAULT_ALERT_TYPES
from litellm.types.utils import CallTypes, LoggedLiteLLMParams from litellm.types.utils import CallTypes, LoggedLiteLLMParams
@ -750,6 +751,8 @@ class ProxyLogging:
request_data: dict, request_data: dict,
original_exception: Exception, original_exception: Exception,
user_api_key_dict: UserAPIKeyAuth, user_api_key_dict: UserAPIKeyAuth,
error_type: Optional[ProxyErrorTypes] = None,
route: Optional[str] = None,
): ):
""" """
Allows users to raise custom exceptions/log when a call fails, without having to deal with parsing Request body. Allows users to raise custom exceptions/log when a call fails, without having to deal with parsing Request body.
@ -787,87 +790,16 @@ class ProxyLogging:
) )
### LOGGING ### ### LOGGING ###
if isinstance(original_exception, HTTPException): if self._is_proxy_only_error(
litellm_logging_obj: Optional[Logging] = request_data.get( original_exception=original_exception, error_type=error_type
"litellm_logging_obj", None ):
await self._handle_logging_proxy_only_error(
request_data=request_data,
user_api_key_dict=user_api_key_dict,
route=route,
original_exception=original_exception,
) )
if litellm_logging_obj is None:
import uuid
request_data["litellm_call_id"] = str(uuid.uuid4())
litellm_logging_obj, data = litellm.utils.function_setup(
original_function="IGNORE_THIS",
rules_obj=litellm.utils.Rules(),
start_time=datetime.now(),
**request_data,
)
if litellm_logging_obj is not None:
## UPDATE LOGGING INPUT
_optional_params = {}
_litellm_params = {}
litellm_param_keys = LoggedLiteLLMParams.__annotations__.keys()
for k, v in request_data.items():
if k in litellm_param_keys:
_litellm_params[k] = v
elif k != "model" and k != "user":
_optional_params[k] = v
litellm_logging_obj.update_environment_variables(
model=request_data.get("model", ""),
user=request_data.get("user", ""),
optional_params=_optional_params,
litellm_params=_litellm_params,
)
input: Union[list, str, dict] = ""
if "messages" in request_data and isinstance(
request_data["messages"], list
):
input = request_data["messages"]
elif "prompt" in request_data and isinstance(
request_data["prompt"], str
):
input = request_data["prompt"]
elif "input" in request_data and isinstance(
request_data["input"], list
):
input = request_data["input"]
litellm_logging_obj.pre_call(
input=input,
api_key="",
)
# log the custom exception
await litellm_logging_obj.async_failure_handler(
exception=original_exception,
traceback_exception=traceback.format_exc(),
)
threading.Thread(
target=litellm_logging_obj.failure_handler,
args=(
original_exception,
traceback.format_exc(),
),
).start()
await self._run_post_call_failure_hook_custom_loggers(
original_exception=original_exception,
request_data=request_data,
user_api_key_dict=user_api_key_dict,
)
return
async def _run_post_call_failure_hook_custom_loggers(
self,
original_exception: Exception,
request_data: dict,
user_api_key_dict: UserAPIKeyAuth,
):
for callback in litellm.callbacks: for callback in litellm.callbacks:
try: try:
_callback: Optional[CustomLogger] = None _callback: Optional[CustomLogger] = None
@ -885,39 +817,113 @@ class ProxyLogging:
) )
except Exception as e: except Exception as e:
raise e raise e
return
async def async_log_proxy_authentication_errors( def _is_proxy_only_error(
self, self,
original_exception: Exception, original_exception: Exception,
request: Request, error_type: Optional[ProxyErrorTypes] = None,
parent_otel_span: Optional[Any], ) -> bool:
api_key: Optional[str], """
Return True if the error is a Proxy Only Error
Prevents double logging of LLM API exceptions
e.g should only return True for:
- Authentication Errors from user_api_key_auth
- HTTP HTTPException (rate limit errors)
"""
return isinstance(original_exception, HTTPException) or (
error_type == ProxyErrorTypes.auth_error
)
async def _handle_logging_proxy_only_error(
self,
request_data: dict,
user_api_key_dict: UserAPIKeyAuth,
route: Optional[str] = None,
original_exception: Optional[Exception] = None,
): ):
""" """
Handler for Logging Authentication Errors on LiteLLM Proxy Handle logging for proxy only errors by calling `litellm_logging_obj.async_failure_handler`
Why not use post_call_failure_hook?
- `post_call_failure_hook` calls `litellm_logging_obj.async_failure_handler`. This led to the Exception being logged twice
What does this handler do? Is triggered when self._is_proxy_only_error() returns True
- Logs Authentication Errors (like invalid API Key passed) to CustomLogger compatible classes (OTEL, Datadog etc)
- calls CustomLogger.async_post_call_failure_hook
""" """
litellm_logging_obj: Optional[Logging] = request_data.get(
"litellm_logging_obj", None
)
if litellm_logging_obj is None:
import uuid
user_api_key_dict = UserAPIKeyAuth( request_data["litellm_call_id"] = str(uuid.uuid4())
parent_otel_span=parent_otel_span, user_api_key_logged_metadata = (
token=_hash_token_if_needed(token=api_key or ""), LiteLLMProxyRequestSetup.get_sanitized_user_information_from_key(
) user_api_key_dict=user_api_key_dict
try: )
request_data = await request.json() )
except json.JSONDecodeError:
# For GET requests or requests without a JSON body litellm_logging_obj, data = litellm.utils.function_setup(
request_data = {} original_function=route or "IGNORE_THIS",
await self._run_post_call_failure_hook_custom_loggers( rules_obj=litellm.utils.Rules(),
original_exception=original_exception, start_time=datetime.now(),
request_data=request_data, **request_data,
user_api_key_dict=user_api_key_dict, )
) if "metadata" not in request_data:
pass request_data["metadata"] = {}
request_data["metadata"].update(user_api_key_logged_metadata)
if litellm_logging_obj is not None:
## UPDATE LOGGING INPUT
_optional_params = {}
_litellm_params = {}
litellm_param_keys = LoggedLiteLLMParams.__annotations__.keys()
for k, v in request_data.items():
if k in litellm_param_keys:
_litellm_params[k] = v
elif k != "model" and k != "user":
_optional_params[k] = v
litellm_logging_obj.update_environment_variables(
model=request_data.get("model", ""),
user=request_data.get("user", ""),
optional_params=_optional_params,
litellm_params=_litellm_params,
)
input: Union[list, str, dict] = ""
if "messages" in request_data and isinstance(
request_data["messages"], list
):
input = request_data["messages"]
litellm_logging_obj.model_call_details["messages"] = input
litellm_logging_obj.call_type = CallTypes.acompletion.value
elif "prompt" in request_data and isinstance(request_data["prompt"], str):
input = request_data["prompt"]
litellm_logging_obj.model_call_details["prompt"] = input
litellm_logging_obj.call_type = CallTypes.atext_completion.value
elif "input" in request_data and isinstance(request_data["input"], list):
input = request_data["input"]
litellm_logging_obj.model_call_details["input"] = input
litellm_logging_obj.call_type = CallTypes.aembedding.value
litellm_logging_obj.pre_call(
input=input,
api_key="",
)
# log the custom exception
await litellm_logging_obj.async_failure_handler(
exception=original_exception,
traceback_exception=traceback.format_exc(),
)
threading.Thread(
target=litellm_logging_obj.failure_handler,
args=(
original_exception,
traceback.format_exc(),
),
).start()
async def post_call_success_hook( async def post_call_success_hook(
self, self,

View file

@ -346,84 +346,6 @@ async def test_datadog_logging():
print(e) print(e)
@pytest.mark.asyncio
async def test_datadog_post_call_failure_hook():
"""Test logging proxy failures (e.g., authentication errors) to DataDog"""
try:
from litellm.integrations.datadog.datadog import DataDogLogger
os.environ["DD_SITE"] = "https://fake.datadoghq.com"
os.environ["DD_API_KEY"] = "anything"
dd_logger = DataDogLogger()
# Create a mock for the async_client's post method
mock_post = AsyncMock()
mock_post.return_value.status_code = 202
mock_post.return_value.text = "Accepted"
dd_logger.async_client.post = mock_post
# Create a test exception
class AuthenticationError(Exception):
def __init__(self):
self.status_code = 401
super().__init__("Invalid API key")
test_exception = AuthenticationError()
# Create test request data and user API key dict
request_data = {
"model": "gpt-4",
"messages": [{"role": "user", "content": "Hello"}],
}
user_api_key_dict = UserAPIKeyAuth(
api_key="fake_key", user_id="test_user", team_id="test_team"
)
# Call the failure hook
await dd_logger.async_post_call_failure_hook(
request_data=request_data,
original_exception=test_exception,
user_api_key_dict=user_api_key_dict,
)
# Wait for the periodic flush
await asyncio.sleep(6)
# Assert that the mock was called
assert mock_post.called, "HTTP request was not made"
# Get the arguments of the last call
args, kwargs = mock_post.call_args
# Verify endpoint
assert kwargs["url"].endswith("/api/v2/logs"), "Incorrect DataDog endpoint"
# Decode and verify payload
body = kwargs["data"]
with gzip.open(io.BytesIO(body), "rb") as f:
body = f.read().decode("utf-8")
body = json.loads(body)
assert len(body) == 1, "Expected one log entry"
log_entry = body[0]
assert log_entry["status"] == "error", "Expected error status"
assert log_entry["service"] == "litellm-server"
# Verify message content
message = json.loads(log_entry["message"])
print("logged message", json.dumps(message, indent=2))
assert message["exception"] == "Invalid API key"
assert message["error_class"] == "AuthenticationError"
assert message["status_code"] == 401
assert "traceback" in message
assert message["user_api_key_dict"]["api_key"] == "fake_key"
except Exception as e:
pytest.fail(f"Test failed with exception: {str(e)}")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_datadog_payload_environment_variables(): async def test_datadog_payload_environment_variables():
"""Test that DataDog payload correctly includes environment variables in the payload structure""" """Test that DataDog payload correctly includes environment variables in the payload structure"""

View file

@ -2125,196 +2125,3 @@ async def test_proxy_server_prisma_setup_invalid_db():
if _old_db_url: if _old_db_url:
os.environ["DATABASE_URL"] = _old_db_url os.environ["DATABASE_URL"] = _old_db_url
@pytest.mark.asyncio
async def test_async_log_proxy_authentication_errors():
"""
Test if async_log_proxy_authentication_errors correctly logs authentication errors through custom loggers
"""
import json
from fastapi import Request
from litellm.proxy.utils import ProxyLogging
from litellm.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
# Create a mock custom logger to verify it's called
class MockCustomLogger(CustomLogger):
def __init__(self):
self.called = False
self.exception_logged = None
self.request_data_logged = None
self.user_api_key_dict_logged = None
async def async_post_call_failure_hook(
self,
request_data: dict,
original_exception: Exception,
user_api_key_dict: UserAPIKeyAuth,
):
self.called = True
self.exception_logged = original_exception
self.request_data_logged = request_data
print("logged request_data", request_data)
if isinstance(request_data, AsyncMock):
self.request_data_logged = (
await request_data()
) # get the actual value from AsyncMock
else:
self.request_data_logged = request_data
self.user_api_key_dict_logged = user_api_key_dict
# Create test data
test_data = {"model": "gpt-4", "messages": [{"role": "user", "content": "Hello"}]}
# Create a mock request
request = Request(scope={"type": "http", "method": "POST"})
request._json = AsyncMock(return_value=test_data)
# Create a test exception
test_exception = Exception("Invalid API Key")
# Initialize ProxyLogging
mock_logger = MockCustomLogger()
litellm.callbacks = [mock_logger]
proxy_logging_obj = ProxyLogging(user_api_key_cache=DualCache())
# Call the method
await proxy_logging_obj.async_log_proxy_authentication_errors(
original_exception=test_exception,
request=request,
parent_otel_span=None,
api_key="test-key",
)
# Verify the mock logger was called with correct parameters
assert mock_logger.called == True
assert mock_logger.exception_logged == test_exception
assert mock_logger.request_data_logged == test_data
assert mock_logger.user_api_key_dict_logged is not None
assert (
mock_logger.user_api_key_dict_logged.token is not None
) # token should be hashed
@pytest.mark.asyncio
async def test_async_log_proxy_authentication_errors_get_request():
"""
Test if async_log_proxy_authentication_errors correctly handles GET requests
that don't have a JSON body
"""
import json
from fastapi import Request
from litellm.proxy.utils import ProxyLogging
from litellm.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
class MockCustomLogger(CustomLogger):
def __init__(self):
self.called = False
self.exception_logged = None
self.request_data_logged = None
self.user_api_key_dict_logged = None
async def async_post_call_failure_hook(
self,
request_data: dict,
original_exception: Exception,
user_api_key_dict: UserAPIKeyAuth,
):
self.called = True
self.exception_logged = original_exception
self.request_data_logged = request_data
self.user_api_key_dict_logged = user_api_key_dict
# Create a mock GET request
request = Request(scope={"type": "http", "method": "GET"})
# Mock the json() method to raise JSONDecodeError
async def mock_json():
raise json.JSONDecodeError("Expecting value", "", 0)
request.json = mock_json
# Create a test exception
test_exception = Exception("Invalid API Key")
# Initialize ProxyLogging
mock_logger = MockCustomLogger()
litellm.callbacks = [mock_logger]
proxy_logging_obj = ProxyLogging(user_api_key_cache=DualCache())
# Call the method
await proxy_logging_obj.async_log_proxy_authentication_errors(
original_exception=test_exception,
request=request,
parent_otel_span=None,
api_key="test-key",
)
# Verify the mock logger was called with correct parameters
assert mock_logger.called == True
assert mock_logger.exception_logged == test_exception
assert mock_logger.user_api_key_dict_logged is not None
assert mock_logger.user_api_key_dict_logged.token is not None
@pytest.mark.asyncio
async def test_async_log_proxy_authentication_errors_no_api_key():
"""
Test if async_log_proxy_authentication_errors correctly handles requests
with no API key provided
"""
from fastapi import Request
from litellm.proxy.utils import ProxyLogging
from litellm.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
class MockCustomLogger(CustomLogger):
def __init__(self):
self.called = False
self.exception_logged = None
self.request_data_logged = None
self.user_api_key_dict_logged = None
async def async_post_call_failure_hook(
self,
request_data: dict,
original_exception: Exception,
user_api_key_dict: UserAPIKeyAuth,
):
self.called = True
self.exception_logged = original_exception
self.request_data_logged = request_data
self.user_api_key_dict_logged = user_api_key_dict
# Create test data
test_data = {"model": "gpt-4", "messages": [{"role": "user", "content": "Hello"}]}
# Create a mock request
request = Request(scope={"type": "http", "method": "POST"})
request._json = AsyncMock(return_value=test_data)
# Create a test exception
test_exception = Exception("No API Key Provided")
# Initialize ProxyLogging
mock_logger = MockCustomLogger()
litellm.callbacks = [mock_logger]
proxy_logging_obj = ProxyLogging(user_api_key_cache=DualCache())
# Call the method with api_key=None
await proxy_logging_obj.async_log_proxy_authentication_errors(
original_exception=test_exception,
request=request,
parent_otel_span=None,
api_key=None,
)
# Verify the mock logger was called with correct parameters
assert mock_logger.called == True
assert mock_logger.exception_logged == test_exception
assert mock_logger.user_api_key_dict_logged is not None
assert (
mock_logger.user_api_key_dict_logged.token == ""
) # Empty token for no API key