From 43a06f408cfe1481b5b00f341c0012f70d9139cc Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Tue, 30 Jul 2024 08:18:52 -0700 Subject: [PATCH 1/5] feat add support for alist_batches --- litellm/batches/main.py | 141 +++++++++++++++++++++++++++++++++++++--- litellm/llms/openai.py | 66 +++++++++++++------ 2 files changed, 177 insertions(+), 30 deletions(-) diff --git a/litellm/batches/main.py b/litellm/batches/main.py index 79aefa5f5..3c41cf5d2 100644 --- a/litellm/batches/main.py +++ b/litellm/batches/main.py @@ -20,10 +20,8 @@ import httpx import litellm from litellm import client -from litellm.utils import supports_httpx_timeout - -from ..llms.openai import OpenAIBatchesAPI, OpenAIFilesAPI -from ..types.llms.openai import ( +from litellm.llms.openai import OpenAIBatchesAPI, OpenAIFilesAPI +from litellm.types.llms.openai import ( Batch, CancelBatchRequest, CreateBatchRequest, @@ -34,7 +32,8 @@ from ..types.llms.openai import ( HttpxBinaryResponseContent, RetrieveBatchRequest, ) -from ..types.router import * +from litellm.types.router import GenericLiteLLMParams +from litellm.utils import supports_httpx_timeout ####### ENVIRONMENT VARIABLES ################### openai_batches_instance = OpenAIBatchesAPI() @@ -314,17 +313,139 @@ def retrieve_batch( raise e -def cancel_batch(): - pass +async def alist_batches( + after: Optional[str] = None, + limit: Optional[int] = None, + custom_llm_provider: Literal["openai"] = "openai", + metadata: Optional[Dict[str, str]] = None, + extra_headers: Optional[Dict[str, str]] = None, + extra_body: Optional[Dict[str, str]] = None, + **kwargs, +) -> Batch: + """ + Async: List your organization's batches. + """ + try: + loop = asyncio.get_event_loop() + kwargs["alist_batches"] = True + + # Use a partial function to pass your keyword arguments + func = partial( + list_batches, + after, + limit, + custom_llm_provider, + extra_headers, + extra_body, + **kwargs, + ) + + # Add the context to the function + ctx = contextvars.copy_context() + func_with_context = partial(ctx.run, func) + init_response = await loop.run_in_executor(None, func_with_context) + if asyncio.iscoroutine(init_response): + response = await init_response + else: + response = init_response # type: ignore + + return response + except Exception as e: + raise e -def list_batch(): - pass +def list_batches( + after: Optional[str] = None, + limit: Optional[int] = None, + custom_llm_provider: Literal["openai"] = "openai", + extra_headers: Optional[Dict[str, str]] = None, + extra_body: Optional[Dict[str, str]] = None, + **kwargs, +): + """ + Lists batches + List your organization's batches. + """ + try: + optional_params = GenericLiteLLMParams(**kwargs) + if custom_llm_provider == "openai": + # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there + api_base = ( + optional_params.api_base + or litellm.api_base + or os.getenv("OPENAI_API_BASE") + or "https://api.openai.com/v1" + ) + organization = ( + optional_params.organization + or litellm.organization + or os.getenv("OPENAI_ORGANIZATION", None) + or None # default - https://github.com/openai/openai-python/blob/284c1799070c723c6a553337134148a7ab088dd8/openai/util.py#L105 + ) + # set API KEY + api_key = ( + optional_params.api_key + or litellm.api_key # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there + or litellm.openai_key + or os.getenv("OPENAI_API_KEY") + ) + ### TIMEOUT LOGIC ### + timeout = ( + optional_params.timeout or kwargs.get("request_timeout", 600) or 600 + ) + # set timeout for 10 minutes by default -async def acancel_batch(): + if ( + timeout is not None + and isinstance(timeout, httpx.Timeout) + and supports_httpx_timeout(custom_llm_provider) == False + ): + read_timeout = timeout.read or 600 + timeout = read_timeout # default 10 min timeout + elif timeout is not None and not isinstance(timeout, httpx.Timeout): + timeout = float(timeout) # type: ignore + elif timeout is None: + timeout = 600.0 + + _is_async = kwargs.pop("alist_batches", False) is True + + response = openai_batches_instance.list_batches( + _is_async=_is_async, + after=after, + limit=limit, + api_base=api_base, + api_key=api_key, + organization=organization, + timeout=timeout, + max_retries=optional_params.max_retries, + ) + else: + raise litellm.exceptions.BadRequestError( + message="LiteLLM doesn't support {} for 'create_batch'. Only 'openai' is supported.".format( + custom_llm_provider + ), + model="n/a", + llm_provider=custom_llm_provider, + response=httpx.Response( + status_code=400, + content="Unsupported provider", + request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"), # type: ignore + ), + ) + return response + except Exception as e: + raise e pass async def alist_batch(): pass + + +def cancel_batch(): + pass + + +async def acancel_batch(): + pass diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py index afd49ab14..fab3ff26d 100644 --- a/litellm/llms/openai.py +++ b/litellm/llms/openai.py @@ -2602,26 +2602,52 @@ class OpenAIBatchesAPI(BaseLLM): response = openai_client.batches.cancel(**cancel_batch_data) return response - # def list_batch( - # self, - # list_batch_data: ListBatchRequest, - # api_key: Optional[str], - # api_base: Optional[str], - # timeout: Union[float, httpx.Timeout], - # max_retries: Optional[int], - # organization: Optional[str], - # client: Optional[OpenAI] = None, - # ): - # openai_client: OpenAI = self.get_openai_client( - # api_key=api_key, - # api_base=api_base, - # timeout=timeout, - # max_retries=max_retries, - # organization=organization, - # client=client, - # ) - # response = openai_client.batches.list(**list_batch_data) - # return response + async def alist_batches( + self, + openai_client: AsyncOpenAI, + after: Optional[str] = None, + limit: Optional[int] = None, + ): + verbose_logger.debug("listing batches, after= %s, limit= %s", after, limit) + response = await openai_client.batches.list(after=after, limit=limit) + return response + + def list_batches( + self, + _is_async: bool, + api_key: Optional[str], + api_base: Optional[str], + timeout: Union[float, httpx.Timeout], + max_retries: Optional[int], + organization: Optional[str], + after: Optional[str] = None, + limit: Optional[int] = None, + client: Optional[OpenAI] = None, + ): + openai_client: Optional[Union[OpenAI, AsyncOpenAI]] = self.get_openai_client( + api_key=api_key, + api_base=api_base, + timeout=timeout, + max_retries=max_retries, + organization=organization, + client=client, + _is_async=_is_async, + ) + if openai_client is None: + raise ValueError( + "OpenAI client is not initialized. Make sure api_key is passed or OPENAI_API_KEY is set in the environment." + ) + + if _is_async is True: + if not isinstance(openai_client, AsyncOpenAI): + raise ValueError( + "OpenAI client is not an instance of AsyncOpenAI. Make sure you passed an AsyncOpenAI client." + ) + return self.alist_batches( # type: ignore + openai_client=openai_client, after=after, limit=limit + ) + response = openai_client.batches.list(after=after, limit=limit) + return response class OpenAIAssistantsAPI(BaseLLM): From 4206354ee79e4b880807ab2c857b119c07dbcb4d Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Tue, 30 Jul 2024 08:19:25 -0700 Subject: [PATCH 2/5] test - list batches --- litellm/tests/test_openai_batches_and_files.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/litellm/tests/test_openai_batches_and_files.py b/litellm/tests/test_openai_batches_and_files.py index 14aee6e19..e8bde4d20 100644 --- a/litellm/tests/test_openai_batches_and_files.py +++ b/litellm/tests/test_openai_batches_and_files.py @@ -72,6 +72,10 @@ def test_create_batch(): assert retrieved_batch.id == create_batch_response.id + # list all batches + list_batches = litellm.list_batches(custom_llm_provider="openai", limit=2) + print("list_batches=", list_batches) + file_content = litellm.file_content( file_id=batch_input_file_id, custom_llm_provider="openai" ) @@ -140,6 +144,10 @@ async def test_async_create_batch(): assert retrieved_batch.id == create_batch_response.id + # list all batches + list_batches = await litellm.alist_batches(custom_llm_provider="openai", limit=2) + print("list_batches=", list_batches) + # try to get file content for our original file file_content = await litellm.afile_content( From 563d59a3059f2c90965749c2fb6a27b3308cc4b0 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Tue, 30 Jul 2024 09:46:30 -0700 Subject: [PATCH 3/5] test batches endpoint on proxy --- litellm/batches/main.py | 4 -- litellm/proxy/proxy_server.py | 89 ++++++++++++++++++++++++++- tests/test_openai_batches_endpoint.py | 23 +++++++ 3 files changed, 110 insertions(+), 6 deletions(-) diff --git a/litellm/batches/main.py b/litellm/batches/main.py index 3c41cf5d2..a2ebc664e 100644 --- a/litellm/batches/main.py +++ b/litellm/batches/main.py @@ -439,10 +439,6 @@ def list_batches( pass -async def alist_batch(): - pass - - def cancel_batch(): pass diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index fe9b74874..cbb62289b 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -4898,12 +4898,12 @@ async def create_batch( @router.get( - "/v1/batches{batch_id:path}", + "/v1/batches/{batch_id:path}", dependencies=[Depends(user_api_key_auth)], tags=["batch"], ) @router.get( - "/batches{batch_id:path}", + "/batches/{batch_id:path}", dependencies=[Depends(user_api_key_auth)], tags=["batch"], ) @@ -4993,6 +4993,91 @@ async def retrieve_batch( ) +@router.get( + "/v1/batches", + dependencies=[Depends(user_api_key_auth)], + tags=["batch"], +) +@router.get( + "/batches", + dependencies=[Depends(user_api_key_auth)], + tags=["batch"], +) +async def list_batches( + fastapi_response: Response, + limit: Optional[int] = None, + after: Optional[str] = None, + user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), +): + """ + Lists + This is the equivalent of GET https://api.openai.com/v1/batches/ + Supports Identical Params as: https://platform.openai.com/docs/api-reference/batch/list + + Example Curl + ``` + curl http://localhost:4000/v1/batches?limit=2 \ + -H "Authorization: Bearer sk-1234" \ + -H "Content-Type: application/json" \ + + ``` + """ + global proxy_logging_obj + verbose_proxy_logger.debug("GET /v1/batches after={} limit={}".format(after, limit)) + try: + # for now use custom_llm_provider=="openai" -> this will change as LiteLLM adds more providers for acreate_batch + response = await litellm.alist_batches( + custom_llm_provider="openai", + after=after, + limit=limit, + ) + + ### RESPONSE HEADERS ### + hidden_params = getattr(response, "_hidden_params", {}) or {} + model_id = hidden_params.get("model_id", None) or "" + cache_key = hidden_params.get("cache_key", None) or "" + api_base = hidden_params.get("api_base", None) or "" + + fastapi_response.headers.update( + get_custom_headers( + user_api_key_dict=user_api_key_dict, + model_id=model_id, + cache_key=cache_key, + api_base=api_base, + version=version, + model_region=getattr(user_api_key_dict, "allowed_model_region", ""), + ) + ) + + return response + except Exception as e: + await proxy_logging_obj.post_call_failure_hook( + user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data + ) + verbose_proxy_logger.error( + "litellm.proxy.proxy_server.retrieve_batch(): Exception occured - {}".format( + str(e) + ) + ) + verbose_proxy_logger.debug(traceback.format_exc()) + if isinstance(e, HTTPException): + raise ProxyException( + message=getattr(e, "message", str(e.detail)), + type=getattr(e, "type", "None"), + param=getattr(e, "param", "None"), + code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST), + ) + else: + error_traceback = traceback.format_exc() + error_msg = f"{str(e)}" + raise ProxyException( + message=getattr(e, "message", error_msg), + type=getattr(e, "type", "None"), + param=getattr(e, "param", "None"), + code=getattr(e, "status_code", 500), + ) + + ###################################################################### # END OF /v1/batches Endpoints Implementation diff --git a/tests/test_openai_batches_endpoint.py b/tests/test_openai_batches_endpoint.py index 75e3c3f88..a6e26e782 100644 --- a/tests/test_openai_batches_endpoint.py +++ b/tests/test_openai_batches_endpoint.py @@ -41,6 +41,19 @@ async def get_batch_by_id(session, batch_id): return None +async def list_batches(session): + url = f"{BASE_URL}/v1/batches" + headers = {"Authorization": f"Bearer {API_KEY}"} + + async with session.get(url, headers=headers) as response: + if response.status == 200: + result = await response.json() + return result + else: + print(f"Error: Failed to get batch. Status code: {response.status}") + return None + + @pytest.mark.asyncio async def test_batches_operations(): async with aiohttp.ClientSession() as session: @@ -60,5 +73,15 @@ async def test_batches_operations(): assert get_batch_response["id"] == batch_id assert get_batch_response["input_file_id"] == file_id + # test LIST Batches + list_batch_response = await list_batches(session) + print("response from list batch", list_batch_response) + + assert list_batch_response is not None + assert len(list_batch_response["data"]) > 0 + + element_0 = list_batch_response["data"][0] + assert element_0["id"] is not None + # Test delete file await delete_file(session, file_id) From 1e1bce45947beb471d832179bff7b784f288dad9 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Tue, 30 Jul 2024 09:49:11 -0700 Subject: [PATCH 4/5] docs LIST batches --- docs/my-website/docs/batches.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/my-website/docs/batches.md b/docs/my-website/docs/batches.md index 2199e318f..101d1e505 100644 --- a/docs/my-website/docs/batches.md +++ b/docs/my-website/docs/batches.md @@ -12,6 +12,8 @@ Covers Batches, Files - Create Batch Request +- List Batches + - Retrieve the Specific Batch and File Content @@ -56,6 +58,15 @@ curl http://localhost:4000/v1/batches/batch_abc123 \ -H "Content-Type: application/json" \ ``` + +**List Batches** + +```bash +curl http://localhost:4000/v1/batches \ + -H "Authorization: Bearer sk-1234" \ + -H "Content-Type: application/json" \ +``` + @@ -116,6 +127,13 @@ file_content = await litellm.afile_content( print("file content = ", file_content) ``` +**List Batches** + +```python +list_batches_response = litellm.list_batches(custom_llm_provider="openai", limit=2) +print("list_batches_response=", list_batches_response) +``` + From 66211b42db0854d0777bafbe91efa20e11e2a9de Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Tue, 30 Jul 2024 12:51:39 -0700 Subject: [PATCH 5/5] fix linting errors --- litellm/llms/openai.py | 4 ++-- litellm/proxy/proxy_server.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py index fab3ff26d..d2ba7ac13 100644 --- a/litellm/llms/openai.py +++ b/litellm/llms/openai.py @@ -2609,7 +2609,7 @@ class OpenAIBatchesAPI(BaseLLM): limit: Optional[int] = None, ): verbose_logger.debug("listing batches, after= %s, limit= %s", after, limit) - response = await openai_client.batches.list(after=after, limit=limit) + response = await openai_client.batches.list(after=after, limit=limit) # type: ignore return response def list_batches( @@ -2646,7 +2646,7 @@ class OpenAIBatchesAPI(BaseLLM): return self.alist_batches( # type: ignore openai_client=openai_client, after=after, limit=limit ) - response = openai_client.batches.list(after=after, limit=limit) + response = openai_client.batches.list(after=after, limit=limit) # type: ignore return response diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index cbb62289b..5a2970df5 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -5052,7 +5052,9 @@ async def list_batches( return response except Exception as e: await proxy_logging_obj.post_call_failure_hook( - user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data + user_api_key_dict=user_api_key_dict, + original_exception=e, + request_data={"after": after, "limit": limit}, ) verbose_proxy_logger.error( "litellm.proxy.proxy_server.retrieve_batch(): Exception occured - {}".format(