refactor(batches/main.py): working refactored azure client init on batches

This commit is contained in:
Krrish Dholakia 2025-03-11 14:36:38 -07:00
parent 9855e46208
commit 1516240bab
3 changed files with 51 additions and 19 deletions

View file

@ -111,6 +111,7 @@ def create_batch(
proxy_server_request = kwargs.get("proxy_server_request", None) proxy_server_request = kwargs.get("proxy_server_request", None)
model_info = kwargs.get("model_info", None) model_info = kwargs.get("model_info", None)
_is_async = kwargs.pop("acreate_batch", False) is True _is_async = kwargs.pop("acreate_batch", False) is True
litellm_params = get_litellm_params(**kwargs)
litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None) litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
### TIMEOUT LOGIC ### ### TIMEOUT LOGIC ###
timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600 timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
@ -217,6 +218,7 @@ def create_batch(
timeout=timeout, timeout=timeout,
max_retries=optional_params.max_retries, max_retries=optional_params.max_retries,
create_batch_data=_create_batch_request, create_batch_data=_create_batch_request,
litellm_params=litellm_params,
) )
elif custom_llm_provider == "vertex_ai": elif custom_llm_provider == "vertex_ai":
api_base = optional_params.api_base or "" api_base = optional_params.api_base or ""
@ -320,15 +322,12 @@ def retrieve_batch(
""" """
try: try:
optional_params = GenericLiteLLMParams(**kwargs) optional_params = GenericLiteLLMParams(**kwargs)
litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None) litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
### TIMEOUT LOGIC ### ### TIMEOUT LOGIC ###
timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600 timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
litellm_params = get_litellm_params( litellm_params = get_litellm_params(
custom_llm_provider=custom_llm_provider, custom_llm_provider=custom_llm_provider,
litellm_call_id=kwargs.get("litellm_call_id", None), **kwargs,
litellm_trace_id=kwargs.get("litellm_trace_id"),
litellm_metadata=kwargs.get("litellm_metadata"),
) )
litellm_logging_obj.update_environment_variables( litellm_logging_obj.update_environment_variables(
model=None, model=None,
@ -424,6 +423,7 @@ def retrieve_batch(
timeout=timeout, timeout=timeout,
max_retries=optional_params.max_retries, max_retries=optional_params.max_retries,
retrieve_batch_data=_retrieve_batch_request, retrieve_batch_data=_retrieve_batch_request,
litellm_params=litellm_params,
) )
elif custom_llm_provider == "vertex_ai": elif custom_llm_provider == "vertex_ai":
api_base = optional_params.api_base or "" api_base = optional_params.api_base or ""
@ -526,6 +526,10 @@ def list_batches(
try: try:
# set API KEY # set API KEY
optional_params = GenericLiteLLMParams(**kwargs) optional_params = GenericLiteLLMParams(**kwargs)
litellm_params = get_litellm_params(
custom_llm_provider=custom_llm_provider,
**kwargs,
)
api_key = ( api_key = (
optional_params.api_key optional_params.api_key
or litellm.api_key # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there or litellm.api_key # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there
@ -603,6 +607,7 @@ def list_batches(
api_version=api_version, api_version=api_version,
timeout=timeout, timeout=timeout,
max_retries=optional_params.max_retries, max_retries=optional_params.max_retries,
litellm_params=litellm_params,
) )
else: else:
raise litellm.exceptions.BadRequestError( raise litellm.exceptions.BadRequestError(
@ -678,6 +683,10 @@ def cancel_batch(
""" """
try: try:
optional_params = GenericLiteLLMParams(**kwargs) optional_params = GenericLiteLLMParams(**kwargs)
litellm_params = get_litellm_params(
custom_llm_provider=custom_llm_provider,
**kwargs,
)
### TIMEOUT LOGIC ### ### TIMEOUT LOGIC ###
timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600 timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
# set timeout for 10 minutes by default # set timeout for 10 minutes by default
@ -765,6 +774,7 @@ def cancel_batch(
timeout=timeout, timeout=timeout,
max_retries=optional_params.max_retries, max_retries=optional_params.max_retries,
cancel_batch_data=_cancel_batch_request, cancel_batch_data=_cancel_batch_request,
litellm_params=litellm_params,
) )
else: else:
raise litellm.exceptions.BadRequestError( raise litellm.exceptions.BadRequestError(

View file

@ -16,8 +16,10 @@ from litellm.types.llms.openai import (
) )
from litellm.types.utils import LiteLLMBatch from litellm.types.utils import LiteLLMBatch
from ..common_utils import BaseAzureLLM
class AzureBatchesAPI:
class AzureBatchesAPI(BaseAzureLLM):
""" """
Azure methods to support for batches Azure methods to support for batches
- create_batch() - create_batch()
@ -34,28 +36,25 @@ class AzureBatchesAPI:
api_key: Optional[str], api_key: Optional[str],
api_base: Optional[str], api_base: Optional[str],
timeout: Union[float, httpx.Timeout], timeout: Union[float, httpx.Timeout],
litellm_params: dict,
max_retries: Optional[int], max_retries: Optional[int],
api_version: Optional[str] = None, api_version: Optional[str] = None,
client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None, client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None,
_is_async: bool = False, _is_async: bool = False,
) -> Optional[Union[AzureOpenAI, AsyncAzureOpenAI]]: ) -> Optional[Union[AzureOpenAI, AsyncAzureOpenAI]]:
received_args = locals()
openai_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None openai_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None
if client is None: if client is None:
data = {} azure_client_params = self.initialize_azure_sdk_client(
for k, v in received_args.items(): litellm_params=litellm_params,
if k == "self" or k == "client" or k == "_is_async": api_key=api_key,
pass model_name="",
elif k == "api_base" and v is not None: api_version=api_version,
data["azure_endpoint"] = v api_base=api_base,
elif v is not None: )
data[k] = v
if "api_version" not in data:
data["api_version"] = litellm.AZURE_DEFAULT_API_VERSION
if _is_async is True: if _is_async is True:
openai_client = AsyncAzureOpenAI(**data) openai_client = AsyncAzureOpenAI(**azure_client_params)
else: else:
openai_client = AzureOpenAI(**data) # type: ignore openai_client = AzureOpenAI(**azure_client_params) # type: ignore
else: else:
openai_client = client openai_client = client
@ -79,6 +78,7 @@ class AzureBatchesAPI:
timeout: Union[float, httpx.Timeout], timeout: Union[float, httpx.Timeout],
max_retries: Optional[int], max_retries: Optional[int],
client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None, client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = None,
litellm_params: Optional[dict] = None,
) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]: ) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = ( azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = (
self.get_azure_openai_client( self.get_azure_openai_client(
@ -89,6 +89,7 @@ class AzureBatchesAPI:
max_retries=max_retries, max_retries=max_retries,
client=client, client=client,
_is_async=_is_async, _is_async=_is_async,
litellm_params=litellm_params or {},
) )
) )
if azure_client is None: if azure_client is None:
@ -125,6 +126,7 @@ class AzureBatchesAPI:
timeout: Union[float, httpx.Timeout], timeout: Union[float, httpx.Timeout],
max_retries: Optional[int], max_retries: Optional[int],
client: Optional[AzureOpenAI] = None, client: Optional[AzureOpenAI] = None,
litellm_params: Optional[dict] = None,
): ):
azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = ( azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = (
self.get_azure_openai_client( self.get_azure_openai_client(
@ -135,6 +137,7 @@ class AzureBatchesAPI:
max_retries=max_retries, max_retries=max_retries,
client=client, client=client,
_is_async=_is_async, _is_async=_is_async,
litellm_params=litellm_params or {},
) )
) )
if azure_client is None: if azure_client is None:
@ -173,6 +176,7 @@ class AzureBatchesAPI:
timeout: Union[float, httpx.Timeout], timeout: Union[float, httpx.Timeout],
max_retries: Optional[int], max_retries: Optional[int],
client: Optional[AzureOpenAI] = None, client: Optional[AzureOpenAI] = None,
litellm_params: Optional[dict] = None,
): ):
azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = ( azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = (
self.get_azure_openai_client( self.get_azure_openai_client(
@ -183,6 +187,7 @@ class AzureBatchesAPI:
max_retries=max_retries, max_retries=max_retries,
client=client, client=client,
_is_async=_is_async, _is_async=_is_async,
litellm_params=litellm_params or {},
) )
) )
if azure_client is None: if azure_client is None:
@ -212,6 +217,7 @@ class AzureBatchesAPI:
after: Optional[str] = None, after: Optional[str] = None,
limit: Optional[int] = None, limit: Optional[int] = None,
client: Optional[AzureOpenAI] = None, client: Optional[AzureOpenAI] = None,
litellm_params: Optional[dict] = None,
): ):
azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = ( azure_client: Optional[Union[AzureOpenAI, AsyncAzureOpenAI]] = (
self.get_azure_openai_client( self.get_azure_openai_client(
@ -222,6 +228,7 @@ class AzureBatchesAPI:
api_version=api_version, api_version=api_version,
client=client, client=client,
_is_async=_is_async, _is_async=_is_async,
litellm_params=litellm_params or {},
) )
) )
if azure_client is None: if azure_client is None:

