diff --git a/litellm/router.py b/litellm/router.py
index e3fed496f..0015af4db 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -220,8 +220,6 @@ class Router:
             []
         )  # names of models under litellm_params. ex. azure/chatgpt-v-2
         self.deployment_latency_map = {}
-        ### SCHEDULER ###
-        self.scheduler = Scheduler(polling_interval=polling_interval)
         ### CACHING ###
         cache_type: Literal["local", "redis"] = "local"  # default to an in-memory cache
         redis_cache = None
@@ -259,6 +257,10 @@ class Router:
             redis_cache=redis_cache, in_memory_cache=InMemoryCache()
         )  # use a dual cache (Redis+In-Memory) for tracking cooldowns, usage, etc.
 
+        ### SCHEDULER ###
+        self.scheduler = Scheduler(
+            polling_interval=polling_interval, redis_cache=redis_cache
+        )
         self.default_deployment = None  # use this to track the users default deployment, when they want to use model = *
         self.default_max_parallel_requests = default_max_parallel_requests
 
diff --git a/litellm/scheduler.py b/litellm/scheduler.py
index aad4ba4e3..8ee7f0e07 100644
--- a/litellm/scheduler.py
+++ b/litellm/scheduler.py
@@ -1,13 +1,14 @@
-import heapq, time
+import heapq
 from pydantic import BaseModel
 from typing import Optional
 import enum
-from litellm.caching import DualCache
+from litellm.caching import DualCache, RedisCache
 from litellm import print_verbose


 class SchedulerCacheKeys(enum.Enum):
     queue = "scheduler:queue"
+    default_in_memory_ttl = 5  # cache queue in-memory for 5s when redis cache available


 class DefaultPriorities(enum.Enum):
@@ -25,18 +26,24 @@ class FlowItem(BaseModel):
 class Scheduler:
     cache: DualCache

-    def __init__(self, polling_interval: Optional[float] = None):
+    def __init__(
+        self,
+        polling_interval: Optional[float] = None,
+        redis_cache: Optional[RedisCache] = None,
+    ):
         """
         polling_interval: float or null - frequency of polling queue. Default is 3ms.
         """
         self.queue: list = []
-        self.cache = DualCache()
+        default_in_memory_ttl: Optional[float] = None
+        if redis_cache is not None:
+            # if redis-cache available frequently poll that instead of using in-memory.
+            default_in_memory_ttl = SchedulerCacheKeys.default_in_memory_ttl.value
+        self.cache = DualCache(
+            redis_cache=redis_cache, default_in_memory_ttl=default_in_memory_ttl
+        )
         self.polling_interval = polling_interval or 0.03  # default to 3ms

-    def update_variables(self, cache: Optional[DualCache] = None):
-        if cache is not None:
-            self.cache = cache
-
     async def add_request(self, request: FlowItem):
         # We use the priority directly, as lower values indicate higher priority
         # get the queue
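
For context, a minimal sketch of how the rewired scheduler could be exercised directly. It assumes `RedisCache` accepts `host`/`port`/`password` kwargs as in `litellm.caching`, and that `FlowItem` carries `priority`, `request_id`, and `model_name` fields; the connection values and request data below are hypothetical.

```python
import asyncio

from litellm.caching import RedisCache
from litellm.scheduler import FlowItem, Scheduler

# Hypothetical connection settings; swap in a real Redis host/port.
redis_cache = RedisCache(host="localhost", port=6379, password=None)

# With a redis_cache, the Scheduler builds a DualCache whose in-memory layer
# expires after 5s (SchedulerCacheKeys.default_in_memory_ttl), so frequent
# polling reads fall through to Redis and stay consistent across workers.
scheduler = Scheduler(polling_interval=0.03, redis_cache=redis_cache)


async def main():
    item = FlowItem(priority=0, request_id="req-1", model_name="gpt-3.5-turbo")
    await scheduler.add_request(request=item)


asyncio.run(main())
```

Passing `redis_cache` at construction time is what lets the diff delete `update_variables`: the cache is no longer swapped in after the fact by the Router, it is wired up once in `Scheduler.__init__`.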