diff --git a/litellm/proxy/db/db_transaction_queue/base_update_queue.py b/litellm/proxy/db/db_transaction_queue/base_update_queue.py
index 234b8eff8a..924c4e6905 100644
--- a/litellm/proxy/db/db_transaction_queue/base_update_queue.py
+++ b/litellm/proxy/db/db_transaction_queue/base_update_queue.py
@@ -23,8 +23,8 @@ class BaseUpdateQueue:
         """Get all updates from the queue."""
         updates = []
         while not self.update_queue.empty():
+            # Circuit breaker: cap the updates dequeued per flush so we never get stuck dequeuing (protects CPU utilization)
             if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
-                # circuit breaker to ensure we're not stuck dequeuing updates
                 verbose_proxy_logger.warning(
                     "Max in memory queue flush count reached, stopping flush"
                 )
diff --git a/tests/litellm/proxy/db/db_transaction_queue/test_base_update_queue.py b/tests/litellm/proxy/db/db_transaction_queue/test_base_update_queue.py
new file mode 100644
index 0000000000..e1d4cb0541
--- /dev/null
+++ b/tests/litellm/proxy/db/db_transaction_queue/test_base_update_queue.py
@@ -0,0 +1,41 @@
+import asyncio
+import json
+import os
+import sys
+
+import pytest
+from fastapi.testclient import TestClient
+
+sys.path.insert(
+    0, os.path.abspath("../../..")
+)  # Prepends the directory three levels above the current working directory to the module search path
+
+from litellm.constants import MAX_IN_MEMORY_QUEUE_FLUSH_COUNT
+from litellm.proxy.db.db_transaction_queue.base_update_queue import BaseUpdateQueue
+
+
+@pytest.mark.asyncio
+async def test_queue_flush_limit():
+    """
+    A single flush must return exactly MAX_IN_MEMORY_QUEUE_FLUSH_COUNT items when more are queued, leaving the excess in the queue.
+    """
+    # Arrange
+    queue = BaseUpdateQueue()
+    # Enqueue 100 more items than a single flush is allowed to return
+    items_to_add = MAX_IN_MEMORY_QUEUE_FLUSH_COUNT + 100
+
+    for i in range(items_to_add):
+        await queue.add_update(f"test_update_{i}")
+
+    # Act: perform one flush of the in-memory queue
+    flushed_updates = await queue.flush_all_updates_from_in_memory_queue()
+
+    # Assert: the flush stopped exactly at the configured cap
+    assert (
+        len(flushed_updates) == MAX_IN_MEMORY_QUEUE_FLUSH_COUNT
+    ), f"Expected {MAX_IN_MEMORY_QUEUE_FLUSH_COUNT} items, but got {len(flushed_updates)}"
+
+    # Verify the 100 excess items are still queued (not dropped by the flush)
+    assert (
+        queue.update_queue.qsize() == 100
+    ), "Expected 100 items to remain in the queue"