test_queue_size_reduction_with_large_volume

This commit is contained in:
Ishaan Jaff 2025-04-04 16:21:29 -07:00
parent dc063fdfec
commit e77a178a37
2 changed files with 183 additions and 13 deletions

View file

@ -1,4 +1,5 @@
import asyncio
from copy import deepcopy
from typing import Dict, List
from litellm._logging import verbose_proxy_logger
@ -54,25 +55,26 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
async def add_update(self, update: Dict[str, DailyUserSpendTransaction]):
"""Enqueue an update."""
verbose_proxy_logger.debug("Adding update to queue: %s", update)
await self.update_queue.put(update)
if self.update_queue.qsize() >= self.MAX_SIZE_IN_MEMORY_QUEUE:
verbose_proxy_logger.warning(
"Spend update queue is full. Aggregating all entries in queue to concatenate entries."
)
aggregated_updates = await self.aggregate_queue_updates()
await self.update_queue.put(aggregated_updates)
else:
verbose_proxy_logger.debug("Adding update to queue: %s", update)
await self.update_queue.put(update)
await self.aggregate_queue_updates()
async def aggregate_queue_updates(self):
"""Combine all updates in the queue into a single update."""
"""
Combine all updates in the queue into a single update.
This is used to reduce the size of the in-memory queue.
"""
updates: List[
Dict[str, DailyUserSpendTransaction]
] = await self.flush_all_updates_from_in_memory_queue()
aggregated_updates = self.get_aggregated_daily_spend_update_transactions(
updates
)
return aggregated_updates
await self.update_queue.put(aggregated_updates)
async def flush_and_get_aggregated_daily_spend_update_transactions(
self,
@ -113,5 +115,5 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
]
daily_transaction["failed_requests"] += payload["failed_requests"]
else:
aggregated_daily_spend_update_transactions[_key] = payload
aggregated_daily_spend_update_transactions[_key] = deepcopy(payload)
return aggregated_daily_spend_update_transactions

View file

@ -6,6 +6,11 @@ import sys
import pytest
from fastapi.testclient import TestClient
sys.path.insert(
0, os.path.abspath("../../..")
) # Adds the parent directory to the system path
import litellm
from litellm.constants import MAX_SIZE_IN_MEMORY_QUEUE
from litellm.proxy._types import (
DailyUserSpendTransaction,
Litellm_EntityType,
@ -16,10 +21,6 @@ from litellm.proxy.db.db_transaction_queue.daily_spend_update_queue import (
)
from litellm.proxy.db.db_transaction_queue.spend_update_queue import SpendUpdateQueue
sys.path.insert(
0, os.path.abspath("../../..")
) # Adds the parent directory to the system path
@pytest.fixture
def daily_spend_update_queue():
@ -254,7 +255,7 @@ async def test_flush_and_get_aggregated_daily_spend_update_transactions(
await daily_spend_update_queue.add_update({test_key: test_transaction1})
await daily_spend_update_queue.add_update({test_key: test_transaction2})
# Test full workflow
# Flush and get aggregated transactions
result = (
await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
@ -262,3 +263,170 @@ async def test_flush_and_get_aggregated_daily_spend_update_transactions(
assert len(result) == 1
assert test_key in result
assert result[test_key] == expected_transaction
@pytest.mark.asyncio
async def test_queue_max_size_triggers_aggregation(
monkeypatch, daily_spend_update_queue
):
"""Test that reaching MAX_SIZE_IN_MEMORY_QUEUE triggers aggregation"""
# Override MAX_SIZE_IN_MEMORY_QUEUE for testing
litellm._turn_on_debug()
monkeypatch.setattr(daily_spend_update_queue, "MAX_SIZE_IN_MEMORY_QUEUE", 6)
test_key = "user1_2023-01-01_key123_gpt-4_openai"
test_transaction = {
"spend": 1.0,
"prompt_tokens": 100,
"completion_tokens": 50,
"api_requests": 1,
"successful_requests": 1,
"failed_requests": 0,
}
# Add 6 identical updates (exceeding the max size of 5)
for i in range(6):
await daily_spend_update_queue.add_update({test_key: test_transaction})
# Queue should have aggregated to a single item
assert daily_spend_update_queue.update_queue.qsize() == 1
# Verify the aggregated values
result = (
await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
assert result[test_key]["spend"] == 6.0
assert result[test_key]["prompt_tokens"] == 600
assert result[test_key]["completion_tokens"] == 300
assert result[test_key]["api_requests"] == 6
assert result[test_key]["successful_requests"] == 6
assert result[test_key]["failed_requests"] == 0
@pytest.mark.asyncio
async def test_aggregate_queue_updates_accuracy(daily_spend_update_queue):
"""Test that queue aggregation correctly combines metrics by transaction key"""
# Add multiple updates for different transaction keys
test_key1 = "user1_2023-01-01_key123_gpt-4_openai"
test_transaction1 = {
"spend": 10.0,
"prompt_tokens": 100,
"completion_tokens": 50,
"api_requests": 1,
"successful_requests": 1,
"failed_requests": 0,
}
test_key2 = "user1_2023-01-01_key123_gpt-4_openai" # Same key
test_transaction2 = {
"spend": 5.0,
"prompt_tokens": 200,
"completion_tokens": 30,
"api_requests": 1,
"successful_requests": 0,
"failed_requests": 1,
}
test_key3 = "user2_2023-01-01_key456_gpt-3.5-turbo_openai" # Different key
test_transaction3 = {
"spend": 3.0,
"prompt_tokens": 150,
"completion_tokens": 25,
"api_requests": 1,
"successful_requests": 1,
"failed_requests": 0,
}
# Add updates directly to the queue
await daily_spend_update_queue.update_queue.put({test_key1: test_transaction1})
await daily_spend_update_queue.update_queue.put({test_key2: test_transaction2})
await daily_spend_update_queue.update_queue.put({test_key3: test_transaction3})
# Force aggregation
await daily_spend_update_queue.aggregate_queue_updates()
updates = await daily_spend_update_queue.flush_all_updates_from_in_memory_queue()
print("AGGREGATED UPDATES", json.dumps(updates, indent=4))
daily_spend_update_transactions = updates[0]
# Should have 2 keys after aggregation (test_key1/test_key2 combined, and test_key3)
assert len(daily_spend_update_transactions) == 2
# Check aggregated values for test_key1 (which is the same as test_key2)
assert daily_spend_update_transactions[test_key1]["spend"] == 15.0
assert daily_spend_update_transactions[test_key1]["prompt_tokens"] == 300
assert daily_spend_update_transactions[test_key1]["completion_tokens"] == 80
assert daily_spend_update_transactions[test_key1]["api_requests"] == 2
assert daily_spend_update_transactions[test_key1]["successful_requests"] == 1
assert daily_spend_update_transactions[test_key1]["failed_requests"] == 1
# Check values for test_key3 remain the same
assert daily_spend_update_transactions[test_key3]["spend"] == 3.0
assert daily_spend_update_transactions[test_key3]["prompt_tokens"] == 150
assert daily_spend_update_transactions[test_key3]["completion_tokens"] == 25
assert daily_spend_update_transactions[test_key3]["api_requests"] == 1
assert daily_spend_update_transactions[test_key3]["successful_requests"] == 1
assert daily_spend_update_transactions[test_key3]["failed_requests"] == 0
@pytest.mark.asyncio
async def test_queue_size_reduction_with_large_volume(
monkeypatch, daily_spend_update_queue
):
"""Test that queue size is actually reduced when dealing with many items"""
# Set a smaller MAX_SIZE for testing
monkeypatch.setattr(daily_spend_update_queue, "MAX_SIZE_IN_MEMORY_QUEUE", 10)
# Create transaction templates
user1_key = "user1_2023-01-01_key123_gpt-4_openai"
user1_transaction = {
"spend": 0.5,
"prompt_tokens": 100,
"completion_tokens": 50,
"api_requests": 1,
"successful_requests": 1,
"failed_requests": 0,
}
user2_key = "user2_2023-01-01_key456_gpt-3.5-turbo_openai"
user2_transaction = {
"spend": 1.0,
"prompt_tokens": 200,
"completion_tokens": 30,
"api_requests": 1,
"successful_requests": 1,
"failed_requests": 0,
}
# Add 30 updates (200 for user1, 10 for user2)
for i in range(200):
await daily_spend_update_queue.add_update({user1_key: user1_transaction})
# At this point, aggregation should have happened at least once
# Queue size should be much less than 10
assert daily_spend_update_queue.update_queue.qsize() <= 10
for i in range(100):
await daily_spend_update_queue.add_update({user2_key: user2_transaction})
# Queue should have at most 10 items after all this activity
assert daily_spend_update_queue.update_queue.qsize() <= 10
# Verify total costs are correct
result = (
await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
print("RESULT", json.dumps(result, indent=4))
assert result[user1_key]["spend"] == 200 * 0.5 # 10.0
assert result[user1_key]["prompt_tokens"] == 200 * 100 # 2000
assert result[user1_key]["completion_tokens"] == 200 * 50 # 1000
assert result[user1_key]["api_requests"] == 200
assert result[user1_key]["successful_requests"] == 200
assert result[user1_key]["failed_requests"] == 0
assert result[user2_key]["spend"] == 100 * 1.0 # 10.0
assert result[user2_key]["prompt_tokens"] == 100 * 200 # 2000
assert result[user2_key]["completion_tokens"] == 100 * 30 # 300
assert result[user2_key]["api_requests"] == 100
assert result[user2_key]["successful_requests"] == 100
assert result[user2_key]["failed_requests"] == 0