mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 02:34:29 +00:00
* test_acompletion_fallbacks_basic * use common run_async_function * fix completion_with_fallbacks * fix completion with fallbacks * fix fallback utils * test_acompletion_fallbacks_basic * test_completion_fallbacks_sync * huggingface/mistralai/Mistral-7B-Instruct-v0.3
116 lines
3.9 KiB
Python
116 lines
3.9 KiB
Python
import asyncio
|
|
import functools
|
|
from typing import Awaitable, Callable, Optional
|
|
|
|
import anyio
|
|
import anyio.to_thread
|
|
from typing_extensions import ParamSpec, TypeVar
|
|
|
|
# Parameter specification of the callable passed to `asyncify`; used in its
# annotations so the returned async wrapper accepts the same arguments.
T_ParamSpec = ParamSpec("T_ParamSpec")
|
|
# Return type of the callable passed to `asyncify`; the wrapper's awaited
# result is annotated with this same type.
T_Retval = TypeVar("T_Retval")
|
|
|
|
|
|
def function_has_argument(function: Callable, arg_name: str) -> bool:
    """
    Report whether `function` declares a parameter named `arg_name`.

    The lookup is done against the parameter names exposed by
    `inspect.signature(function)`, so positional, keyword-only and
    variadic (`*args` / `**kwargs`) parameter names are all considered.
    """
    import inspect

    return arg_name in inspect.signature(function).parameters
|
|
|
|
|
|
def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    cancellable: bool = False,
    limiter: Optional[anyio.CapacityLimiter] = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
    """
    Take a blocking function and create an async one that receives the same
    positional and keyword arguments, and that when called, calls the original function
    in a worker thread using `anyio.to_thread.run_sync()`.

    If the `cancellable` option is enabled and the task waiting for its completion is
    cancelled, the thread will still run its course but its return value (or any raised
    exception) will be ignored.

    ## Arguments

    - `function`: a blocking regular callable (e.g. a function)
    - `cancellable`: `True` to allow cancellation of the operation
    - `limiter`: capacity limiter to use to limit the total amount of threads running
        (if omitted, the default limiter is used)

    ## Return

    An async function that takes the same positional and keyword arguments as the
    original one, that when called runs the same original function in a thread worker
    and returns the result. The wrapper carries the metadata (`__name__`, `__doc__`,
    ...) of the original function.
    """

    # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old
    # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid
    # surfacing deprecation warnings.
    # The installed anyio's signature does not change between calls, so inspect it
    # once here instead of on every invocation of the wrapper.
    supports_abandon_on_cancel = function_has_argument(
        anyio.to_thread.run_sync, "abandon_on_cancel"
    )

    @functools.wraps(function)
    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        # `run_sync` only forwards positional arguments, so bind everything up front.
        partial_f = functools.partial(function, *args, **kwargs)

        if supports_abandon_on_cancel:
            return await anyio.to_thread.run_sync(
                partial_f,
                abandon_on_cancel=cancellable,
                limiter=limiter,
            )

        return await anyio.to_thread.run_sync(
            partial_f,
            cancellable=cancellable,
            limiter=limiter,
        )

    return wrapper
|
|
|
|
|
|
def run_async_function(async_function, *args, **kwargs):
    """
    Helper utility to run an async function in a sync context.
    Handles the case where there is an existing event loop running.

    When no event loop is running in the calling thread, the coroutine is run
    on a fresh event loop in that thread. When a loop is already running, the
    coroutine is run on a fresh event loop in a one-off worker thread and the
    calling thread blocks until it finishes.

    In both cases the thread that executed the coroutine has its current event
    loop reset to `None` afterwards.

    Args:
        async_function (Callable): The async function to run
        *args: Positional arguments to pass to the async function
        **kwargs: Keyword arguments to pass to the async function

    Returns:
        The result of the async function execution

    Raises:
        Any exception raised by `async_function` is propagated unchanged,
        including `RuntimeError`.

    Example:
        ```python
        async def my_async_func(x, y):
            return x + y

        result = run_async_function(my_async_func, 1, 2)
        ```
    """
    from concurrent.futures import ThreadPoolExecutor

    def run_in_new_loop():
        """Run the coroutine in a new event loop within this thread."""
        new_loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(new_loop)
            return new_loop.run_until_complete(async_function(*args, **kwargs))
        finally:
            new_loop.close()
            asyncio.set_event_loop(None)

    # Keep the `try` limited to the probe itself: a RuntimeError raised by the
    # coroutine must not be mistaken for "no running event loop".
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop, we can safely run in this thread
        return run_in_new_loop()

    # We're already inside an event loop: run in a separate thread to avoid
    # nested event loop issues.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(run_in_new_loop).result()
|