diff --git a/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py b/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py
index 52c5fabd4c..2900824e19 100644
--- a/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py
+++ b/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py
@@ -1,4 +1,5 @@
 import asyncio
+from copy import deepcopy
 from typing import Dict, List
 
 from litellm._logging import verbose_proxy_logger
@@ -54,25 +55,26 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
 
     async def add_update(self, update: Dict[str, DailyUserSpendTransaction]):
         """Enqueue an update."""
+        verbose_proxy_logger.debug("Adding update to queue: %s", update)
+        await self.update_queue.put(update)
         if self.update_queue.qsize() >= self.MAX_SIZE_IN_MEMORY_QUEUE:
             verbose_proxy_logger.warning(
                 "Spend update queue is full. Aggregating all entries in queue to concatenate entries."
             )
-            aggregated_updates = await self.aggregate_queue_updates()
-            await self.update_queue.put(aggregated_updates)
-        else:
-            verbose_proxy_logger.debug("Adding update to queue: %s", update)
-            await self.update_queue.put(update)
+            await self.aggregate_queue_updates()
 
     async def aggregate_queue_updates(self):
-        """Combine all updates in the queue into a single update."""
+        """
+        Combine all updates in the queue into a single update.
+        This is used to reduce the size of the in-memory queue.
+        """
         updates: List[
             Dict[str, DailyUserSpendTransaction]
         ] = await self.flush_all_updates_from_in_memory_queue()
         aggregated_updates = self.get_aggregated_daily_spend_update_transactions(
             updates
         )
-        return aggregated_updates
+        await self.update_queue.put(aggregated_updates)
 
     async def flush_and_get_aggregated_daily_spend_update_transactions(
         self,
@@ -113,5 +115,5 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
                 ]
                 daily_transaction["failed_requests"] += payload["failed_requests"]
             else:
-                aggregated_daily_spend_update_transactions[_key] = payload
+                aggregated_daily_spend_update_transactions[_key] = deepcopy(payload)
         return aggregated_daily_spend_update_transactions
diff --git a/tests/litellm/proxy/db/db_transaction_queue/test_daily_spend_update_queue.py b/tests/litellm/proxy/db/db_transaction_queue/test_daily_spend_update_queue.py
index 228dfe64b1..b92f2919ed 100644
--- a/tests/litellm/proxy/db/db_transaction_queue/test_daily_spend_update_queue.py
+++ b/tests/litellm/proxy/db/db_transaction_queue/test_daily_spend_update_queue.py
@@ -6,6 +6,11 @@ import sys
 import pytest
 from fastapi.testclient import TestClient
 