View file

@ -219,11 +219,12 @@ def test_select_azure_base_url_called(setup_mocks):
CallTypes.acompletion, CallTypes.acompletion,
CallTypes.atext_completion, CallTypes.atext_completion,
CallTypes.aembedding, CallTypes.aembedding,
# CallTypes.arerank,
CallTypes.atranscription, CallTypes.atranscription,
CallTypes.aspeech, CallTypes.aspeech,
CallTypes.aimage_generation, CallTypes.aimage_generation,
# BATCHES ENDPOINTS # BATCHES ENDPOINTS
CallTypes.acreate_batch,
CallTypes.aretrieve_batch,
# ASSISTANT ENDPOINTS # ASSISTANT ENDPOINTS
], ],
) )
@ -260,6 +261,12 @@ async def test_ensure_initialize_azure_sdk_client_always_used(call_type):
"arerank": {"input": "Hello, how are you?"}, "arerank": {"input": "Hello, how are you?"},
"atranscription": {"file": "path/to/file"}, "atranscription": {"file": "path/to/file"},
"aspeech": {"input": "Hello, how are you?", "voice": "female"}, "aspeech": {"input": "Hello, how are you?", "voice": "female"},
"acreate_batch": {
"completion_window": 10,
"endpoint": "https://test.openai.azure.com",
"input_file_id": "123",
},
"aretrieve_batch": {"batch_id": "123"},
} }
# Get appropriate input for this call type # Get appropriate input for this call type
@ -270,6 +277,14 @@ async def test_ensure_initialize_azure_sdk_client_always_used(call_type):
patch_target = ( patch_target = (
"litellm.main.azure_audio_transcriptions.initialize_azure_sdk_client" "litellm.main.azure_audio_transcriptions.initialize_azure_sdk_client"
) )
elif call_type == CallTypes.arerank:
patch_target = (
"litellm.rerank_api.main.azure_rerank.initialize_azure_sdk_client"
)
elif call_type == CallTypes.acreate_batch or call_type == CallTypes.aretrieve_batch:
patch_target = (
"litellm.batches.main.azure_batches_instance.initialize_azure_sdk_client"
)
# Mock the initialize_azure_sdk_client function # Mock the initialize_azure_sdk_client function
with patch(patch_target) as mock_init_azure: with patch(patch_target) as mock_init_azure: