fix(proxy_server.py): fix spend log update

Krrish Dholakia 2024-03-18 20:26:28 -07:00
parent 8fefe625d9
commit f588bff69b
3 changed files with 31 additions and 5 deletions


@@ -279,6 +279,7 @@ litellm_proxy_admin_name = "default_user_id"
 ui_access_mode: Literal["admin", "all"] = "all"
 proxy_budget_rescheduler_min_time = 597
 proxy_budget_rescheduler_max_time = 605
+proxy_batch_write_at = 60 # in seconds
 litellm_master_key_hash = None
 ### INITIALIZE GLOBAL LOGGING OBJECT ###
 proxy_logging_obj = ProxyLogging(user_api_key_cache=user_api_key_cache)
@@ -1138,7 +1139,6 @@ async def update_database(
     async def _insert_spend_log_to_db():
         try:
             # Helper to generate payload to log
-            verbose_proxy_logger.debug("inserting spend log to db")
             payload = get_logging_payload(
                 kwargs=kwargs,
                 response_obj=completion_response,
@@ -1150,7 +1150,7 @@ async def update_database(
             if prisma_client is not None:
                 await prisma_client.insert_data(data=payload, table_name="spend")
         except Exception as e:
-            verbose_proxy_logger.info(
+            verbose_proxy_logger.debug(
                 f"Update Spend Logs DB failed to execute - {str(e)}\n{traceback.format_exc()}"
             )
             raise e
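Taken together, the two hunks above leave the helper building the payload and writing it to the spend table, with failures logged at debug level and re-raised. A condensed, stubbed sketch of that shape (get_logging_payload, prisma_client and verbose_proxy_logger are placeholders here, not the real litellm objects):

import logging
import traceback

verbose_proxy_logger = logging.getLogger("proxy")  # stand-in for litellm's logger


async def _insert_spend_log_to_db(kwargs, completion_response, prisma_client, get_logging_payload):
    try:
        # Helper to generate payload to log
        payload = get_logging_payload(kwargs=kwargs, response_obj=completion_response)
        if prisma_client is not None:
            await prisma_client.insert_data(data=payload, table_name="spend")
    except Exception as e:
        verbose_proxy_logger.debug(
            f"Update Spend Logs DB failed to execute - {str(e)}\n{traceback.format_exc()}"
        )
        raise e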
@@ -1181,6 +1181,7 @@ async def update_database(
         asyncio.create_task(_update_key_db())
         asyncio.create_task(_update_team_db())
         # asyncio.create_task(_insert_spend_log_to_db())
+        await _insert_spend_log_to_db()
 
         verbose_proxy_logger.debug("Runs spend update on all tables")
     except Exception as e:
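The behavioral change here is that the spend-log insert is now awaited inside update_database instead of being scheduled as a background task, so a failed insert raises into the caller's own try/except rather than dying silently on a Task. A minimal, self-contained illustration of the difference (names are stand-ins, not litellm code):

import asyncio


async def insert_spend_log():
    raise RuntimeError("db write failed")


async def update_database():
    # Fire-and-forget: the failure lives on the Task object and never reaches
    # this function's try/except unless the task is explicitly awaited.
    task = asyncio.create_task(insert_spend_log())
    await asyncio.sleep(0)  # let the task run
    print("task finished with:", task.exception())

    # Awaiting directly: the failure propagates to the caller immediately,
    # which is what the switch above relies on.
    try:
        await insert_spend_log()
    except RuntimeError as e:
        print("caught in caller:", e)


asyncio.run(update_database())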
@@ -1499,7 +1500,7 @@ class ProxyConfig:
         """
         Load config values into proxy global state
         """
-        global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, user_custom_key_generate, use_background_health_checks, health_check_interval, use_queue, custom_db_client, proxy_budget_rescheduler_max_time, proxy_budget_rescheduler_min_time, ui_access_mode, litellm_master_key_hash
+        global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, user_custom_key_generate, use_background_health_checks, health_check_interval, use_queue, custom_db_client, proxy_budget_rescheduler_max_time, proxy_budget_rescheduler_min_time, ui_access_mode, litellm_master_key_hash, proxy_batch_write_at
 
         # Load existing config
         config = await self.get_config(config_file_path=config_file_path)
@@ -1855,6 +1856,10 @@ class ProxyConfig:
             proxy_budget_rescheduler_max_time = general_settings.get(
                 "proxy_budget_rescheduler_max_time", proxy_budget_rescheduler_max_time
             )
+            ## BATCH WRITER ##
+            proxy_batch_write_at = general_settings.get(
+                "proxy_batch_write_at", proxy_batch_write_at
+            )
             ### BACKGROUND HEALTH CHECKS ###
             # Enable background health checks
             use_background_health_checks = general_settings.get(
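This follows the same override pattern as the other rescheduler settings: general_settings.get() falls back to the current module-level default when the key is absent. A stand-alone sketch of the pattern (names mirror the diff; the wrapper function is illustrative):

proxy_batch_write_at = 60  # module-level default, in seconds


def apply_general_settings(general_settings: dict):
    global proxy_batch_write_at
    # .get() keeps the existing value when the key is missing,
    # so an empty config leaves the default untouched.
    proxy_batch_write_at = general_settings.get(
        "proxy_batch_write_at", proxy_batch_write_at
    )


apply_general_settings({"proxy_batch_write_at": 1})
print(proxy_batch_write_at)  # -> 1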
@@ -2481,10 +2486,18 @@ async def startup_event():
         interval = random.randint(
             proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time
         )  # random interval, so multiple workers avoid resetting budget at the same time
+        batch_writing_interval = random.randint(
+            proxy_batch_write_at - 3, proxy_batch_write_at + 3
+        )  # random interval, so multiple workers avoid batch writing at the same time
         scheduler.add_job(
             reset_budget, "interval", seconds=interval, args=[prisma_client]
         )
-        scheduler.add_job(update_spend, "interval", seconds=10, args=[prisma_client])
+        scheduler.add_job(
+            update_spend,
+            "interval",
+            seconds=batch_writing_interval,
+            args=[prisma_client],
+        )
         scheduler.start()
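The jitter mirrors the budget-rescheduler pattern: each worker picks a period within 3 seconds of proxy_batch_write_at so concurrent workers don't flush spend in lockstep. A self-contained sketch of that scheduling pattern, assuming APScheduler's AsyncIOScheduler and a stubbed update_spend (the short interval and the fake client name are for the demo only):

import asyncio
import random

from apscheduler.schedulers.asyncio import AsyncIOScheduler  # assumed scheduler, per the lead-in

proxy_batch_write_at = 5  # shortened from the 60s default so the demo fires quickly


async def update_spend(client):
    # stand-in for the real batch writer
    print(f"flushing batched spend via {client}")


async def main():
    scheduler = AsyncIOScheduler()
    # jitter the period so multiple workers don't all hit the DB in the same tick
    batch_writing_interval = random.randint(
        proxy_batch_write_at - 3, proxy_batch_write_at + 3
    )
    scheduler.add_job(
        update_spend, "interval", seconds=batch_writing_interval, args=["prisma-client"]
    )
    scheduler.start()
    await asyncio.sleep(20)  # keep the loop alive long enough to see a few runs


asyncio.run(main())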


@@ -50,6 +50,7 @@ general_settings:
   master_key: sk-1234 # [OPTIONAL] Only use this if you to require all calls to contain this key (Authorization: Bearer sk-1234)
   proxy_budget_rescheduler_min_time: 60
   proxy_budget_rescheduler_max_time: 64
+  proxy_batch_write_at: 1
   # database_url: "postgresql://<user>:<password>@<host>:<port>/<dbname>" # [OPTIONAL] use for token-based auth to proxy
 
 # environment_variables:
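The 1-second proxy_batch_write_at in this config keeps the batch window short so the integration test below can read spend logs right after a completion call. A minimal sketch of how such a value would be read from the YAML (the file path is illustrative and PyYAML is assumed):

import yaml  # pyyaml, assumed for the sketch

# "proxy_config.yaml" is an illustrative path; the proxy loads whatever --config points at.
with open("proxy_config.yaml") as f:
    config = yaml.safe_load(f) or {}

general_settings = config.get("general_settings") or {}
# Falls back to the 60-second module default when the key is omitted.
print(general_settings.get("proxy_batch_write_at", 60))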


@@ -329,6 +329,16 @@ async def test_key_info_spend_values():
     - make completion call
     - assert cost is expected value
     """
+
+    async def retry_request(func, *args, _max_attempts=5, **kwargs):
+        for attempt in range(_max_attempts):
+            try:
+                return await func(*args, **kwargs)
+            except aiohttp.client_exceptions.ClientOSError as e:
+                if attempt + 1 == _max_attempts:
+                    raise  # re-raise the last ClientOSError if all attempts failed
+                print(f"Attempt {attempt+1} failed, retrying...")
+
     async with aiohttp.ClientSession() as session:
         ## Test Spend Update ##
         # completion
@@ -336,7 +346,9 @@ async def test_key_info_spend_values():
         key = key_gen["key"]
         response = await chat_completion(session=session, key=key)
         await asyncio.sleep(5)
-        spend_logs = await get_spend_logs(session=session, request_id=response["id"])
+        spend_logs = await retry_request(
+            get_spend_logs, session=session, request_id=response["id"]
+        )
         print(f"spend_logs: {spend_logs}")
         completion_tokens = spend_logs[0]["completion_tokens"]
         prompt_tokens = spend_logs[0]["prompt_tokens"]
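The new helper retries only on aiohttp's ClientOSError and retries immediately; when the proxy drops a connection because it is busy flushing the spend-log batch, a short pause between attempts can help. A hedged variant for illustration (the backoff parameter is an addition, not part of this commit):

import asyncio

import aiohttp


async def retry_request_with_backoff(func, *args, _max_attempts=5, _base_delay=1.0, **kwargs):
    # Same idea as the helper added above, plus a growing pause between attempts.
    for attempt in range(_max_attempts):
        try:
            return await func(*args, **kwargs)
        except aiohttp.client_exceptions.ClientOSError:
            if attempt + 1 == _max_attempts:
                raise
            await asyncio.sleep(_base_delay * (attempt + 1))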