litellm-mirror/litellm/timeout.py
Krish Dholakia d57be47b0f
Litellm ruff linting enforcement (#5992)
* ci(config.yml): add a 'check_code_quality' step

Addresses https://github.com/BerriAI/litellm/issues/5991

* ci(config.yml): check why circle ci doesn't pick up this test

* ci(config.yml): fix to run 'check_code_quality' tests

* fix(__init__.py): fix unprotected import

* fix(__init__.py): don't remove unused imports

* build(ruff.toml): update ruff.toml to ignore unused imports

* fix: fix: ruff + pyright - fix linting + type-checking errors

* fix: fix linting errors

* fix(lago.py): fix module init error

* fix: fix linting errors

* ci(config.yml): cd into correct dir for checks

* fix(proxy_server.py): fix linting error

* fix(utils.py): fix bare except

causes ruff linting errors

* fix: ruff - fix remaining linting errors

* fix(clickhouse.py): use standard logging object

* fix(__init__.py): fix unprotected import

* fix: ruff - fix linting errors

* fix: fix linting errors

* ci(config.yml): cleanup code qa step (formatting handled in local_testing)

* fix(_health_endpoints.py): fix ruff linting errors

* ci(config.yml): just use ruff in check_code_quality pipeline for now

* build(custom_guardrail.py): include missing file

* style(embedding_handler.py): fix ruff check
2024-10-01 19:44:20 -04:00

111 lines
4.2 KiB
Python

# +-----------------------------------------------+
# | |
# | Give Feedback / Get Help |
# | https://github.com/BerriAI/litellm/issues/new |
# | |
# +-----------------------------------------------+
#
# Thank you users! We ❤️ you! - Krrish & Ishaan
"""
Module containing "timeout" decorator for sync and async callables.
"""
import asyncio
from concurrent import futures
from functools import wraps
from inspect import iscoroutinefunction
from threading import Thread
from litellm.exceptions import Timeout
def timeout(timeout_duration: float = 0.0, exception_to_raise=Timeout):
"""
Wraps a function to raise the specified exception if execution time
is greater than the specified timeout.
Works with both synchronous and asynchronous callables, but with synchronous ones will introduce
some overhead due to the backend use of threads and asyncio.
:param float timeout_duration: Timeout duration in seconds. If none callable won't time out.
:param OpenAIError exception_to_raise: Exception to raise when the callable times out.
Defaults to TimeoutError.
:return: The decorated function.
:rtype: callable
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
async def async_func():
return func(*args, **kwargs)
thread = _LoopWrapper()
thread.start()
future = asyncio.run_coroutine_threadsafe(async_func(), thread.loop)
local_timeout_duration = timeout_duration
if "force_timeout" in kwargs and kwargs["force_timeout"] is not None:
local_timeout_duration = kwargs["force_timeout"]
elif "request_timeout" in kwargs and kwargs["request_timeout"] is not None:
local_timeout_duration = kwargs["request_timeout"]
try:
result = future.result(timeout=local_timeout_duration)
except futures.TimeoutError:
thread.stop_loop()
model = args[0] if len(args) > 0 else kwargs["model"]
raise exception_to_raise(
f"A timeout error occurred. The function call took longer than {local_timeout_duration} second(s).",
model=model, # [TODO]: replace with logic for parsing out llm provider from model name
llm_provider="openai",
)
thread.stop_loop()
return result
@wraps(func)
async def async_wrapper(*args, **kwargs):
local_timeout_duration = timeout_duration
if "force_timeout" in kwargs:
local_timeout_duration = kwargs["force_timeout"]
elif "request_timeout" in kwargs and kwargs["request_timeout"] is not None:
local_timeout_duration = kwargs["request_timeout"]
try:
value = await asyncio.wait_for(
func(*args, **kwargs), timeout=timeout_duration
)
return value
except asyncio.TimeoutError:
model = args[0] if len(args) > 0 else kwargs["model"]
raise exception_to_raise(
f"A timeout error occurred. The function call took longer than {local_timeout_duration} second(s).",
model=model, # [TODO]: replace with logic for parsing out llm provider from model name
llm_provider="openai",
)
if iscoroutinefunction(func):
return async_wrapper
return wrapper
return decorator
class _LoopWrapper(Thread):
def __init__(self):
super().__init__(daemon=True)
self.loop = asyncio.new_event_loop()
def run(self) -> None:
try:
self.loop.run_forever()
self.loop.call_soon_threadsafe(self.loop.close)
except Exception:
# Log exception here
pass
finally:
self.loop.close()
asyncio.set_event_loop(None)
def stop_loop(self):
for task in asyncio.all_tasks(self.loop):
task.cancel()
self.loop.call_soon_threadsafe(self.loop.stop)