+sys.path.insert(
+    0, os.path.abspath("../../..")
+)  # Adds the parent directory to the system path
+import litellm
+from litellm.constants import MAX_SIZE_IN_MEMORY_QUEUE
 from litellm.proxy._types import (
     DailyUserSpendTransaction,
     Litellm_EntityType,
@@ -16,10 +21,6 @@ from litellm.proxy.db.db_transaction_queue.daily_spend_update_queue import (
 )
 from litellm.proxy.db.db_transaction_queue.spend_update_queue import SpendUpdateQueue
 
-sys.path.insert(
-    0, os.path.abspath("../../..")
-)  # Adds the parent directory to the system path
-
 
 @pytest.fixture
 def daily_spend_update_queue():
@@ -254,7 +255,7 @@ async def test_flush_and_get_aggregated_daily_spend_update_transactions(
     await daily_spend_update_queue.add_update({test_key: test_transaction1})
     await daily_spend_update_queue.add_update({test_key: test_transaction2})
 
-    # Test full workflow
+    # Flush and get aggregated transactions
     result = (
         await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
    )
@@ -262,3 +263,170 @@ async def test_flush_and_get_aggregated_daily_spend_update_transactions(
 assert len(result) == 1
 assert test_key in result
 assert result[test_key] == expected_transaction
+
+
+@pytest.mark.asyncio
+async def test_queue_max_size_triggers_aggregation(
+    monkeypatch, daily_spend_update_queue
+):
+    """Test that reaching MAX_SIZE_IN_MEMORY_QUEUE triggers aggregation"""
+    # Override MAX_SIZE_IN_MEMORY_QUEUE for testing
+    litellm._turn_on_debug()
+    monkeypatch.setattr(daily_spend_update_queue, "MAX_SIZE_IN_MEMORY_QUEUE", 6)
+
+    test_key = "user1_2023-01-01_key123_gpt-4_openai"
+    test_transaction = {
+        "spend": 1.0,
+        "prompt_tokens": 100,
+        "completion_tokens": 50,
+        "api_requests": 1,
+        "successful_requests": 1,
+        "failed_requests": 0,
+    }
+
+    # Add 6 identical updates (reaching the max size of 6)
+    for i in range(6):
+        await daily_spend_update_queue.add_update({test_key: test_transaction})
+
+    # Queue should have aggregated to a single item
+    assert daily_spend_update_queue.update_queue.qsize() == 1
+
+    # Verify the aggregated values
+    result = (
+        await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
+    )
+    assert result[test_key]["spend"] == 6.0
+    assert result[test_key]["prompt_tokens"] == 600
+    assert result[test_key]["completion_tokens"] == 300
+    assert result[test_key]["api_requests"] == 6
+    assert result[test_key]["successful_requests"] == 6
+    assert result[test_key]["failed_requests"] == 0
+
+
+@pytest.mark.asyncio
+async def test_aggregate_queue_updates_accuracy(daily_spend_update_queue):
+    """Test that queue aggregation correctly combines metrics by transaction key"""
+    # Add multiple updates for different transaction keys
+    test_key1 = "user1_2023-01-01_key123_gpt-4_openai"
+    test_transaction1 = {
+        "spend": 10.0,
+        "prompt_tokens": 100,
+        "completion_tokens": 50,
+        "api_requests": 1,
+        "successful_requests": 1,
+        "failed_requests": 0,
+    }
+
+    test_key2 = "user1_2023-01-01_key123_gpt-4_openai"  # Same key
+    test_transaction2 = {
+        "spend": 5.0,
+        "prompt_tokens": 200,
+        "completion_tokens": 30,
+        "api_requests": 1,
+        "successful_requests": 0,
+        "failed_requests": 1,
+    }
+
+    test_key3 = "user2_2023-01-01_key456_gpt-3.5-turbo_openai"  # Different key
+    test_transaction3 = {
+        "spend": 3.0,
+        "prompt_tokens": 150,
+        "completion_tokens": 25,
+        "api_requests": 1,
+        "successful_requests": 1,
+        "failed_requests": 0,
+    }
+
+    # Add updates directly to the queue
+    await daily_spend_update_queue.update_queue.put({test_key1: test_transaction1})
+    await daily_spend_update_queue.update_queue.put({test_key2: test_transaction2})
+    await daily_spend_update_queue.update_queue.put({test_key3: test_transaction3})
+
+    # Force aggregation
+    await daily_spend_update_queue.aggregate_queue_updates()
+    updates = await daily_spend_update_queue.flush_all_updates_from_in_memory_queue()
+    print("AGGREGATED UPDATES", json.dumps(updates, indent=4))
+    daily_spend_update_transactions = updates[0]
+
+    # Should have 2 keys after aggregation (test_key1/test_key2 combined, and test_key3)
+    assert len(daily_spend_update_transactions) == 2
+
+    # Check aggregated values for test_key1 (which is the same as test_key2)
+    assert daily_spend_update_transactions[test_key1]["spend"] == 15.0
+    assert daily_spend_update_transactions[test_key1]["prompt_tokens"] == 300
+    assert daily_spend_update_transactions[test_key1]["completion_tokens"] == 80
+    assert daily_spend_update_transactions[test_key1]["api_requests"] == 2
+    assert daily_spend_update_transactions[test_key1]["successful_requests"] == 1
+    assert daily_spend_update_transactions[test_key1]["failed_requests"] == 1
+
+    # Check values for test_key3 remain the same
+    assert daily_spend_update_transactions[test_key3]["spend"] == 3.0
+    assert daily_spend_update_transactions[test_key3]["prompt_tokens"] == 150
+    assert daily_spend_update_transactions[test_key3]["completion_tokens"] == 25
+    assert daily_spend_update_transactions[test_key3]["api_requests"] == 1
+    assert daily_spend_update_transactions[test_key3]["successful_requests"] == 1
+    assert daily_spend_update_transactions[test_key3]["failed_requests"] == 0
+
+
+@pytest.mark.asyncio
+async def test_queue_size_reduction_with_large_volume(
+    monkeypatch, daily_spend_update_queue
+):
+    """Test that queue size is actually reduced when dealing with many items"""
+    # Set a smaller MAX_SIZE for testing
+    monkeypatch.setattr(daily_spend_update_queue, "MAX_SIZE_IN_MEMORY_QUEUE", 10)
+
+    # Create transaction templates
+    user1_key = "user1_2023-01-01_key123_gpt-4_openai"
+    user1_transaction = {
+        "spend": 0.5,
+        "prompt_tokens": 100,
+        "completion_tokens": 50,
+        "api_requests": 1,
+        "successful_requests": 1,
+        "failed_requests": 0,
+    }
+
+    user2_key = "user2_2023-01-01_key456_gpt-3.5-turbo_openai"
+    user2_transaction = {
+        "spend": 1.0,
+        "prompt_tokens": 200,
+        "completion_tokens": 30,
+        "api_requests": 1,
+        "successful_requests": 1,
+        "failed_requests": 0,
+    }
+
+    # Add 300 updates (200 for user1, 100 for user2)
+    for i in range(200):
+        await daily_spend_update_queue.add_update({user1_key: user1_transaction})
+
+    # At this point, aggregation should have happened at least once
+    # Queue size should stay at or below 10
+    assert daily_spend_update_queue.update_queue.qsize() <= 10
+
+    for i in range(100):
+        await daily_spend_update_queue.add_update({user2_key: user2_transaction})
+
+    # Queue should have at most 10 items after all this activity
+    assert daily_spend_update_queue.update_queue.qsize() <= 10
+
+    # Verify total costs are correct
+    result = (
+        await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
+    )
+    print("RESULT", json.dumps(result, indent=4))
+
+    assert result[user1_key]["spend"] == 200 * 0.5  # 100.0
+    assert result[user1_key]["prompt_tokens"] == 200 * 100  # 20000
+    assert result[user1_key]["completion_tokens"] == 200 * 50  # 10000
+    assert result[user1_key]["api_requests"] == 200
+    assert result[user1_key]["successful_requests"] == 200
+    assert result[user1_key]["failed_requests"] == 0
+
+    assert result[user2_key]["spend"] == 100 * 1.0  # 100.0
+    assert result[user2_key]["prompt_tokens"] == 100 * 200  # 20000
+    assert result[user2_key]["completion_tokens"] == 100 * 30  # 3000
+    assert result[user2_key]["api_requests"] == 100
+    assert result[user2_key]["successful_requests"] == 100
+    assert result[user2_key]["failed_requests"] == 0