import enum
import heapq
from typing import Optional

from pydantic import BaseModel

from litellm import print_verbose
from litellm.caching.caching import DualCache, RedisCache
from litellm.constants import DEFAULT_IN_MEMORY_TTL, DEFAULT_POLLING_INTERVAL


class SchedulerCacheKeys(enum.Enum):
    queue = "scheduler:queue"
    default_in_memory_ttl = (
        DEFAULT_IN_MEMORY_TTL  # cache queue in-memory for 5s when redis cache available
    )

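# Per-model-group queues are persisted under cache keys of the form
# "<SchedulerCacheKeys.queue.value>:<model_name>", e.g. "scheduler:queue:gpt-4"
# (the model name here is only a placeholder) - see get_queue / save_queue below.
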
class FlowItem(BaseModel):
    priority: int  # Priority between 0 and 255
    request_id: str
    model_name: str

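# Illustrative example (values are placeholders): a high-priority request for a
# model group could be represented as
#   FlowItem(priority=0, request_id="req-123", model_name="gpt-3.5-turbo")
# Lower priority values are served first - see Scheduler.add_request below.
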
class Scheduler:
    cache: DualCache

    def __init__(
        self,
        polling_interval: Optional[float] = None,
        redis_cache: Optional[RedisCache] = None,
    ):
        """
        polling_interval: float or null - frequency of polling the queue. Default is 3ms.
        """
        self.queue: list = []
        default_in_memory_ttl: Optional[float] = None
        if redis_cache is not None:
            # if redis-cache available frequently poll that instead of using in-memory.
            default_in_memory_ttl = SchedulerCacheKeys.default_in_memory_ttl.value
        self.cache = DualCache(
            redis_cache=redis_cache, default_in_memory_ttl=default_in_memory_ttl
        )
        self.polling_interval = (
            polling_interval or DEFAULT_POLLING_INTERVAL
        )  # default to 3ms

    async def add_request(self, request: FlowItem):
        # We use the priority directly, as lower values indicate higher priority
        # get the queue
        queue = await self.get_queue(model_name=request.model_name)
        # update the queue
        heapq.heappush(queue, (request.priority, request.request_id))

        # save the queue
        await self.save_queue(queue=queue, model_name=request.model_name)

    async def poll(self, id: str, model_name: str, health_deployments: list) -> bool:
        """
        Return whether the request can be processed.

        Returns:
        - True:
            * If healthy deployments are available
            * OR if the request is at the top of the queue
        - False:
            * If no healthy deployments are available
            * AND the request is not at the top of the queue
        """
        queue = await self.get_queue(model_name=model_name)
        if not queue:
            raise Exception(
                "Incorrectly setup. Queue is invalid. Queue={}".format(queue)
            )

        # ------------
        # Setup values
        # ------------

        print_verbose(f"len(health_deployments): {len(health_deployments)}")
        if len(health_deployments) == 0:
            print_verbose(f"queue: {queue}, seeking id={id}")
            # Check if the id is at the top of the heap
            if queue[0][1] == id:
                # Remove the item from the queue
                heapq.heappop(queue)
                print_verbose(f"Popped id: {id}")
                return True
            else:
                return False

        return True

    async def peek(self, id: str, model_name: str, health_deployments: list) -> bool:
        """Return whether the id is at the top of the queue, without popping it from the heap."""
        queue = await self.get_queue(model_name=model_name)
        if not queue:
            raise Exception(
                "Incorrectly setup. Queue is invalid. Queue={}".format(queue)
            )

        # ------------
        # Setup values
        # ------------

        # Check if the id is at the top of the heap
        if queue[0][1] == id:
            return True

        return False

    def get_queue_status(self):
        """Get the status of items in the queue"""
        return self.queue

    async def get_queue(self, model_name: str) -> list:
        """
        Return the queue for the given model group
        """
        if self.cache is not None:
            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_name)
            response = await self.cache.async_get_cache(key=_cache_key)
            if response is None or not isinstance(response, list):
                return []
            elif isinstance(response, list):
                return response
        return self.queue

    async def save_queue(self, queue: list, model_name: str) -> None:
        """
        Save the updated queue for the model group
        """
        if self.cache is not None:
            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_name)
            await self.cache.async_set_cache(key=_cache_key, value=queue)
        return None
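
# ---------------------------------------------------------------------------
# Illustrative usage sketch - not part of the original module. It assumes no
# Redis cache is configured (so the scheduler falls back to its in-memory
# DualCache); the request ids, model name, and deployment list below are
# placeholder values.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        scheduler = Scheduler()  # pass redis_cache=RedisCache(...) to share the queue across instances

        # Enqueue two requests for the same model group with different priorities.
        await scheduler.add_request(
            FlowItem(priority=0, request_id="req-high", model_name="gpt-3.5-turbo")
        )
        await scheduler.add_request(
            FlowItem(priority=10, request_id="req-low", model_name="gpt-3.5-turbo")
        )

        # peek() only inspects the head of the heap: the request with the lowest
        # priority value (i.e. the most urgent one) sits at the top.
        print(
            await scheduler.peek(
                id="req-high", model_name="gpt-3.5-turbo", health_deployments=[]
            )
        )  # True
        print(
            await scheduler.peek(
                id="req-low", model_name="gpt-3.5-turbo", health_deployments=[]
            )
        )  # False

        # poll() lets any request through as soon as a healthy deployment exists;
        # callers would typically retry it every `scheduler.polling_interval`
        # seconds (e.g. with asyncio.sleep) until it returns True.
        print(
            await scheduler.poll(
                id="req-low",
                model_name="gpt-3.5-turbo",
                health_deployments=["deployment-1"],
            )
        )  # True

    asyncio.run(_demo())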