From 58c4b024479d7a7549ffc102c68d0906c3d0afc0 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 15:50:13 -0700 Subject: [PATCH 01/13] feat - make anthropic async --- litellm/llms/anthropic.py | 365 ++++++++++++++-------- litellm/llms/custom_httpx/http_handler.py | 4 +- litellm/main.py | 2 + 3 files changed, 231 insertions(+), 140 deletions(-) diff --git a/litellm/llms/anthropic.py b/litellm/llms/anthropic.py index b7b078b9b..db41ae6e3 100644 --- a/litellm/llms/anthropic.py +++ b/litellm/llms/anthropic.py @@ -7,6 +7,9 @@ from typing import Callable, Optional, List from litellm.utils import ModelResponse, Usage, map_finish_reason, CustomStreamWrapper import litellm from .prompt_templates.factory import prompt_factory, custom_prompt +from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler + +async_handler = AsyncHTTPHandler() import httpx @@ -36,7 +39,9 @@ class AnthropicConfig: to pass metadata to anthropic, it's {"user_id": "any-relevant-information"} """ - max_tokens: Optional[int] = 4096 # anthropic requires a default value (Opus, Sonnet, and Haiku have the same default) + max_tokens: Optional[int] = ( + 4096 # anthropic requires a default value (Opus, Sonnet, and Haiku have the same default) + ) stop_sequences: Optional[list] = None temperature: Optional[int] = None top_p: Optional[int] = None @@ -46,7 +51,9 @@ class AnthropicConfig: def __init__( self, - max_tokens: Optional[int] = 4096, # You can pass in a value yourself or use the default value 4096 + max_tokens: Optional[ + int + ] = 4096, # You can pass in a value yourself or use the default value 4096 stop_sequences: Optional[list] = None, temperature: Optional[int] = None, top_p: Optional[int] = None, @@ -95,6 +102,169 @@ def validate_environment(api_key, user_headers): return headers +def process_response( + model, + response, + model_response, + _is_function_call, + stream, + logging_obj, + api_key, + data, + messages, + print_verbose, +): + ## LOGGING + logging_obj.post_call( + input=messages, + api_key=api_key, + original_response=response.text, + additional_args={"complete_input_dict": data}, + ) + print_verbose(f"raw model_response: {response.text}") + ## RESPONSE OBJECT + try: + completion_response = response.json() + except: + raise AnthropicError(message=response.text, status_code=response.status_code) + if "error" in completion_response: + raise AnthropicError( + message=str(completion_response["error"]), + status_code=response.status_code, + ) + elif len(completion_response["content"]) == 0: + raise AnthropicError( + message="No content in response", + status_code=response.status_code, + ) + else: + text_content = "" + tool_calls = [] + for content in completion_response["content"]: + if content["type"] == "text": + text_content += content["text"] + ## TOOL CALLING + elif content["type"] == "tool_use": + tool_calls.append( + { + "id": content["id"], + "type": "function", + "function": { + "name": content["name"], + "arguments": json.dumps(content["input"]), + }, + } + ) + + _message = litellm.Message( + tool_calls=tool_calls, + content=text_content or None, + ) + model_response.choices[0].message = _message # type: ignore + model_response._hidden_params["original_response"] = completion_response[ + "content" + ] # allow user to access raw anthropic tool calling response + + model_response.choices[0].finish_reason = map_finish_reason( + completion_response["stop_reason"] + ) + + print_verbose(f"_is_function_call: {_is_function_call}; stream: {stream}") + if _is_function_call and stream: + print_verbose("INSIDE ANTHROPIC STREAMING TOOL CALLING CONDITION BLOCK") + # return an iterator + streaming_model_response = ModelResponse(stream=True) + streaming_model_response.choices[0].finish_reason = model_response.choices[ + 0 + ].finish_reason + # streaming_model_response.choices = [litellm.utils.StreamingChoices()] + streaming_choice = litellm.utils.StreamingChoices() + streaming_choice.index = model_response.choices[0].index + _tool_calls = [] + print_verbose( + f"type of model_response.choices[0]: {type(model_response.choices[0])}" + ) + print_verbose(f"type of streaming_choice: {type(streaming_choice)}") + if isinstance(model_response.choices[0], litellm.Choices): + if getattr( + model_response.choices[0].message, "tool_calls", None + ) is not None and isinstance( + model_response.choices[0].message.tool_calls, list + ): + for tool_call in model_response.choices[0].message.tool_calls: + _tool_call = {**tool_call.dict(), "index": 0} + _tool_calls.append(_tool_call) + delta_obj = litellm.utils.Delta( + content=getattr(model_response.choices[0].message, "content", None), + role=model_response.choices[0].message.role, + tool_calls=_tool_calls, + ) + streaming_choice.delta = delta_obj + streaming_model_response.choices = [streaming_choice] + completion_stream = ModelResponseIterator( + model_response=streaming_model_response + ) + print_verbose( + "Returns anthropic CustomStreamWrapper with 'cached_response' streaming object" + ) + return CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="cached_response", + logging_obj=logging_obj, + ) + + ## CALCULATING USAGE + prompt_tokens = completion_response["usage"]["input_tokens"] + completion_tokens = completion_response["usage"]["output_tokens"] + total_tokens = prompt_tokens + completion_tokens + + model_response["created"] = int(time.time()) + model_response["model"] = model + usage = Usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + ) + model_response.usage = usage + return model_response + + +async def acompletion_function( + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + stream, + _is_function_call, + data=None, + optional_params=None, + litellm_params=None, + logger_fn=None, + headers={}, +): + response = await async_handler.post( + api_base, headers=headers, data=json.dumps(data) + ) + return process_response( + model=model, + response=response, + model_response=model_response, + _is_function_call=_is_function_call, + stream=stream, + logging_obj=logging_obj, + api_key=api_key, + data=data, + messages=messages, + print_verbose=print_verbose, + ) + + def completion( model: str, messages: list, @@ -106,6 +276,7 @@ def completion( api_key, logging_obj, optional_params=None, + acompletion=None, litellm_params=None, logger_fn=None, headers={}, @@ -184,148 +355,66 @@ def completion( }, ) print_verbose(f"_is_function_call: {_is_function_call}") - ## COMPLETION CALL - if ( - stream and not _is_function_call - ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) - print_verbose("makes anthropic streaming POST request") - data["stream"] = stream - response = requests.post( - api_base, - headers=headers, - data=json.dumps(data), - stream=stream, - ) - - if response.status_code != 200: - raise AnthropicError( - status_code=response.status_code, message=response.text - ) - - return response.iter_lines() - else: - response = requests.post(api_base, headers=headers, data=json.dumps(data)) - if response.status_code != 200: - raise AnthropicError( - status_code=response.status_code, message=response.text - ) - - ## LOGGING - logging_obj.post_call( - input=messages, - api_key=api_key, - original_response=response.text, - additional_args={"complete_input_dict": data}, - ) - print_verbose(f"raw model_response: {response.text}") - ## RESPONSE OBJECT - try: - completion_response = response.json() - except: - raise AnthropicError( - message=response.text, status_code=response.status_code - ) - if "error" in completion_response: - raise AnthropicError( - message=str(completion_response["error"]), - status_code=response.status_code, - ) - elif len(completion_response["content"]) == 0: - raise AnthropicError( - message="No content in response", - status_code=response.status_code, - ) + if acompletion == True: + if optional_params.get("stream", False): + pass else: - text_content = "" - tool_calls = [] - for content in completion_response["content"]: - if content["type"] == "text": - text_content += content["text"] - ## TOOL CALLING - elif content["type"] == "tool_use": - tool_calls.append( - { - "id": content["id"], - "type": "function", - "function": { - "name": content["name"], - "arguments": json.dumps(content["input"]), - }, - } - ) - - _message = litellm.Message( - tool_calls=tool_calls, - content=text_content or None, + return acompletion_function( + model=model, + messages=messages, + data=data, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + logging_obj=logging_obj, + optional_params=optional_params, + stream=stream, + _is_function_call=_is_function_call, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, ) - model_response.choices[0].message = _message # type: ignore - model_response._hidden_params["original_response"] = completion_response[ - "content" - ] # allow user to access raw anthropic tool calling response - - model_response.choices[0].finish_reason = map_finish_reason( - completion_response["stop_reason"] + else: + ## COMPLETION CALL + if ( + stream and not _is_function_call + ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) + print_verbose("makes anthropic streaming POST request") + data["stream"] = stream + response = requests.post( + api_base, + headers=headers, + data=json.dumps(data), + stream=stream, ) - print_verbose(f"_is_function_call: {_is_function_call}; stream: {stream}") - if _is_function_call and stream: - print_verbose("INSIDE ANTHROPIC STREAMING TOOL CALLING CONDITION BLOCK") - # return an iterator - streaming_model_response = ModelResponse(stream=True) - streaming_model_response.choices[0].finish_reason = model_response.choices[ - 0 - ].finish_reason - # streaming_model_response.choices = [litellm.utils.StreamingChoices()] - streaming_choice = litellm.utils.StreamingChoices() - streaming_choice.index = model_response.choices[0].index - _tool_calls = [] - print_verbose( - f"type of model_response.choices[0]: {type(model_response.choices[0])}" - ) - print_verbose(f"type of streaming_choice: {type(streaming_choice)}") - if isinstance(model_response.choices[0], litellm.Choices): - if getattr( - model_response.choices[0].message, "tool_calls", None - ) is not None and isinstance( - model_response.choices[0].message.tool_calls, list - ): - for tool_call in model_response.choices[0].message.tool_calls: - _tool_call = {**tool_call.dict(), "index": 0} - _tool_calls.append(_tool_call) - delta_obj = litellm.utils.Delta( - content=getattr(model_response.choices[0].message, "content", None), - role=model_response.choices[0].message.role, - tool_calls=_tool_calls, - ) - streaming_choice.delta = delta_obj - streaming_model_response.choices = [streaming_choice] - completion_stream = ModelResponseIterator( - model_response=streaming_model_response - ) - print_verbose( - "Returns anthropic CustomStreamWrapper with 'cached_response' streaming object" - ) - return CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider="cached_response", - logging_obj=logging_obj, + if response.status_code != 200: + raise AnthropicError( + status_code=response.status_code, message=response.text ) - ## CALCULATING USAGE - prompt_tokens = completion_response["usage"]["input_tokens"] - completion_tokens = completion_response["usage"]["output_tokens"] - total_tokens = prompt_tokens + completion_tokens - - model_response["created"] = int(time.time()) - model_response["model"] = model - usage = Usage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - ) - model_response.usage = usage - return model_response + return response.iter_lines() + else: + response = requests.post(api_base, headers=headers, data=json.dumps(data)) + if response.status_code != 200: + raise AnthropicError( + status_code=response.status_code, message=response.text + ) + return process_response( + model=model, + response=response, + model_response=model_response, + _is_function_call=_is_function_call, + stream=stream, + logging_obj=logging_obj, + api_key=api_key, + data=data, + messages=messages, + print_verbose=print_verbose, + ) class ModelResponseIterator: diff --git a/litellm/llms/custom_httpx/http_handler.py b/litellm/llms/custom_httpx/http_handler.py index 10314d831..51723a2f9 100644 --- a/litellm/llms/custom_httpx/http_handler.py +++ b/litellm/llms/custom_httpx/http_handler.py @@ -1,5 +1,5 @@ import httpx, asyncio -from typing import Optional +from typing import Optional, Union class AsyncHTTPHandler: @@ -25,7 +25,7 @@ class AsyncHTTPHandler: async def post( self, url: str, - data: Optional[dict] = None, + data: Optional[Union[dict, str]] = None, params: Optional[dict] = None, headers: Optional[dict] = None, ): diff --git a/litellm/main.py b/litellm/main.py index 5a9eb6e45..b7e5a3ba9 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -304,6 +304,7 @@ async def acompletion( or custom_llm_provider == "vertex_ai" or custom_llm_provider == "gemini" or custom_llm_provider == "sagemaker" + or custom_llm_provider == "anthropic" or custom_llm_provider in litellm.openai_compatible_providers ): # currently implemented aiohttp calls for just azure, openai, hf, ollama, vertex ai soon all. init_response = await loop.run_in_executor(None, func_with_context) @@ -1184,6 +1185,7 @@ def completion( model=model, messages=messages, api_base=api_base, + acompletion=acompletion, custom_prompt_dict=litellm.custom_prompt_dict, model_response=model_response, print_verbose=print_verbose, From 8e5e99533b77ae66390885a1e0c40fba5f7b651a Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 17:34:23 -0700 Subject: [PATCH 02/13] async streaming for anthropic --- litellm/main.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/litellm/main.py b/litellm/main.py index b7e5a3ba9..75382b8ce 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -1197,19 +1197,6 @@ def completion( logging_obj=logging, headers=headers, ) - if ( - "stream" in optional_params - and optional_params["stream"] == True - and not isinstance(response, CustomStreamWrapper) - ): - # don't try to access stream object, - response = CustomStreamWrapper( - response, - model, - custom_llm_provider="anthropic", - logging_obj=logging, - ) - if optional_params.get("stream", False) or acompletion == True: ## LOGGING logging.post_call( From 7849c29f70a12660e0798237888c6656cd6213cc Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 17:36:56 -0700 Subject: [PATCH 03/13] async anthropic streaming --- litellm/utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/litellm/utils.py b/litellm/utils.py index 5153a414b..1e67d63e5 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -8710,7 +8710,9 @@ class CustomStreamWrapper: return hold, curr_chunk def handle_anthropic_chunk(self, chunk): - str_line = chunk.decode("utf-8") # Convert bytes to string + str_line = chunk + if isinstance(chunk, bytes): # Handle binary data + str_line = chunk.decode("utf-8") # Convert bytes to string text = "" is_finished = False finish_reason = None @@ -9970,6 +9972,7 @@ class CustomStreamWrapper: or self.custom_llm_provider == "custom_openai" or self.custom_llm_provider == "text-completion-openai" or self.custom_llm_provider == "azure_text" + or self.custom_llm_provider == "anthropic" or self.custom_llm_provider == "huggingface" or self.custom_llm_provider == "ollama" or self.custom_llm_provider == "ollama_chat" From 5c796b436512d1f9addb303c634db1842d9116a3 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 17:53:06 -0700 Subject: [PATCH 04/13] async streaming anthropic --- litellm/llms/anthropic.py | 78 +++++++++++++++++++++-- litellm/llms/custom_httpx/http_handler.py | 19 ++++-- 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/litellm/llms/anthropic.py b/litellm/llms/anthropic.py index db41ae6e3..47e485ecb 100644 --- a/litellm/llms/anthropic.py +++ b/litellm/llms/anthropic.py @@ -9,8 +9,6 @@ import litellm from .prompt_templates.factory import prompt_factory, custom_prompt from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler -async_handler = AsyncHTTPHandler() - import httpx @@ -18,6 +16,11 @@ class AnthropicConstants(Enum): HUMAN_PROMPT = "\n\nHuman: " AI_PROMPT = "\n\nAssistant: " + # constants from https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/_constants.py + + +async_handler = AsyncHTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0)) + class AnthropicError(Exception): def __init__(self, status_code, message): @@ -230,6 +233,42 @@ def process_response( return model_response +async def acompletion_stream_function( + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + stream, + _is_function_call, + data=None, + optional_params=None, + litellm_params=None, + logger_fn=None, + headers={}, +): + response = await async_handler.post( + api_base, headers=headers, data=json.dumps(data) + ) + + if response.status_code != 200: + raise AnthropicError(status_code=response.status_code, message=response.text) + + completion_stream = response.aiter_lines() + + streamwrapper = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + ) + return streamwrapper + + async def acompletion_function( model: str, messages: list, @@ -356,8 +395,29 @@ def completion( ) print_verbose(f"_is_function_call: {_is_function_call}") if acompletion == True: - if optional_params.get("stream", False): - pass + if ( + stream and not _is_function_call + ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) + print_verbose("makes async anthropic streaming POST request") + data["stream"] = stream + return acompletion_stream_function( + model=model, + messages=messages, + data=data, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + logging_obj=logging_obj, + optional_params=optional_params, + stream=stream, + _is_function_call=_is_function_call, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, + ) else: return acompletion_function( model=model, @@ -396,7 +456,15 @@ def completion( status_code=response.status_code, message=response.text ) - return response.iter_lines() + completion_stream = response.iter_lines() + streaming_response = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + ) + return streaming_response + else: response = requests.post(api_base, headers=headers, data=json.dumps(data)) if response.status_code != 200: diff --git a/litellm/llms/custom_httpx/http_handler.py b/litellm/llms/custom_httpx/http_handler.py index 51723a2f9..c008b0593 100644 --- a/litellm/llms/custom_httpx/http_handler.py +++ b/litellm/llms/custom_httpx/http_handler.py @@ -1,15 +1,21 @@ import httpx, asyncio -from typing import Optional, Union +from typing import Optional, Union, Mapping, Any + +# https://www.python-httpx.org/advanced/timeouts +_DEFAULT_TIMEOUT = httpx.Timeout(timeout=5.0, connect=5.0) class AsyncHTTPHandler: - def __init__(self, concurrent_limit=1000): + def __init__( + self, timeout: httpx.Timeout = _DEFAULT_TIMEOUT, concurrent_limit=1000 + ): # Create a client with a connection pool self.client = httpx.AsyncClient( + timeout=timeout, limits=httpx.Limits( max_connections=concurrent_limit, max_keepalive_connections=concurrent_limit, - ) + ), ) async def close(self): @@ -25,12 +31,15 @@ class AsyncHTTPHandler: async def post( self, url: str, - data: Optional[Union[dict, str]] = None, + data: Optional[Union[dict, str]] = None, # type: ignore params: Optional[dict] = None, headers: Optional[dict] = None, ): response = await self.client.post( - url, data=data, params=params, headers=headers + url, + data=data, # type: ignore + params=params, + headers=headers, ) return response From 2cf41d3d9f4e5af65f517ef7a842ba2f07574a2c Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 17:54:19 -0700 Subject: [PATCH 05/13] async ahtropic streaming --- litellm/llms/anthropic_text.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/litellm/llms/anthropic_text.py b/litellm/llms/anthropic_text.py index bccc8c769..c9a9adfc2 100644 --- a/litellm/llms/anthropic_text.py +++ b/litellm/llms/anthropic_text.py @@ -4,7 +4,7 @@ from enum import Enum import requests import time from typing import Callable, Optional -from litellm.utils import ModelResponse, Usage +from litellm.utils import ModelResponse, Usage, CustomStreamWrapper import litellm from .prompt_templates.factory import prompt_factory, custom_prompt import httpx @@ -162,8 +162,15 @@ def completion( raise AnthropicError( status_code=response.status_code, message=response.text ) + completion_stream = response.iter_lines() + stream_response = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + ) + return stream_response - return response.iter_lines() else: response = requests.post(api_base, headers=headers, data=json.dumps(data)) if response.status_code != 200: From 548b2b686118306dd751cd6a1815982ad675d70f Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 17:55:26 -0700 Subject: [PATCH 06/13] test - async claude streaming --- litellm/tests/test_streaming.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index ed6298752..a8e3f7680 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -831,22 +831,25 @@ def test_bedrock_claude_3_streaming(): pytest.fail(f"Error occurred: {e}") -def test_claude_3_streaming_finish_reason(): +@pytest.mark.asyncio +async def test_claude_3_streaming_finish_reason(): try: litellm.set_verbose = True messages = [ {"role": "system", "content": "Be helpful"}, {"role": "user", "content": "What do you know?"}, ] - response: ModelResponse = completion( # type: ignore + response: ModelResponse = await litellm.acompletion( # type: ignore model="claude-3-opus-20240229", messages=messages, stream=True, + max_tokens=10, ) complete_response = "" # Add any assertions here to check the response num_finish_reason = 0 - for idx, chunk in enumerate(response): + async for chunk in response: + print(f"chunk: {chunk}") if isinstance(chunk, ModelResponse): if chunk.choices[0].finish_reason is not None: num_finish_reason += 1 From 9be6b7ec7c450de262555f8f5a29231528b9efd2 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 18:07:41 -0700 Subject: [PATCH 07/13] ci/cd run again --- litellm/tests/test_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index a8e3f7680..5e7609db9 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -846,7 +846,7 @@ async def test_claude_3_streaming_finish_reason(): max_tokens=10, ) complete_response = "" - # Add any assertions here to check the response + # Add any assertions here to-check the response num_finish_reason = 0 async for chunk in response: print(f"chunk: {chunk}") From fcf5aa278b1563fc9479e50705fcba21e258b7e8 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 18:19:28 -0700 Subject: [PATCH 08/13] fix - use anthropic class for clients --- litellm/llms/anthropic.py | 762 +++++++++++++++++++------------------- litellm/main.py | 5 +- 2 files changed, 389 insertions(+), 378 deletions(-) diff --git a/litellm/llms/anthropic.py b/litellm/llms/anthropic.py index 47e485ecb..3eca11bef 100644 --- a/litellm/llms/anthropic.py +++ b/litellm/llms/anthropic.py @@ -8,7 +8,7 @@ from litellm.utils import ModelResponse, Usage, map_finish_reason, CustomStreamW import litellm from .prompt_templates.factory import prompt_factory, custom_prompt from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler - +from .base import BaseLLM import httpx @@ -19,9 +19,6 @@ class AnthropicConstants(Enum): # constants from https://github.com/anthropics/anthropic-sdk-python/blob/main/src/anthropic/_constants.py -async_handler = AsyncHTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0)) - - class AnthropicError(Exception): def __init__(self, status_code, message): self.status_code = status_code @@ -105,384 +102,402 @@ def validate_environment(api_key, user_headers): return headers -def process_response( - model, - response, - model_response, - _is_function_call, - stream, - logging_obj, - api_key, - data, - messages, - print_verbose, -): - ## LOGGING - logging_obj.post_call( - input=messages, - api_key=api_key, - original_response=response.text, - additional_args={"complete_input_dict": data}, - ) - print_verbose(f"raw model_response: {response.text}") - ## RESPONSE OBJECT - try: - completion_response = response.json() - except: - raise AnthropicError(message=response.text, status_code=response.status_code) - if "error" in completion_response: - raise AnthropicError( - message=str(completion_response["error"]), - status_code=response.status_code, - ) - elif len(completion_response["content"]) == 0: - raise AnthropicError( - message="No content in response", - status_code=response.status_code, - ) - else: - text_content = "" - tool_calls = [] - for content in completion_response["content"]: - if content["type"] == "text": - text_content += content["text"] - ## TOOL CALLING - elif content["type"] == "tool_use": - tool_calls.append( - { - "id": content["id"], - "type": "function", - "function": { - "name": content["name"], - "arguments": json.dumps(content["input"]), - }, - } - ) - - _message = litellm.Message( - tool_calls=tool_calls, - content=text_content or None, - ) - model_response.choices[0].message = _message # type: ignore - model_response._hidden_params["original_response"] = completion_response[ - "content" - ] # allow user to access raw anthropic tool calling response - - model_response.choices[0].finish_reason = map_finish_reason( - completion_response["stop_reason"] +class AnthropicChatCompletion(BaseLLM): + def __init__(self) -> None: + super().__init__() + self.async_handler = AsyncHTTPHandler( + timeout=httpx.Timeout(timeout=600.0, connect=5.0) ) - print_verbose(f"_is_function_call: {_is_function_call}; stream: {stream}") - if _is_function_call and stream: - print_verbose("INSIDE ANTHROPIC STREAMING TOOL CALLING CONDITION BLOCK") - # return an iterator - streaming_model_response = ModelResponse(stream=True) - streaming_model_response.choices[0].finish_reason = model_response.choices[ - 0 - ].finish_reason - # streaming_model_response.choices = [litellm.utils.StreamingChoices()] - streaming_choice = litellm.utils.StreamingChoices() - streaming_choice.index = model_response.choices[0].index - _tool_calls = [] - print_verbose( - f"type of model_response.choices[0]: {type(model_response.choices[0])}" + def process_response( + self, + model, + response, + model_response, + _is_function_call, + stream, + logging_obj, + api_key, + data, + messages, + print_verbose, + ): + ## LOGGING + logging_obj.post_call( + input=messages, + api_key=api_key, + original_response=response.text, + additional_args={"complete_input_dict": data}, ) - print_verbose(f"type of streaming_choice: {type(streaming_choice)}") - if isinstance(model_response.choices[0], litellm.Choices): - if getattr( - model_response.choices[0].message, "tool_calls", None - ) is not None and isinstance( - model_response.choices[0].message.tool_calls, list - ): - for tool_call in model_response.choices[0].message.tool_calls: - _tool_call = {**tool_call.dict(), "index": 0} - _tool_calls.append(_tool_call) - delta_obj = litellm.utils.Delta( - content=getattr(model_response.choices[0].message, "content", None), - role=model_response.choices[0].message.role, - tool_calls=_tool_calls, - ) - streaming_choice.delta = delta_obj - streaming_model_response.choices = [streaming_choice] - completion_stream = ModelResponseIterator( - model_response=streaming_model_response - ) - print_verbose( - "Returns anthropic CustomStreamWrapper with 'cached_response' streaming object" - ) - return CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider="cached_response", - logging_obj=logging_obj, - ) - - ## CALCULATING USAGE - prompt_tokens = completion_response["usage"]["input_tokens"] - completion_tokens = completion_response["usage"]["output_tokens"] - total_tokens = prompt_tokens + completion_tokens - - model_response["created"] = int(time.time()) - model_response["model"] = model - usage = Usage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - ) - model_response.usage = usage - return model_response - - -async def acompletion_stream_function( - model: str, - messages: list, - api_base: str, - custom_prompt_dict: dict, - model_response: ModelResponse, - print_verbose: Callable, - encoding, - api_key, - logging_obj, - stream, - _is_function_call, - data=None, - optional_params=None, - litellm_params=None, - logger_fn=None, - headers={}, -): - response = await async_handler.post( - api_base, headers=headers, data=json.dumps(data) - ) - - if response.status_code != 200: - raise AnthropicError(status_code=response.status_code, message=response.text) - - completion_stream = response.aiter_lines() - - streamwrapper = CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider="anthropic", - logging_obj=logging_obj, - ) - return streamwrapper - - -async def acompletion_function( - model: str, - messages: list, - api_base: str, - custom_prompt_dict: dict, - model_response: ModelResponse, - print_verbose: Callable, - encoding, - api_key, - logging_obj, - stream, - _is_function_call, - data=None, - optional_params=None, - litellm_params=None, - logger_fn=None, - headers={}, -): - response = await async_handler.post( - api_base, headers=headers, data=json.dumps(data) - ) - return process_response( - model=model, - response=response, - model_response=model_response, - _is_function_call=_is_function_call, - stream=stream, - logging_obj=logging_obj, - api_key=api_key, - data=data, - messages=messages, - print_verbose=print_verbose, - ) - - -def completion( - model: str, - messages: list, - api_base: str, - custom_prompt_dict: dict, - model_response: ModelResponse, - print_verbose: Callable, - encoding, - api_key, - logging_obj, - optional_params=None, - acompletion=None, - litellm_params=None, - logger_fn=None, - headers={}, -): - headers = validate_environment(api_key, headers) - _is_function_call = False - messages = copy.deepcopy(messages) - optional_params = copy.deepcopy(optional_params) - if model in custom_prompt_dict: - # check if the model has a registered custom prompt - model_prompt_details = custom_prompt_dict[model] - prompt = custom_prompt( - role_dict=model_prompt_details["roles"], - initial_prompt_value=model_prompt_details["initial_prompt_value"], - final_prompt_value=model_prompt_details["final_prompt_value"], - messages=messages, - ) - else: - # Separate system prompt from rest of message - system_prompt_indices = [] - system_prompt = "" - for idx, message in enumerate(messages): - if message["role"] == "system": - system_prompt += message["content"] - system_prompt_indices.append(idx) - if len(system_prompt_indices) > 0: - for idx in reversed(system_prompt_indices): - messages.pop(idx) - if len(system_prompt) > 0: - optional_params["system"] = system_prompt - # Format rest of message according to anthropic guidelines + print_verbose(f"raw model_response: {response.text}") + ## RESPONSE OBJECT try: - messages = prompt_factory( - model=model, messages=messages, custom_llm_provider="anthropic" + completion_response = response.json() + except: + raise AnthropicError( + message=response.text, status_code=response.status_code ) - except Exception as e: - raise AnthropicError(status_code=400, message=str(e)) - - ## Load Config - config = litellm.AnthropicConfig.get_config() - for k, v in config.items(): - if ( - k not in optional_params - ): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in - optional_params[k] = v - - ## Handle Tool Calling - if "tools" in optional_params: - _is_function_call = True - headers["anthropic-beta"] = "tools-2024-04-04" - - anthropic_tools = [] - for tool in optional_params["tools"]: - new_tool = tool["function"] - new_tool["input_schema"] = new_tool.pop("parameters") # rename key - anthropic_tools.append(new_tool) - - optional_params["tools"] = anthropic_tools - - stream = optional_params.pop("stream", None) - - data = { - "model": model, - "messages": messages, - **optional_params, - } - - ## LOGGING - logging_obj.pre_call( - input=messages, - api_key=api_key, - additional_args={ - "complete_input_dict": data, - "api_base": api_base, - "headers": headers, - }, - ) - print_verbose(f"_is_function_call: {_is_function_call}") - if acompletion == True: - if ( - stream and not _is_function_call - ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) - print_verbose("makes async anthropic streaming POST request") - data["stream"] = stream - return acompletion_stream_function( - model=model, - messages=messages, - data=data, - api_base=api_base, - custom_prompt_dict=custom_prompt_dict, - model_response=model_response, - print_verbose=print_verbose, - encoding=encoding, - api_key=api_key, - logging_obj=logging_obj, - optional_params=optional_params, - stream=stream, - _is_function_call=_is_function_call, - litellm_params=litellm_params, - logger_fn=logger_fn, - headers=headers, + if "error" in completion_response: + raise AnthropicError( + message=str(completion_response["error"]), + status_code=response.status_code, + ) + elif len(completion_response["content"]) == 0: + raise AnthropicError( + message="No content in response", + status_code=response.status_code, ) else: - return acompletion_function( - model=model, + text_content = "" + tool_calls = [] + for content in completion_response["content"]: + if content["type"] == "text": + text_content += content["text"] + ## TOOL CALLING + elif content["type"] == "tool_use": + tool_calls.append( + { + "id": content["id"], + "type": "function", + "function": { + "name": content["name"], + "arguments": json.dumps(content["input"]), + }, + } + ) + + _message = litellm.Message( + tool_calls=tool_calls, + content=text_content or None, + ) + model_response.choices[0].message = _message # type: ignore + model_response._hidden_params["original_response"] = completion_response[ + "content" + ] # allow user to access raw anthropic tool calling response + + model_response.choices[0].finish_reason = map_finish_reason( + completion_response["stop_reason"] + ) + + print_verbose(f"_is_function_call: {_is_function_call}; stream: {stream}") + if _is_function_call and stream: + print_verbose("INSIDE ANTHROPIC STREAMING TOOL CALLING CONDITION BLOCK") + # return an iterator + streaming_model_response = ModelResponse(stream=True) + streaming_model_response.choices[0].finish_reason = model_response.choices[ + 0 + ].finish_reason + # streaming_model_response.choices = [litellm.utils.StreamingChoices()] + streaming_choice = litellm.utils.StreamingChoices() + streaming_choice.index = model_response.choices[0].index + _tool_calls = [] + print_verbose( + f"type of model_response.choices[0]: {type(model_response.choices[0])}" + ) + print_verbose(f"type of streaming_choice: {type(streaming_choice)}") + if isinstance(model_response.choices[0], litellm.Choices): + if getattr( + model_response.choices[0].message, "tool_calls", None + ) is not None and isinstance( + model_response.choices[0].message.tool_calls, list + ): + for tool_call in model_response.choices[0].message.tool_calls: + _tool_call = {**tool_call.dict(), "index": 0} + _tool_calls.append(_tool_call) + delta_obj = litellm.utils.Delta( + content=getattr(model_response.choices[0].message, "content", None), + role=model_response.choices[0].message.role, + tool_calls=_tool_calls, + ) + streaming_choice.delta = delta_obj + streaming_model_response.choices = [streaming_choice] + completion_stream = ModelResponseIterator( + model_response=streaming_model_response + ) + print_verbose( + "Returns anthropic CustomStreamWrapper with 'cached_response' streaming object" + ) + return CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="cached_response", + logging_obj=logging_obj, + ) + + ## CALCULATING USAGE + prompt_tokens = completion_response["usage"]["input_tokens"] + completion_tokens = completion_response["usage"]["output_tokens"] + total_tokens = prompt_tokens + completion_tokens + + model_response["created"] = int(time.time()) + model_response["model"] = model + usage = Usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + ) + model_response.usage = usage + return model_response + + async def acompletion_stream_function( + self, + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + stream, + _is_function_call, + data=None, + optional_params=None, + litellm_params=None, + logger_fn=None, + headers={}, + ): + response = await self.async_handler.post( + api_base, headers=headers, data=json.dumps(data) + ) + + if response.status_code != 200: + raise AnthropicError( + status_code=response.status_code, message=response.text + ) + + completion_stream = response.aiter_lines() + + streamwrapper = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + ) + return streamwrapper + + async def acompletion_function( + self, + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + stream, + _is_function_call, + data=None, + optional_params=None, + litellm_params=None, + logger_fn=None, + headers={}, + ): + response = await self.async_handler.post( + api_base, headers=headers, data=json.dumps(data) + ) + return self.process_response( + model=model, + response=response, + model_response=model_response, + _is_function_call=_is_function_call, + stream=stream, + logging_obj=logging_obj, + api_key=api_key, + data=data, + messages=messages, + print_verbose=print_verbose, + ) + + def completion( + self, + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + optional_params=None, + acompletion=None, + litellm_params=None, + logger_fn=None, + headers={}, + ): + headers = validate_environment(api_key, headers) + _is_function_call = False + messages = copy.deepcopy(messages) + optional_params = copy.deepcopy(optional_params) + if model in custom_prompt_dict: + # check if the model has a registered custom prompt + model_prompt_details = custom_prompt_dict[model] + prompt = custom_prompt( + role_dict=model_prompt_details["roles"], + initial_prompt_value=model_prompt_details["initial_prompt_value"], + final_prompt_value=model_prompt_details["final_prompt_value"], messages=messages, - data=data, - api_base=api_base, - custom_prompt_dict=custom_prompt_dict, - model_response=model_response, - print_verbose=print_verbose, - encoding=encoding, - api_key=api_key, - logging_obj=logging_obj, - optional_params=optional_params, - stream=stream, - _is_function_call=_is_function_call, - litellm_params=litellm_params, - logger_fn=logger_fn, - headers=headers, ) - else: - ## COMPLETION CALL - if ( - stream and not _is_function_call - ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) - print_verbose("makes anthropic streaming POST request") - data["stream"] = stream - response = requests.post( - api_base, - headers=headers, - data=json.dumps(data), - stream=stream, - ) - - if response.status_code != 200: - raise AnthropicError( - status_code=response.status_code, message=response.text - ) - - completion_stream = response.iter_lines() - streaming_response = CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider="anthropic", - logging_obj=logging_obj, - ) - return streaming_response - else: - response = requests.post(api_base, headers=headers, data=json.dumps(data)) - if response.status_code != 200: - raise AnthropicError( - status_code=response.status_code, message=response.text + # Separate system prompt from rest of message + system_prompt_indices = [] + system_prompt = "" + for idx, message in enumerate(messages): + if message["role"] == "system": + system_prompt += message["content"] + system_prompt_indices.append(idx) + if len(system_prompt_indices) > 0: + for idx in reversed(system_prompt_indices): + messages.pop(idx) + if len(system_prompt) > 0: + optional_params["system"] = system_prompt + # Format rest of message according to anthropic guidelines + try: + messages = prompt_factory( + model=model, messages=messages, custom_llm_provider="anthropic" ) - return process_response( - model=model, - response=response, - model_response=model_response, - _is_function_call=_is_function_call, - stream=stream, - logging_obj=logging_obj, - api_key=api_key, - data=data, - messages=messages, - print_verbose=print_verbose, - ) + except Exception as e: + raise AnthropicError(status_code=400, message=str(e)) + + ## Load Config + config = litellm.AnthropicConfig.get_config() + for k, v in config.items(): + if ( + k not in optional_params + ): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in + optional_params[k] = v + + ## Handle Tool Calling + if "tools" in optional_params: + _is_function_call = True + headers["anthropic-beta"] = "tools-2024-04-04" + + anthropic_tools = [] + for tool in optional_params["tools"]: + new_tool = tool["function"] + new_tool["input_schema"] = new_tool.pop("parameters") # rename key + anthropic_tools.append(new_tool) + + optional_params["tools"] = anthropic_tools + + stream = optional_params.pop("stream", None) + + data = { + "model": model, + "messages": messages, + **optional_params, + } + + ## LOGGING + logging_obj.pre_call( + input=messages, + api_key=api_key, + additional_args={ + "complete_input_dict": data, + "api_base": api_base, + "headers": headers, + }, + ) + print_verbose(f"_is_function_call: {_is_function_call}") + if acompletion == True: + if ( + stream and not _is_function_call + ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) + print_verbose("makes async anthropic streaming POST request") + data["stream"] = stream + return self.acompletion_stream_function( + model=model, + messages=messages, + data=data, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + logging_obj=logging_obj, + optional_params=optional_params, + stream=stream, + _is_function_call=_is_function_call, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, + ) + else: + return self.acompletion_function( + model=model, + messages=messages, + data=data, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + logging_obj=logging_obj, + optional_params=optional_params, + stream=stream, + _is_function_call=_is_function_call, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, + ) + else: + ## COMPLETION CALL + if ( + stream and not _is_function_call + ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) + print_verbose("makes anthropic streaming POST request") + data["stream"] = stream + response = requests.post( + api_base, + headers=headers, + data=json.dumps(data), + stream=stream, + ) + + if response.status_code != 200: + raise AnthropicError( + status_code=response.status_code, message=response.text + ) + + completion_stream = response.iter_lines() + streaming_response = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + ) + return streaming_response + + else: + response = requests.post( + api_base, headers=headers, data=json.dumps(data) + ) + if response.status_code != 200: + raise AnthropicError( + status_code=response.status_code, message=response.text + ) + return self.process_response( + model=model, + response=response, + model_response=model_response, + _is_function_call=_is_function_call, + stream=stream, + logging_obj=logging_obj, + api_key=api_key, + data=data, + messages=messages, + print_verbose=print_verbose, + ) + + def embedding(self): + # logic for parsing in - calling - parsing out model embedding calls + pass class ModelResponseIterator: @@ -509,8 +524,3 @@ class ModelResponseIterator: raise StopAsyncIteration self.is_done = True return self.model_response - - -def embedding(): - # logic for parsing in - calling - parsing out model embedding calls - pass diff --git a/litellm/main.py b/litellm/main.py index 75382b8ce..a387c9147 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -39,7 +39,6 @@ from litellm.utils import ( get_optional_params_image_gen, ) from .llms import ( - anthropic, anthropic_text, together_ai, ai21, @@ -68,6 +67,7 @@ from .llms import ( from .llms.openai import OpenAIChatCompletion, OpenAITextCompletion from .llms.azure import AzureChatCompletion from .llms.azure_text import AzureTextCompletion +from .llms.anthropic import AnthropicChatCompletion from .llms.huggingface_restapi import Huggingface from .llms.prompt_templates.factory import ( prompt_factory, @@ -99,6 +99,7 @@ from litellm.utils import ( dotenv.load_dotenv() # Loading env variables using dotenv openai_chat_completions = OpenAIChatCompletion() openai_text_completions = OpenAITextCompletion() +anthropic_chat_completions = AnthropicChatCompletion() azure_chat_completions = AzureChatCompletion() azure_text_completions = AzureTextCompletion() huggingface = Huggingface() @@ -1181,7 +1182,7 @@ def completion( or get_secret("ANTHROPIC_API_BASE") or "https://api.anthropic.com/v1/messages" ) - response = anthropic.completion( + response = anthropic_chat_completions.completion( model=model, messages=messages, api_base=api_base, From 2622f0351bc79b0021126725967822a60c4a1408 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 18:26:52 -0700 Subject: [PATCH 09/13] (ci/cd) run again --- litellm/tests/test_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index 5e7609db9..6041788ba 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -2288,7 +2288,7 @@ async def test_acompletion_claude_3_function_call_with_streaming(): elif chunk.choices[0].finish_reason is not None: # last chunk validate_final_streaming_function_calling_chunk(chunk=chunk) idx += 1 - # raise Exception("it worked!") + # raise Exception("it worked! ") except Exception as e: pytest.fail(f"Error occurred: {e}") From f08486448c0543d3d91df5123a547dfddba82e87 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 18:28:07 -0700 Subject: [PATCH 10/13] fix - test streaming --- .circleci/config.yml | 1 + litellm/tests/test_streaming.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 92892d3ff..2bad708cb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,6 +42,7 @@ jobs: pip install lunary==0.2.5 pip install "langfuse==2.7.3" pip install numpydoc + pip install nest-asyncio==1.6.0 pip install traceloop-sdk==0.0.69 pip install openai pip install prisma diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index 6041788ba..4ca61303f 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -26,6 +26,9 @@ litellm.logging = False litellm.set_verbose = True litellm.num_retries = 3 litellm.cache = None +import nest_asyncio + +nest_asyncio.apply() score = 0 From a38d3b17c5bb499f073ce162b879dfb8fdce3327 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 19:16:27 -0700 Subject: [PATCH 11/13] ci/cd run async handler --- litellm/llms/anthropic.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/litellm/llms/anthropic.py b/litellm/llms/anthropic.py index 3eca11bef..d836ed8db 100644 --- a/litellm/llms/anthropic.py +++ b/litellm/llms/anthropic.py @@ -105,9 +105,6 @@ def validate_environment(api_key, user_headers): class AnthropicChatCompletion(BaseLLM): def __init__(self) -> None: super().__init__() - self.async_handler = AsyncHTTPHandler( - timeout=httpx.Timeout(timeout=600.0, connect=5.0) - ) def process_response( self, @@ -258,6 +255,9 @@ class AnthropicChatCompletion(BaseLLM): logger_fn=None, headers={}, ): + self.async_handler = AsyncHTTPHandler( + timeout=httpx.Timeout(timeout=600.0, connect=5.0) + ) response = await self.async_handler.post( api_base, headers=headers, data=json.dumps(data) ) @@ -296,6 +296,9 @@ class AnthropicChatCompletion(BaseLLM): logger_fn=None, headers={}, ): + self.async_handler = AsyncHTTPHandler( + timeout=httpx.Timeout(timeout=600.0, connect=5.0) + ) response = await self.async_handler.post( api_base, headers=headers, data=json.dumps(data) ) From 9be250c0f0852c4a9851ed2612b97f97b06d54e4 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 19:27:26 -0700 Subject: [PATCH 12/13] add exit and aenter --- litellm/llms/custom_httpx/http_handler.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/litellm/llms/custom_httpx/http_handler.py b/litellm/llms/custom_httpx/http_handler.py index c008b0593..67e6c80da 100644 --- a/litellm/llms/custom_httpx/http_handler.py +++ b/litellm/llms/custom_httpx/http_handler.py @@ -22,6 +22,13 @@ class AsyncHTTPHandler: # Close the client when you're done with it await self.client.aclose() + async def __aenter__(self): + return self.client + + async def __aexit__(self): + # close the client when exiting + await self.client.aclose() + async def get( self, url: str, params: Optional[dict] = None, headers: Optional[dict] = None ): From d51e853b609f36c6e4d1a223df6f701b3bca5204 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 6 Apr 2024 19:28:51 -0700 Subject: [PATCH 13/13] undo adding next-asyncio --- .circleci/config.yml | 1 - litellm/tests/test_streaming.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 2bad708cb..92892d3ff 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,7 +42,6 @@ jobs: pip install lunary==0.2.5 pip install "langfuse==2.7.3" pip install numpydoc - pip install nest-asyncio==1.6.0 pip install traceloop-sdk==0.0.69 pip install openai pip install prisma diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index 4ca61303f..6041788ba 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -26,9 +26,6 @@ litellm.logging = False litellm.set_verbose = True litellm.num_retries = 3 litellm.cache = None -import nest_asyncio - -nest_asyncio.apply() score = 0