litellm-mirror/litellm/llms/azure/common_utils.py
Ishaan Jaff 62a1cdec47 (code quality) run ruff rule to ban unused imports (#7313)
* remove unused imports

* fix AmazonConverseConfig

* fix test

* fix import

* ruff check fixes

* test fixes

* fix testing

* fix imports
2024-12-19 12:33:42 -08:00

104 lines
3.2 KiB
Python

from typing import Callable, Optional, Union
import httpx
from litellm._logging import verbose_logger
from litellm.llms.base_llm.chat.transformation import BaseLLMException
from litellm.secret_managers.main import get_secret_str
class AzureOpenAIError(BaseLLMException):
def __init__(
self,
status_code,
message,
request: Optional[httpx.Request] = None,
response: Optional[httpx.Response] = None,
headers: Optional[Union[httpx.Headers, dict]] = None,
):
super().__init__(
status_code=status_code,
message=message,
request=request,
response=response,
headers=headers,
)
def process_azure_headers(headers: Union[httpx.Headers, dict]) -> dict:
openai_headers = {}
if "x-ratelimit-limit-requests" in headers:
openai_headers["x-ratelimit-limit-requests"] = headers[
"x-ratelimit-limit-requests"
]
if "x-ratelimit-remaining-requests" in headers:
openai_headers["x-ratelimit-remaining-requests"] = headers[
"x-ratelimit-remaining-requests"
]
if "x-ratelimit-limit-tokens" in headers:
openai_headers["x-ratelimit-limit-tokens"] = headers["x-ratelimit-limit-tokens"]
if "x-ratelimit-remaining-tokens" in headers:
openai_headers["x-ratelimit-remaining-tokens"] = headers[
"x-ratelimit-remaining-tokens"
]
llm_response_headers = {
"{}-{}".format("llm_provider", k): v for k, v in headers.items()
}
return {**llm_response_headers, **openai_headers}
def get_azure_ad_token_from_entrata_id(
tenant_id: str,
client_id: str,
client_secret: str,
scope: str = "https://cognitiveservices.azure.com/.default",
) -> Callable[[], str]:
"""
Get Azure AD token provider from `client_id`, `client_secret`, and `tenant_id`
Args:
tenant_id: str
client_id: str
client_secret: str
scope: str
Returns:
callable that returns a bearer token.
"""
from azure.identity import ClientSecretCredential, get_bearer_token_provider
verbose_logger.debug("Getting Azure AD Token from Entrata ID")
if tenant_id.startswith("os.environ/"):
_tenant_id = get_secret_str(tenant_id)
else:
_tenant_id = tenant_id
if client_id.startswith("os.environ/"):
_client_id = get_secret_str(client_id)
else:
_client_id = client_id
if client_secret.startswith("os.environ/"):
_client_secret = get_secret_str(client_secret)
else:
_client_secret = client_secret
verbose_logger.debug(
"tenant_id %s, client_id %s, client_secret %s",
_tenant_id,
_client_id,
_client_secret,
)
if _tenant_id is None or _client_id is None or _client_secret is None:
raise ValueError("tenant_id, client_id, and client_secret must be provided")
credential = ClientSecretCredential(_tenant_id, _client_id, _client_secret)
verbose_logger.debug("credential %s", credential)
token_provider = get_bearer_token_provider(credential, scope)
verbose_logger.debug("token_provider %s", token_provider)
return token_provider