From f588bff69ba74ad8c45bc490bbbbe6ac4cf0c7d4 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 18 Mar 2024 20:26:28 -0700
Subject: [PATCH] fix(proxy_server.py): fix spend log update

---
 litellm/proxy/proxy_server.py | 21 +++++++++++++++++----
 proxy_server_config.yaml      |  1 +
 tests/test_keys.py            | 14 +++++++++++++-
 3 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index 588846bd3..930ffb558 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -279,6 +279,7 @@ litellm_proxy_admin_name = "default_user_id"
 ui_access_mode: Literal["admin", "all"] = "all"
 proxy_budget_rescheduler_min_time = 597
 proxy_budget_rescheduler_max_time = 605
+proxy_batch_write_at = 60  # in seconds
 litellm_master_key_hash = None
 ### INITIALIZE GLOBAL LOGGING OBJECT ###
 proxy_logging_obj = ProxyLogging(user_api_key_cache=user_api_key_cache)
@@ -1138,7 +1139,6 @@ async def update_database(
         async def _insert_spend_log_to_db():
             try:
                 # Helper to generate payload to log
-                verbose_proxy_logger.debug("inserting spend log to db")
                 payload = get_logging_payload(
                     kwargs=kwargs,
                     response_obj=completion_response,
@@ -1150,7 +1150,7 @@ async def update_database(
                 if prisma_client is not None:
                     await prisma_client.insert_data(data=payload, table_name="spend")
             except Exception as e:
-                verbose_proxy_logger.info(
+                verbose_proxy_logger.debug(
                     f"Update Spend Logs DB failed to execute - {str(e)}\n{traceback.format_exc()}"
                 )
                 raise e
@@ -1181,6 +1181,7 @@ async def update_database(
         asyncio.create_task(_update_key_db())
         asyncio.create_task(_update_team_db())
         # asyncio.create_task(_insert_spend_log_to_db())
+        await _insert_spend_log_to_db()

         verbose_proxy_logger.debug("Runs spend update on all tables")
     except Exception as e:
@@ -1499,7 +1500,7 @@ class ProxyConfig:
         """
         Load config values into proxy global state
         """
-        global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, user_custom_key_generate, use_background_health_checks, health_check_interval, use_queue, custom_db_client, proxy_budget_rescheduler_max_time, proxy_budget_rescheduler_min_time, ui_access_mode, litellm_master_key_hash
+        global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, user_custom_key_generate, use_background_health_checks, health_check_interval, use_queue, custom_db_client, proxy_budget_rescheduler_max_time, proxy_budget_rescheduler_min_time, ui_access_mode, litellm_master_key_hash, proxy_batch_write_at

         # Load existing config
         config = await self.get_config(config_file_path=config_file_path)
@@ -1855,6 +1856,10 @@ class ProxyConfig:
             proxy_budget_rescheduler_max_time = general_settings.get(
                 "proxy_budget_rescheduler_max_time", proxy_budget_rescheduler_max_time
             )
+            ## BATCH WRITER ##
+            proxy_batch_write_at = general_settings.get(
+                "proxy_batch_write_at", proxy_batch_write_at
+            )
             ### BACKGROUND HEALTH CHECKS ###
             # Enable background health checks
             use_background_health_checks = general_settings.get(
@@ -2481,10 +2486,18 @@ async def startup_event():
         interval = random.randint(
             proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time
         )  # random interval, so multiple workers avoid resetting budget at the same time
+        batch_writing_interval = random.randint(
+            proxy_batch_write_at - 3, proxy_batch_write_at + 3
+        )  # random interval, so multiple workers avoid batch writing at the same time
         scheduler.add_job(
             reset_budget, "interval", seconds=interval, args=[prisma_client]
         )
-        scheduler.add_job(update_spend, "interval", seconds=10, args=[prisma_client])
+        scheduler.add_job(
+            update_spend,
+            "interval",
+            seconds=batch_writing_interval,
+            args=[prisma_client],
+        )
         scheduler.start()


diff --git a/proxy_server_config.yaml b/proxy_server_config.yaml
index 34a61994b..d31218b8d 100644
--- a/proxy_server_config.yaml
+++ b/proxy_server_config.yaml
@@ -50,6 +50,7 @@ general_settings:
   master_key: sk-1234 # [OPTIONAL] Only use this if you to require all calls to contain this key (Authorization: Bearer sk-1234)
   proxy_budget_rescheduler_min_time: 60
   proxy_budget_rescheduler_max_time: 64
+  proxy_batch_write_at: 1
   # database_url: "postgresql://:@:/" # [OPTIONAL] use for token-based auth to proxy

 # environment_variables:
diff --git a/tests/test_keys.py b/tests/test_keys.py
index cba960aca..0419e9f8a 100644
--- a/tests/test_keys.py
+++ b/tests/test_keys.py
@@ -329,6 +329,16 @@ async def test_key_info_spend_values():
     - make completion call
     - assert cost is expected value
     """
+
+    async def retry_request(func, *args, _max_attempts=5, **kwargs):
+        for attempt in range(_max_attempts):
+            try:
+                return await func(*args, **kwargs)
+            except aiohttp.client_exceptions.ClientOSError as e:
+                if attempt + 1 == _max_attempts:
+                    raise  # re-raise the last ClientOSError if all attempts failed
+                print(f"Attempt {attempt+1} failed, retrying...")
+
     async with aiohttp.ClientSession() as session:
         ## Test Spend Update ##
         # completion
@@ -336,7 +346,9 @@ async def test_key_info_spend_values():
         key = key_gen["key"]
         response = await chat_completion(session=session, key=key)
         await asyncio.sleep(5)
-        spend_logs = await get_spend_logs(session=session, request_id=response["id"])
+        spend_logs = await retry_request(
+            get_spend_logs, session=session, request_id=response["id"]
+        )
         print(f"spend_logs: {spend_logs}")
         completion_tokens = spend_logs[0]["completion_tokens"]
         prompt_tokens = spend_logs[0]["prompt_tokens"]