litellm-mirror/litellm/llms/custom_httpx/http_handler.py
Krrish Dholakia 324bf027f5 fix(anthropic.py): fix parallel streaming on anthropic.py
prevent parallel requests from cancelling each other

Fixes https://github.com/BerriAI/litellm/issues/3881
2024-05-28 16:29:09 -07:00


import asyncio

import httpx
from typing import Optional, Union

# https://www.python-httpx.org/advanced/timeouts
_DEFAULT_TIMEOUT = httpx.Timeout(timeout=5.0, connect=5.0)


class AsyncHTTPHandler:
    def __init__(
        self,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        concurrent_limit: int = 1000,
    ):
        if timeout is None:
            timeout = _DEFAULT_TIMEOUT
        # Create a client with a connection pool sized by concurrent_limit
        self.client = httpx.AsyncClient(
            timeout=timeout,
            limits=httpx.Limits(
                max_connections=concurrent_limit,
                max_keepalive_connections=concurrent_limit,
            ),
        )

    async def close(self):
        # Close the client when you're done with it
        await self.client.aclose()

    async def __aenter__(self):
        return self.client

    async def __aexit__(self, exc_type, exc_value, traceback):
        # close the client when exiting the async context manager
        await self.client.aclose()

    async def get(
        self, url: str, params: Optional[dict] = None, headers: Optional[dict] = None
    ):
        response = await self.client.get(url, params=params, headers=headers)
        return response

    async def post(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
    ):
        # Build the request explicitly so it can be sent with stream=True,
        # giving each concurrent call its own response stream.
        req = self.client.build_request(
            "POST", url, data=data, json=json, params=params, headers=headers  # type: ignore
        )
        response = await self.client.send(req, stream=stream)
        return response

    def __del__(self) -> None:
        try:
            # Best-effort cleanup: schedule aclose() on the running event loop, if any
            asyncio.get_running_loop().create_task(self.close())
        except Exception:
            pass
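

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of driving AsyncHTTPHandler with a streaming POST. The
# endpoint URL and payload are hypothetical; the timeout override uses the
# same httpx.Timeout construction the module's default relies on.
async def _example_async_usage() -> None:
    handler = AsyncHTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
    try:
        # stream=True returns before the body is read; the caller iterates
        # the bytes and closes the stream explicitly.
        response = await handler.post(
            "https://example.com/v1/stream",  # hypothetical endpoint
            json={"prompt": "hello"},
            stream=True,
        )
        async for chunk in response.aiter_bytes():
            _ = chunk  # process each streamed chunk here
        await response.aclose()
    finally:
        await handler.close()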


class HTTPHandler:
    def __init__(
        self,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        concurrent_limit: int = 1000,
        client: Optional[httpx.Client] = None,
    ):
        if timeout is None:
            timeout = _DEFAULT_TIMEOUT
        if client is None:
            # Create a client with a connection pool sized by concurrent_limit
            self.client = httpx.Client(
                timeout=timeout,
                limits=httpx.Limits(
                    max_connections=concurrent_limit,
                    max_keepalive_connections=concurrent_limit,
                ),
            )
        else:
            self.client = client

    def close(self):
        # Close the client when you're done with it
        self.client.close()

    def get(
        self, url: str, params: Optional[dict] = None, headers: Optional[dict] = None
    ):
        response = self.client.get(url, params=params, headers=headers)
        return response

    def post(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
    ):
        req = self.client.build_request(
            "POST", url, data=data, params=params, headers=headers  # type: ignore
        )
        response = self.client.send(req, stream=stream)
        return response

    def __del__(self) -> None:
        try:
            self.close()
        except Exception:
            pass
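

# --- Usage sketch (illustrative only, not part of the original module) ---
# The synchronous HTTPHandler mirrors the async API and can also wrap an
# existing, pre-configured httpx.Client. The URL below is hypothetical.
def _example_sync_usage() -> None:
    handler = HTTPHandler(client=httpx.Client(timeout=httpx.Timeout(timeout=30.0)))
    try:
        response = handler.post("https://example.com/v1/complete", data={"q": "hi"})
        print(response.status_code)
    finally:
        handler.close()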