fix(anthropic.py): fix parallel streaming on anthropic.py

prevent parallel requests from cancelling each other by creating the AsyncHTTPHandler per request instead of storing it on the shared client instance

Fixes https://github.com/BerriAI/litellm/issues/3881
Krrish Dholakia 2024-05-28 16:29:09 -07:00
parent 073bca78d4
commit 324bf027f5
3 changed files with 152 additions and 180 deletions
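
For context, here is a minimal, self-contained sketch (illustrative names, not litellm code) of the hazard the diff below removes: when per-request state is stored on a shared instance attribute such as `self.async_handler`, two overlapping calls overwrite each other, so one request ends up using (or tearing down) the handler that the other request is still streaming from. The fix keeps the handler in a local variable instead.

```python
# Simplified illustration of the race this commit fixes (assumed names,
# not the actual litellm classes).
import asyncio


class SharedClient:
    async def request(self, name: str, delay: float) -> str:
        # Buggy pattern: per-request state stored on the shared instance.
        self.async_handler = f"handler-for-{name}"
        await asyncio.sleep(delay)  # another request may run and reassign here
        # By the time we use it, the attribute may belong to a different request.
        return f"{name} used {self.async_handler}"


async def main() -> None:
    client = SharedClient()
    results = await asyncio.gather(
        client.request("req-1", 0.2),
        client.request("req-2", 0.1),
    )
    print(results)  # both report handler-for-req-2: req-1 was clobbered
    # Fixed pattern (as in the diff): async_handler = AsyncHTTPHandler(...)
    # created as a local variable inside the call, never stored on `self`.


if __name__ == "__main__":
    asyncio.run(main())
```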


@@ -379,13 +379,12 @@ class AnthropicChatCompletion(BaseLLM):
         logger_fn=None,
         headers={},
     ):
-        self.async_handler = AsyncHTTPHandler(
-            timeout=httpx.Timeout(timeout=600.0, connect=5.0)
+        async_handler = AsyncHTTPHandler(
+            timeout=httpx.Timeout(timeout=600.0, connect=20.0)
         )
         data["stream"] = True
-        response = await self.async_handler.post(
-            api_base, headers=headers, data=json.dumps(data), stream=True
-        )
+        response = await async_handler.post(api_base, headers=headers, json=data)
         if response.status_code != 200:
             raise AnthropicError(
@@ -421,12 +420,10 @@ class AnthropicChatCompletion(BaseLLM):
         logger_fn=None,
         headers={},
     ) -> Union[ModelResponse, CustomStreamWrapper]:
-        self.async_handler = AsyncHTTPHandler(
+        async_handler = AsyncHTTPHandler(
             timeout=httpx.Timeout(timeout=600.0, connect=5.0)
         )
-        response = await self.async_handler.post(
-            api_base, headers=headers, data=json.dumps(data)
-        )
+        response = await async_handler.post(api_base, headers=headers, json=data)
         if stream and _is_function_call:
             return self.process_streaming_response(
                 model=model,


@@ -43,12 +43,13 @@ class AsyncHTTPHandler:
         self,
         url: str,
         data: Optional[Union[dict, str]] = None,  # type: ignore
+        json: Optional[dict] = None,
        params: Optional[dict] = None,
         headers: Optional[dict] = None,
         stream: bool = False,
     ):
         req = self.client.build_request(
-            "POST", url, data=data, params=params, headers=headers  # type: ignore
+            "POST", url, data=data, json=json, params=params, headers=headers  # type: ignore
         )
         response = await self.client.send(req, stream=stream)
         return response
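
As a hedged usage sketch of the new `json=` path, the snippet below uses httpx directly, which is what `build_request`/`send` above delegate to; the URL and payload are placeholders, not taken from the commit.

```python
# Sketch of what the json= parameter does under the hood (illustrative only).
import asyncio

import httpx


async def post_json(url: str, payload: dict) -> httpx.Response:
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(timeout=600.0, connect=20.0)
    ) as client:
        # httpx serializes `json=` into the request body and sets the
        # Content-Type header, so callers no longer need json.dumps(data).
        req = client.build_request("POST", url, json=payload)
        return await client.send(req)


# Example (placeholder URL, requires network access):
# asyncio.run(post_json("https://example.com/v1/messages", {"stream": True}))
```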


@@ -3,7 +3,7 @@
 import sys, os, asyncio
 import traceback
-import time, pytest
+import time, pytest, uuid
 from pydantic import BaseModel
 from typing import Tuple
@@ -245,98 +245,69 @@ def test_completion_azure_stream_content_filter_no_delta():
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": "",
"role": "assistant"
},
"delta": {"content": "", "role": "assistant"},
"finish_reason": None,
"index": 0
"index": 0,
}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{"delta": {"content": "This"}, "finish_reason": None, "index": 0}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{"delta": {"content": " is"}, "finish_reason": None, "index": 0}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{"delta": {"content": " a"}, "finish_reason": None, "index": 0}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{"delta": {"content": " dummy"}, "finish_reason": None, "index": 0}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": "This"
},
"delta": {"content": " response"},
"finish_reason": None,
"index": 0
"index": 0,
}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": " is"
},
"finish_reason": None,
"index": 0
}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": " a"
},
"finish_reason": None,
"index": 0
}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": " dummy"
},
"finish_reason": None,
"index": 0
}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": " response"
},
"finish_reason": None,
"index": 0
}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "",
@@ -347,61 +318,37 @@ def test_completion_azure_stream_content_filter_no_delta():
"content_filter_offsets": {
"check_offset": 35159,
"start_offset": 35159,
"end_offset": 36150
"end_offset": 36150,
},
"content_filter_results": {
"hate": {
"filtered": False,
"severity": "safe"
"hate": {"filtered": False, "severity": "safe"},
"self_harm": {"filtered": False, "severity": "safe"},
"sexual": {"filtered": False, "severity": "safe"},
"violence": {"filtered": False, "severity": "safe"},
},
"self_harm": {
"filtered": False,
"severity": "safe"
},
"sexual": {
"filtered": False,
"severity": "safe"
},
"violence": {
"filtered": False,
"severity": "safe"
}
}
}
],
"created": 0,
"model": "",
"object": ""
"object": "",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {
"content": "."
},
"finish_reason": None,
"index": 0
}
{"delta": {"content": "."}, "finish_reason": None, "index": 0}
],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "chatcmpl-9SQxdH5hODqkWyJopWlaVOOUnFwlj",
"choices": [
{
"delta": {},
"finish_reason": "stop",
"index": 0
}
],
"choices": [{"delta": {}, "finish_reason": "stop", "index": 0}],
"created": 1716563849,
"model": "gpt-4o-2024-05-13",
"object": "chat.completion.chunk",
"system_fingerprint": "fp_5f4bad809a"
"system_fingerprint": "fp_5f4bad809a",
},
{
"id": "",
@@ -412,32 +359,20 @@ def test_completion_azure_stream_content_filter_no_delta():
"content_filter_offsets": {
"check_offset": 36150,
"start_offset": 36060,
"end_offset": 37029
"end_offset": 37029,
},
"content_filter_results": {
"hate": {
"filtered": False,
"severity": "safe"
"hate": {"filtered": False, "severity": "safe"},
"self_harm": {"filtered": False, "severity": "safe"},
"sexual": {"filtered": False, "severity": "safe"},
"violence": {"filtered": False, "severity": "safe"},
},
"self_harm": {
"filtered": False,
"severity": "safe"
},
"sexual": {
"filtered": False,
"severity": "safe"
},
"violence": {
"filtered": False,
"severity": "safe"
}
}
}
],
"created": 0,
"model": "",
"object": ""
}
"object": "",
},
]
chunk_list = []
@@ -1449,15 +1384,37 @@ def test_bedrock_claude_3_streaming():
pytest.fail(f"Error occurred: {e}")
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_claude_3_streaming_finish_reason():
async def test_claude_3_streaming_finish_reason(sync_mode):
try:
import threading
litellm.set_verbose = True
messages = [
{"role": "system", "content": "Be helpful"},
{"role": "user", "content": "What do you know?"},
]
response: ModelResponse = await litellm.acompletion( # type: ignore
def sync_test_streaming():
response: litellm.CustomStreamWrapper = litellm.completion(  # type: ignore
model="claude-3-opus-20240229",
messages=messages,
stream=True,
max_tokens=10,
)
complete_response = ""
# Add any assertions here to check the response
num_finish_reason = 0
for chunk in response:
print(f"chunk: {chunk}")
if isinstance(chunk, ModelResponse):
if chunk.choices[0].finish_reason is not None:
num_finish_reason += 1
assert num_finish_reason == 1
async def test_streaming():
response: litellm.CustomStreamWrapper = await litellm.acompletion( # type: ignore
model="claude-3-opus-20240229",
messages=messages,
stream=True,
@@ -1472,6 +1429,23 @@ async def test_claude_3_streaming_finish_reason():
if chunk.choices[0].finish_reason is not None:
num_finish_reason += 1
assert num_finish_reason == 1
tasks = []
for _ in range(2):
if sync_mode == False:
tasks.append(test_streaming())
else:
thread = threading.Thread(target=sync_test_streaming)
thread.start()
tasks.append(thread)
if sync_mode == False:
await asyncio.gather(*tasks)
else:
# Wait for all threads to complete
for thread in tasks:
thread.join()
except RateLimitError:
pass
except Exception as e: