flush_all_updates_from_in_memory_queue

This commit is contained in:
Ishaan Jaff 2025-04-04 15:34:56 -07:00
parent 065477abb4
commit 93068cb142
2 changed files with 8 additions and 1 deletions

View file

@@ -26,6 +26,7 @@ REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
MAX_SIZE_IN_MEMORY_QUEUE = 10000
MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000
###############################################################################################
MINIMUM_PROMPT_CACHE_TOKEN_COUNT = (
1024 # minimum number of tokens to cache a prompt by Anthropic

View file

@@ -4,7 +4,7 @@ Base class for in memory buffer for database transactions
import asyncio
from litellm._logging import verbose_proxy_logger
from litellm.constants import MAX_SIZE_IN_MEMORY_QUEUE
from litellm.constants import MAX_IN_MEMORY_QUEUE_FLUSH_COUNT, MAX_SIZE_IN_MEMORY_QUEUE
class BaseUpdateQueue:
@@ -23,5 +23,11 @@ class BaseUpdateQueue:
"""Get all updates from the queue."""
updates = []
while not self.update_queue.empty():
if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
# circuit breaker to ensure we're not stuck dequeuing updates
verbose_proxy_logger.warning(
"Max in memory queue flush count reached, stopping flush"
)
break
updates.append(await self.update_queue.get())
return updates