From 324bf027f50a68dac85c4fe6419f38b8226d62e6 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Tue, 28 May 2024 16:29:09 -0700 Subject: [PATCH] fix(anthropic.py): fix parallel streaming on anthropic.py prevent parallel requests from cancelling each other Fixes https://github.com/BerriAI/litellm/issues/3881 --- litellm/llms/anthropic.py | 15 +- litellm/llms/custom_httpx/http_handler.py | 3 +- litellm/tests/test_streaming.py | 314 ++++++++++------------ 3 files changed, 152 insertions(+), 180 deletions(-) diff --git a/litellm/llms/anthropic.py b/litellm/llms/anthropic.py index 1ca048523..ec6854a0f 100644 --- a/litellm/llms/anthropic.py +++ b/litellm/llms/anthropic.py @@ -379,13 +379,12 @@ class AnthropicChatCompletion(BaseLLM): logger_fn=None, headers={}, ): - self.async_handler = AsyncHTTPHandler( - timeout=httpx.Timeout(timeout=600.0, connect=5.0) + + async_handler = AsyncHTTPHandler( + timeout=httpx.Timeout(timeout=600.0, connect=20.0) ) data["stream"] = True - response = await self.async_handler.post( - api_base, headers=headers, data=json.dumps(data), stream=True - ) + response = await async_handler.post(api_base, headers=headers, json=data) if response.status_code != 200: raise AnthropicError( @@ -421,12 +420,10 @@ class AnthropicChatCompletion(BaseLLM): logger_fn=None, headers={}, ) -> Union[ModelResponse, CustomStreamWrapper]: - self.async_handler = AsyncHTTPHandler( + async_handler = AsyncHTTPHandler( timeout=httpx.Timeout(timeout=600.0, connect=5.0) ) - response = await self.async_handler.post( - api_base, headers=headers, data=json.dumps(data) - ) + response = await async_handler.post(api_base, headers=headers, json=data) if stream and _is_function_call: return self.process_streaming_response( model=model, diff --git a/litellm/llms/custom_httpx/http_handler.py b/litellm/llms/custom_httpx/http_handler.py index 4df25944b..8b5f11398 100644 --- a/litellm/llms/custom_httpx/http_handler.py +++ b/litellm/llms/custom_httpx/http_handler.py @@ -43,12 +43,13 @@ class AsyncHTTPHandler: self, url: str, data: Optional[Union[dict, str]] = None, # type: ignore + json: Optional[dict] = None, params: Optional[dict] = None, headers: Optional[dict] = None, stream: bool = False, ): req = self.client.build_request( - "POST", url, data=data, params=params, headers=headers # type: ignore + "POST", url, data=data, json=json, params=params, headers=headers # type: ignore ) response = await self.client.send(req, stream=stream) return response diff --git a/litellm/tests/test_streaming.py b/litellm/tests/test_streaming.py index 3b60896d4..f32bba50b 100644 --- a/litellm/tests/test_streaming.py +++ b/litellm/tests/test_streaming.py @@ -3,7 +3,7 @@ import sys, os, asyncio import traceback -import time, pytest +import time, pytest, uuid from pydantic import BaseModel from typing import Tuple @@ -241,203 +241,138 @@ def test_completion_azure_stream_content_filter_no_delta(): """ try: chunks = [ - { + { "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", "choices": [ { - "delta": { - "content": "", - "role": "assistant" - }, - "finish_reason": None, - "index": 0 + "delta": {"content": "", "role": "assistant"}, + "finish_reason": None, + "index": 0, } ], "created": 1716563849, "model": "gpt-4o-2024-05-13", "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { + "system_fingerprint": "fp_5f4bad809a", + }, + { + "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", + "choices": [ + {"delta": {"content": "This"}, "finish_reason": None, "index": 0} + ], + "created": 1716563849, + "model": 
"gpt-4o-2024-05-13", + "object": "chat.completion.chunk", + "system_fingerprint": "fp_5f4bad809a", + }, + { + "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", + "choices": [ + {"delta": {"content": " is"}, "finish_reason": None, "index": 0} + ], + "created": 1716563849, + "model": "gpt-4o-2024-05-13", + "object": "chat.completion.chunk", + "system_fingerprint": "fp_5f4bad809a", + }, + { + "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", + "choices": [ + {"delta": {"content": " a"}, "finish_reason": None, "index": 0} + ], + "created": 1716563849, + "model": "gpt-4o-2024-05-13", + "object": "chat.completion.chunk", + "system_fingerprint": "fp_5f4bad809a", + }, + { + "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", + "choices": [ + {"delta": {"content": " dummy"}, "finish_reason": None, "index": 0} + ], + "created": 1716563849, + "model": "gpt-4o-2024-05-13", + "object": "chat.completion.chunk", + "system_fingerprint": "fp_5f4bad809a", + }, + { "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", "choices": [ { - "delta": { - "content": "This" - }, - "finish_reason": None, - "index": 0 + "delta": {"content": " response"}, + "finish_reason": None, + "index": 0, } ], "created": 1716563849, "model": "gpt-4o-2024-05-13", "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { - "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", - "choices": [ - { - "delta": { - "content": " is" - }, - "finish_reason": None, - "index": 0 - } - ], - "created": 1716563849, - "model": "gpt-4o-2024-05-13", - "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { - "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", - "choices": [ - { - "delta": { - "content": " a" - }, - "finish_reason": None, - "index": 0 - } - ], - "created": 1716563849, - "model": "gpt-4o-2024-05-13", - "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { - "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", - "choices": [ - { - "delta": { - "content": " dummy" - }, - "finish_reason": None, - "index": 0 - } - ], - "created": 1716563849, - "model": "gpt-4o-2024-05-13", - "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { - "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", - "choices": [ - { - "delta": { - "content": " response" - }, - "finish_reason": None, - "index": 0 - } - ], - "created": 1716563849, - "model": "gpt-4o-2024-05-13", - "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { + "system_fingerprint": "fp_5f4bad809a", + }, + { "id": "", "choices": [ { - "finish_reason": None, - "index": 0, - "content_filter_offsets": { - "check_offset": 35159, - "start_offset": 35159, - "end_offset": 36150 - }, - "content_filter_results": { - "hate": { - "filtered": False, - "severity": "safe" + "finish_reason": None, + "index": 0, + "content_filter_offsets": { + "check_offset": 35159, + "start_offset": 35159, + "end_offset": 36150, }, - "self_harm": { - "filtered": False, - "severity": "safe" + "content_filter_results": { + "hate": {"filtered": False, "severity": "safe"}, + "self_harm": {"filtered": False, "severity": "safe"}, + "sexual": {"filtered": False, "severity": "safe"}, + "violence": {"filtered": False, "severity": "safe"}, }, - "sexual": { - "filtered": False, - "severity": "safe" - }, - "violence": { - "filtered": False, - "severity": "safe" - } - } } ], "created": 0, "model": "", - "object": "" - }, - { + "object": "", + }, + { "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", "choices": [ - { - "delta": { - 
"content": "." - }, - "finish_reason": None, - "index": 0 - } + {"delta": {"content": "."}, "finish_reason": None, "index": 0} ], "created": 1716563849, "model": "gpt-4o-2024-05-13", "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { + "system_fingerprint": "fp_5f4bad809a", + }, + { "id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj", - "choices": [ - { - "delta": {}, - "finish_reason": "stop", - "index": 0 - } - ], + "choices": [{"delta": {}, "finish_reason": "stop", "index": 0}], "created": 1716563849, "model": "gpt-4o-2024-05-13", "object": "chat.completion.chunk", - "system_fingerprint": "fp_5f4bad809a" - }, - { + "system_fingerprint": "fp_5f4bad809a", + }, + { "id": "", "choices": [ { - "finish_reason": None, - "index": 0, - "content_filter_offsets": { - "check_offset": 36150, - "start_offset": 36060, - "end_offset": 37029 - }, - "content_filter_results": { - "hate": { - "filtered": False, - "severity": "safe" + "finish_reason": None, + "index": 0, + "content_filter_offsets": { + "check_offset": 36150, + "start_offset": 36060, + "end_offset": 37029, }, - "self_harm": { - "filtered": False, - "severity": "safe" + "content_filter_results": { + "hate": {"filtered": False, "severity": "safe"}, + "self_harm": {"filtered": False, "severity": "safe"}, + "sexual": {"filtered": False, "severity": "safe"}, + "violence": {"filtered": False, "severity": "safe"}, }, - "sexual": { - "filtered": False, - "severity": "safe" - }, - "violence": { - "filtered": False, - "severity": "safe" - } - } } ], "created": 0, "model": "", - "object": "" - } + "object": "", + }, ] chunk_list = [] @@ -1449,29 +1384,68 @@ def test_bedrock_claude_3_streaming(): pytest.fail(f"Error occurred: {e}") +@pytest.mark.parametrize("sync_mode", [True, False]) @pytest.mark.asyncio -async def test_claude_3_streaming_finish_reason(): +async def test_claude_3_streaming_finish_reason(sync_mode): try: + import threading + litellm.set_verbose = True messages = [ {"role": "system", "content": "Be helpful"}, {"role": "user", "content": "What do you know?"}, ] - response: ModelResponse = await litellm.acompletion( # type: ignore - model="claude-3-opus-20240229", - messages=messages, - stream=True, - max_tokens=10, - ) - complete_response = "" - # Add any assertions here to-check the response - num_finish_reason = 0 - async for chunk in response: - print(f"chunk: {chunk}") - if isinstance(chunk, ModelResponse): - if chunk.choices[0].finish_reason is not None: - num_finish_reason += 1 - assert num_finish_reason == 1 + + def sync_test_streaming(): + response: litellm.CustomStreamWrapper = litellm.acompletion( # type: ignore + model="claude-3-opus-20240229", + messages=messages, + stream=True, + max_tokens=10, + ) + complete_response = "" + # Add any assertions here to-check the response + num_finish_reason = 0 + for chunk in response: + print(f"chunk: {chunk}") + if isinstance(chunk, ModelResponse): + if chunk.choices[0].finish_reason is not None: + num_finish_reason += 1 + assert num_finish_reason == 1 + + async def test_streaming(): + response: litellm.CustomStreamWrapper = await litellm.acompletion( # type: ignore + model="claude-3-opus-20240229", + messages=messages, + stream=True, + max_tokens=10, + ) + complete_response = "" + # Add any assertions here to-check the response + num_finish_reason = 0 + async for chunk in response: + print(f"chunk: {chunk}") + if isinstance(chunk, ModelResponse): + if chunk.choices[0].finish_reason is not None: + num_finish_reason += 1 + assert num_finish_reason == 1 + + tasks 
= [] + for _ in range(2): + if sync_mode == False: + tasks.append(test_streaming()) + else: + thread = threading.Thread(target=sync_test_streaming) + thread.start() + tasks.append(thread) + + if sync_mode == False: + await asyncio.gather(*tasks) + else: + # Wait for all threads to complete + for thread in tasks: + thread.join() + except RateLimitError: pass except Exception as e:
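
Why the handler change matters: before this patch, both async code paths in `AnthropicChatCompletion` assigned the freshly built handler to `self.async_handler`. With several requests running in parallel against the same instance, each new request overwrote the handler that an earlier, still-streaming request was using, which is presumably how parallel requests ended up cancelling each other (issue #3881). Scoping the handler to the call (`async_handler = AsyncHTTPHandler(...)`) removes that shared state. The sketch below is a minimal, simplified illustration of the interference pattern, not litellm code: `FakeHandler`, `SharedHandlerClient`, and `LocalHandlerClient` are hypothetical stand-ins, and it demonstrates only why a locally scoped handler is immune to clobbering, not the exact cancellation path.

    import asyncio


    class FakeHandler:
        """Hypothetical stand-in for an async HTTP handler; remembers which request built it."""

        def __init__(self, owner: str) -> None:
            self.owner = owner

        async def post(self, request_id: str) -> str:
            await asyncio.sleep(0.01)  # pretend to do network I/O
            return f"request={request_id} used handler built for {self.owner}"


    class SharedHandlerClient:
        """Old pattern: the handler lives on the shared instance, so a second
        concurrent call replaces it while the first call is still using it."""

        async def call(self, request_id: str) -> str:
            self.handler = FakeHandler(request_id)
            await asyncio.sleep(0)  # yield to the event loop, as any awaited I/O would
            return await self.handler.post(request_id)


    class LocalHandlerClient:
        """New pattern: each call builds its own handler in a local variable,
        so parallel calls cannot clobber each other's handler."""

        async def call(self, request_id: str) -> str:
            handler = FakeHandler(request_id)
            await asyncio.sleep(0)
            return await handler.post(request_id)


    async def main() -> None:
        shared = SharedHandlerClient()
        local = LocalHandlerClient()
        # Shared attribute: one request ends up using the handler the other built.
        print(await asyncio.gather(shared.call("a"), shared.call("b")))
        # Local variable: every request keeps the handler it built.
        print(await asyncio.gather(local.call("a"), local.call("b")))


    asyncio.run(main())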
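
The companion change threads a `json` parameter through `AsyncHTTPHandler.post` into `httpx`'s `build_request`, so callers can pass `json=data` instead of `data=json.dumps(data)`. The snippet below, with a placeholder URL and payload (nothing is actually sent), shows the practical difference: httpx serializes a `json=` payload and sets the `Content-Type: application/json` header itself, whereas a string passed as `data` is treated as raw content.

    import json

    import httpx

    payload = {"model": "claude-3-opus-20240229", "max_tokens": 10, "stream": True}

    with httpx.Client() as client:
        # Old style: the caller serializes the body and must set the content type
        # in `headers` by hand (httpx treats a string `data` as raw content).
        req_old = client.build_request(
            "POST", "https://example.invalid/v1/messages", data=json.dumps(payload)
        )

        # New style: httpx serializes `json=` and sets the JSON content type.
        req_new = client.build_request(
            "POST", "https://example.invalid/v1/messages", json=payload
        )

        print(req_old.headers.get("content-type"))  # not application/json unless set by hand
        print(req_new.headers.get("content-type"))  # application/json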