from typing import Callable, Optional, Union import httpx from litellm._logging import verbose_logger from litellm.llms.base_llm.transformation import BaseLLMException from litellm.secret_managers.main import get_secret_str class AzureOpenAIError(BaseLLMException): def __init__( self, status_code, message, request: Optional[httpx.Request] = None, response: Optional[httpx.Response] = None, headers: Optional[Union[httpx.Headers, dict]] = None, ): super().__init__( status_code=status_code, message=message, request=request, response=response, headers=headers, ) def process_azure_headers(headers: Union[httpx.Headers, dict]) -> dict: openai_headers = {} if "x-ratelimit-limit-requests" in headers: openai_headers["x-ratelimit-limit-requests"] = headers[ "x-ratelimit-limit-requests" ] if "x-ratelimit-remaining-requests" in headers: openai_headers["x-ratelimit-remaining-requests"] = headers[ "x-ratelimit-remaining-requests" ] if "x-ratelimit-limit-tokens" in headers: openai_headers["x-ratelimit-limit-tokens"] = headers["x-ratelimit-limit-tokens"] if "x-ratelimit-remaining-tokens" in headers: openai_headers["x-ratelimit-remaining-tokens"] = headers[ "x-ratelimit-remaining-tokens" ] llm_response_headers = { "{}-{}".format("llm_provider", k): v for k, v in headers.items() } return {**llm_response_headers, **openai_headers} def get_azure_ad_token_from_entrata_id( tenant_id: str, client_id: str, client_secret: str, scope: str = "https://cognitiveservices.azure.com/.default", ) -> Callable[[], str]: """ Get Azure AD token provider from `client_id`, `client_secret`, and `tenant_id` Args: tenant_id: str client_id: str client_secret: str scope: str Returns: callable that returns a bearer token. """ from azure.identity import ( ClientSecretCredential, DefaultAzureCredential, get_bearer_token_provider, ) verbose_logger.debug("Getting Azure AD Token from Entrata ID") if tenant_id.startswith("os.environ/"): _tenant_id = get_secret_str(tenant_id) else: _tenant_id = tenant_id if client_id.startswith("os.environ/"): _client_id = get_secret_str(client_id) else: _client_id = client_id if client_secret.startswith("os.environ/"): _client_secret = get_secret_str(client_secret) else: _client_secret = client_secret verbose_logger.debug( "tenant_id %s, client_id %s, client_secret %s", _tenant_id, _client_id, _client_secret, ) if _tenant_id is None or _client_id is None or _client_secret is None: raise ValueError("tenant_id, client_id, and client_secret must be provided") credential = ClientSecretCredential(_tenant_id, _client_id, _client_secret) verbose_logger.debug("credential %s", credential) token_provider = get_bearer_token_provider(credential, scope) verbose_logger.debug("token_provider %s", token_provider) return token_provider