feat(scheduler.py): support redis caching for req. prioritization

Enables request prioritization to work across multiple instances of LiteLLM.
Krrish Dholakia 2024-06-06 14:19:06 -07:00
parent 422b4b1728
commit b590e6607c
2 changed files with 19 additions and 10 deletions
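
With the scheduler now taking a redis_cache, every Router process that points at the same Redis shares one request-priority queue instead of keeping a private in-memory one. A minimal sketch of that wiring, assuming the Router's existing redis_host / redis_port / redis_password parameters and an illustrative model_list entry (none of these values come from this commit):

    from litellm import Router

    router = Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {"model": "gpt-3.5-turbo"},
            }
        ],
        # every instance points at the same Redis, so they poll a shared queue
        redis_host="localhost",
        redis_port=6379,
        redis_password="",
        polling_interval=0.03,  # forwarded to the Scheduler, per the diff below
    )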


@@ -220,8 +220,6 @@ class Router:
             []
         )  # names of models under litellm_params. ex. azure/chatgpt-v-2
         self.deployment_latency_map = {}
-        ### SCHEDULER ###
-        self.scheduler = Scheduler(polling_interval=polling_interval)
         ### CACHING ###
         cache_type: Literal["local", "redis"] = "local"  # default to an in-memory cache
         redis_cache = None
@@ -259,6 +257,10 @@ class Router:
             redis_cache=redis_cache, in_memory_cache=InMemoryCache()
         )  # use a dual cache (Redis+In-Memory) for tracking cooldowns, usage, etc.
+        ### SCHEDULER ###
+        self.scheduler = Scheduler(
+            polling_interval=polling_interval, redis_cache=redis_cache
+        )
         self.default_deployment = None  # use this to track the users default deployment, when they want to use model = *
         self.default_max_parallel_requests = default_max_parallel_requests


@@ -1,13 +1,14 @@
-import heapq, time
+import heapq
 from pydantic import BaseModel
 from typing import Optional
 import enum
-from litellm.caching import DualCache
+from litellm.caching import DualCache, RedisCache
 from litellm import print_verbose


 class SchedulerCacheKeys(enum.Enum):
     queue = "scheduler:queue"
+    default_in_memory_ttl = 5  # cache queue in-memory for 5s when redis cache available


 class DefaultPriorities(enum.Enum):
@@ -25,18 +26,24 @@ class FlowItem(BaseModel):

 class Scheduler:
     cache: DualCache

-    def __init__(self, polling_interval: Optional[float] = None):
+    def __init__(
+        self,
+        polling_interval: Optional[float] = None,
+        redis_cache: Optional[RedisCache] = None,
+    ):
         """
         polling_interval: float or null - frequency of polling queue. Default is 3ms.
         """
         self.queue: list = []
-        self.cache = DualCache()
+        default_in_memory_ttl: Optional[float] = None
+        if redis_cache is not None:
+            # if redis-cache available frequently poll that instead of using in-memory.
+            default_in_memory_ttl = SchedulerCacheKeys.default_in_memory_ttl.value
+        self.cache = DualCache(
+            redis_cache=redis_cache, default_in_memory_ttl=default_in_memory_ttl
+        )
         self.polling_interval = polling_interval or 0.03  # default to 3ms

-    def update_variables(self, cache: Optional[DualCache] = None):
-        if cache is not None:
-            self.cache = cache
-
     async def add_request(self, request: FlowItem):
         # We use the priority directly, as lower values indicate higher priority
         # get the queue
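
For reference, a hedged sketch of driving the new Scheduler constructor directly; the litellm.scheduler import path, the RedisCache connection arguments, and the FlowItem fields (priority, request_id, model_name) are assumptions, since this hunk only shows the constructor and add_request:

    import asyncio

    from litellm.caching import RedisCache
    from litellm.scheduler import FlowItem, Scheduler  # assumed module path

    scheduler = Scheduler(
        polling_interval=0.03,  # the default shown above
        redis_cache=RedisCache(host="localhost", port=6379, password=""),  # assumed kwargs
    )

    async def main() -> None:
        # Lower priority values are served first, per the comment in add_request;
        # with a shared Redis, a second process constructed the same way sees this item.
        await scheduler.add_request(
            FlowItem(priority=0, request_id="req-1", model_name="gpt-3.5-turbo")  # fields assumed
        )

    asyncio.run(main())

Because the DualCache is built with default_in_memory_ttl = 5 when Redis is configured, each process's locally cached copy of the queue expires after five seconds, so reads fall back to Redis and the shared view stays reasonably fresh.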