Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-26 11:14:04 +00:00)
flush_all_updates_from_in_memory_queue
commit 462827c9ec
parent 87b834585b
2 changed files with 8 additions and 1 deletion
@@ -26,6 +26,7 @@ REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
 REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
 MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
 MAX_SIZE_IN_MEMORY_QUEUE = 10000
+MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000
 ###############################################################################################
 MINIMUM_PROMPT_CACHE_TOKEN_COUNT = (
     1024  # minimum number of tokens to cache a prompt by Anthropic

@@ -4,7 +4,7 @@ Base class for in memory buffer for database transactions
 import asyncio
 
 from litellm._logging import verbose_proxy_logger
-from litellm.constants import MAX_SIZE_IN_MEMORY_QUEUE
+from litellm.constants import MAX_IN_MEMORY_QUEUE_FLUSH_COUNT, MAX_SIZE_IN_MEMORY_QUEUE
 
 
 class BaseUpdateQueue:
@@ -23,5 +23,11 @@ class BaseUpdateQueue:
         """Get all updates from the queue."""
         updates = []
         while not self.update_queue.empty():
+            if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
+                # circuit breaker to ensure we're not stuck dequeuing updates
+                verbose_proxy_logger.warning(
+                    "Max in memory queue flush count reached, stopping flush"
+                )
+                break
             updates.append(await self.update_queue.get())
         return updates
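
For context, below is a minimal, self-contained sketch of what the capped flush does. This is illustrative only: CappedUpdateQueue, add_update, and main are made-up names rather than litellm's actual API, and only the constant values are copied from the hunks above. Each flush call drains at most MAX_IN_MEMORY_QUEUE_FLUSH_COUNT items, so an over-full queue is emptied across several flush cycles instead of one unbounded loop.

import asyncio
import logging

logger = logging.getLogger(__name__)

# Values mirror the constants added/used in the diff above.
MAX_SIZE_IN_MEMORY_QUEUE = 10000
MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000


class CappedUpdateQueue:
    """Simplified stand-in (hypothetical) for an update queue with a flush cap."""

    def __init__(self) -> None:
        self.update_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_SIZE_IN_MEMORY_QUEUE)

    async def add_update(self, update: dict) -> None:
        await self.update_queue.put(update)

    async def flush_all_updates_from_in_memory_queue(self) -> list:
        """Drain the queue, but never more than the flush cap per call."""
        updates: list = []
        while not self.update_queue.empty():
            if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
                # circuit breaker: leave the rest for the next flush cycle
                logger.warning("Max in memory queue flush count reached, stopping flush")
                break
            updates.append(await self.update_queue.get())
        return updates


async def main() -> None:
    queue = CappedUpdateQueue()
    for i in range(2500):
        await queue.add_update({"id": i})

    first = await queue.flush_all_updates_from_in_memory_queue()
    second = await queue.flush_all_updates_from_in_memory_queue()
    # 1000 items per flush; 500 remain in the queue for a later cycle
    print(len(first), len(second), queue.update_queue.qsize())  # 1000 1000 500


asyncio.run(main())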