mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 18:54:30 +00:00
test_queue_flush_limit
This commit is contained in:
parent
e77a178a37
commit
decb6649ec
2 changed files with 42 additions and 1 deletions
|
@ -23,8 +23,8 @@ class BaseUpdateQueue:
|
||||||
"""Get all updates from the queue."""
|
"""Get all updates from the queue."""
|
||||||
updates = []
|
updates = []
|
||||||
while not self.update_queue.empty():
|
while not self.update_queue.empty():
|
||||||
|
# Circuit breaker to ensure we're not stuck dequeuing updates. Protect CPU utilization
|
||||||
if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
|
if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT:
|
||||||
# circuit breaker to ensure we're not stuck dequeuing updates
|
|
||||||
verbose_proxy_logger.warning(
|
verbose_proxy_logger.warning(
|
||||||
"Max in memory queue flush count reached, stopping flush"
|
"Max in memory queue flush count reached, stopping flush"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
sys.path.insert(
|
||||||
|
0, os.path.abspath("../../..")
|
||||||
|
) # Adds the parent directory to the system path
|
||||||
|
|
||||||
|
from litellm.constants import MAX_IN_MEMORY_QUEUE_FLUSH_COUNT
|
||||||
|
from litellm.proxy.db.db_transaction_queue.base_update_queue import BaseUpdateQueue
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_queue_flush_limit():
|
||||||
|
"""
|
||||||
|
Test to ensure we don't dequeue more than MAX_IN_MEMORY_QUEUE_FLUSH_COUNT items.
|
||||||
|
"""
|
||||||
|
# Arrange
|
||||||
|
queue = BaseUpdateQueue()
|
||||||
|
# Add more items than the max flush count
|
||||||
|
items_to_add = MAX_IN_MEMORY_QUEUE_FLUSH_COUNT + 100
|
||||||
|
|
||||||
|
for i in range(items_to_add):
|
||||||
|
await queue.add_update(f"test_update_{i}")
|
||||||
|
|
||||||
|
# Act
|
||||||
|
flushed_updates = await queue.flush_all_updates_from_in_memory_queue()
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert (
|
||||||
|
len(flushed_updates) == MAX_IN_MEMORY_QUEUE_FLUSH_COUNT
|
||||||
|
), f"Expected {MAX_IN_MEMORY_QUEUE_FLUSH_COUNT} items, but got {len(flushed_updates)}"
|
||||||
|
|
||||||
|
# Verify remaining items are still in queue
|
||||||
|
assert (
|
||||||
|
queue.update_queue.qsize() == 100
|
||||||
|
), "Expected 100 items to remain in the queue"
|
Loading…
Add table
Add a link
Reference in a new issue