diff --git a/litellm/proxy/db/db_spend_update_writer.py b/litellm/proxy/db/db_spend_update_writer.py index ecafdcc3df..37d709c0f1 100644 --- a/litellm/proxy/db/db_spend_update_writer.py +++ b/litellm/proxy/db/db_spend_update_writer.py @@ -13,6 +13,7 @@ from typing import Any, Optional, Union import litellm from litellm._logging import verbose_proxy_logger +from litellm.caching import DualCache from litellm.proxy._types import LiteLLM_UserTable, SpendLogsPayload from litellm.proxy.spend_tracking.spend_tracking_utils import get_logging_payload from litellm.proxy.utils import PrismaClient, ProxyUpdateSpend, hash_token @@ -27,7 +28,7 @@ class DBSpendUpdateWriter: """ @staticmethod - async def update_database( # noqa: PLR0915 + async def update_database( # LiteLLM management object fields token: Optional[str], user_id: Optional[str], @@ -59,155 +60,47 @@ class DBSpendUpdateWriter: else: hashed_token = token - ### UPDATE USER SPEND ### - async def _update_user_db(): - """ - - Update that user's row - - Update litellm-proxy-budget row (global proxy spend) - """ - ## if an end-user is passed in, do an upsert - we can't guarantee they already exist in db - existing_user_obj = await user_api_key_cache.async_get_cache( - key=user_id + asyncio.create_task( + DBSpendUpdateWriter._update_user_db( + response_cost=response_cost, + user_id=user_id, + prisma_client=prisma_client, + user_api_key_cache=user_api_key_cache, + litellm_proxy_budget_name=litellm_proxy_budget_name, + end_user_id=end_user_id, ) - if existing_user_obj is not None and isinstance( - existing_user_obj, dict - ): - existing_user_obj = LiteLLM_UserTable(**existing_user_obj) - try: - if prisma_client is not None: # update - user_ids = [user_id] - if ( - litellm.max_budget > 0 - ): # track global proxy budget, if user set max budget - user_ids.append(litellm_proxy_budget_name) - ### KEY CHANGE ### - for _id in user_ids: - if _id is not None: - prisma_client.user_list_transactons[_id] = ( - response_cost - + prisma_client.user_list_transactons.get(_id, 0) - ) - if end_user_id is not None: - prisma_client.end_user_list_transactons[end_user_id] = ( - response_cost - + prisma_client.end_user_list_transactons.get( - end_user_id, 0 - ) - ) - except Exception as e: - verbose_proxy_logger.info( - "\033[91m" - + f"Update User DB call failed to execute {str(e)}\n{traceback.format_exc()}" - ) - - ### UPDATE KEY SPEND ### - async def _update_key_db(): - try: - verbose_proxy_logger.debug( - f"adding spend to key db. Response cost: {response_cost}. Token: {hashed_token}." - ) - if hashed_token is None: - return - if prisma_client is not None: - prisma_client.key_list_transactons[hashed_token] = ( - response_cost - + prisma_client.key_list_transactons.get(hashed_token, 0) - ) - except Exception as e: - verbose_proxy_logger.exception( - f"Update Key DB Call failed to execute - {str(e)}" - ) - raise e - - ### UPDATE SPEND LOGS ### - async def _insert_spend_log_to_db(): - try: - if prisma_client: - payload = get_logging_payload( - kwargs=kwargs, - response_obj=completion_response, - start_time=start_time, - end_time=end_time, - ) - payload["spend"] = response_cost or 0.0 - DBSpendUpdateWriter._set_spend_logs_payload( - payload=payload, - spend_logs_url=os.getenv("SPEND_LOGS_URL"), - prisma_client=prisma_client, - ) - except Exception as e: - verbose_proxy_logger.debug( - f"Update Spend Logs DB failed to execute - {str(e)}\n{traceback.format_exc()}" - ) - raise e - - ### UPDATE TEAM SPEND ### - async def _update_team_db(): - try: - verbose_proxy_logger.debug( - f"adding spend to team db. Response cost: {response_cost}. team_id: {team_id}." - ) - if team_id is None: - verbose_proxy_logger.debug( - "track_cost_callback: team_id is None. Not tracking spend for team" - ) - return - if prisma_client is not None: - prisma_client.team_list_transactons[team_id] = ( - response_cost - + prisma_client.team_list_transactons.get(team_id, 0) - ) - - try: - # Track spend of the team member within this team - # key is "team_id::::user_id::" - team_member_key = f"team_id::{team_id}::user_id::{user_id}" - prisma_client.team_member_list_transactons[ - team_member_key - ] = ( - response_cost - + prisma_client.team_member_list_transactons.get( - team_member_key, 0 - ) - ) - except Exception: - pass - except Exception as e: - verbose_proxy_logger.info( - f"Update Team DB failed to execute - {str(e)}\n{traceback.format_exc()}" - ) - raise e - - ### UPDATE ORG SPEND ### - async def _update_org_db(): - try: - verbose_proxy_logger.debug( - "adding spend to org db. Response cost: {}. org_id: {}.".format( - response_cost, org_id - ) - ) - if org_id is None: - verbose_proxy_logger.debug( - "track_cost_callback: org_id is None. Not tracking spend for org" - ) - return - if prisma_client is not None: - prisma_client.org_list_transactons[org_id] = ( - response_cost - + prisma_client.org_list_transactons.get(org_id, 0) - ) - except Exception as e: - verbose_proxy_logger.info( - f"Update Org DB failed to execute - {str(e)}\n{traceback.format_exc()}" - ) - raise e - - asyncio.create_task(_update_user_db()) - asyncio.create_task(_update_key_db()) - asyncio.create_task(_update_team_db()) - asyncio.create_task(_update_org_db()) + ) + asyncio.create_task( + DBSpendUpdateWriter._update_key_db( + response_cost=response_cost, + hashed_token=hashed_token, + prisma_client=prisma_client, + ) + ) + asyncio.create_task( + DBSpendUpdateWriter._update_team_db( + response_cost=response_cost, + team_id=team_id, + user_id=user_id, + prisma_client=prisma_client, + ) + ) + asyncio.create_task( + DBSpendUpdateWriter._update_org_db( + response_cost=response_cost, + org_id=org_id, + prisma_client=prisma_client, + ) + ) if disable_spend_logs is False: - await _insert_spend_log_to_db() + await DBSpendUpdateWriter._insert_spend_log_to_db( + kwargs=kwargs, + completion_response=completion_response, + start_time=start_time, + end_time=end_time, + response_cost=response_cost, + prisma_client=prisma_client, + ) else: verbose_proxy_logger.info( "disable_spend_logs=True. Skipping writing spend logs to db. Other spend updates - Key/User/Team table will still occur." @@ -219,6 +112,166 @@ class DBSpendUpdateWriter: f"Error updating Prisma database: {traceback.format_exc()}" ) + @staticmethod + async def _update_key_db( + response_cost: Optional[float], + hashed_token: Optional[str], + prisma_client: Optional[PrismaClient], + ): + try: + verbose_proxy_logger.debug( + f"adding spend to key db. Response cost: {response_cost}. Token: {hashed_token}." + ) + if hashed_token is None: + return + if prisma_client is not None: + prisma_client.key_list_transactons[hashed_token] = ( + response_cost + + prisma_client.key_list_transactons.get(hashed_token, 0) + ) + except Exception as e: + verbose_proxy_logger.exception( + f"Update Key DB Call failed to execute - {str(e)}" + ) + raise e + + @staticmethod + async def _update_user_db( + response_cost: Optional[float], + user_id: Optional[str], + prisma_client: Optional[PrismaClient], + user_api_key_cache: DualCache, + litellm_proxy_budget_name: Optional[str], + end_user_id: Optional[str] = None, + ): + """ + - Update that user's row + - Update litellm-proxy-budget row (global proxy spend) + """ + ## if an end-user is passed in, do an upsert - we can't guarantee they already exist in db + existing_user_obj = await user_api_key_cache.async_get_cache(key=user_id) + if existing_user_obj is not None and isinstance(existing_user_obj, dict): + existing_user_obj = LiteLLM_UserTable(**existing_user_obj) + try: + if prisma_client is not None: # update + user_ids = [user_id] + if ( + litellm.max_budget > 0 + ): # track global proxy budget, if user set max budget + user_ids.append(litellm_proxy_budget_name) + ### KEY CHANGE ### + for _id in user_ids: + if _id is not None: + prisma_client.user_list_transactons[_id] = ( + response_cost + + prisma_client.user_list_transactons.get(_id, 0) + ) + if end_user_id is not None: + prisma_client.end_user_list_transactons[end_user_id] = ( + response_cost + + prisma_client.end_user_list_transactons.get(end_user_id, 0) + ) + except Exception as e: + verbose_proxy_logger.info( + "\033[91m" + + f"Update User DB call failed to execute {str(e)}\n{traceback.format_exc()}" + ) + + @staticmethod + async def _update_team_db( + response_cost: Optional[float], + team_id: Optional[str], + user_id: Optional[str], + prisma_client: Optional[PrismaClient], + ): + try: + verbose_proxy_logger.debug( + f"adding spend to team db. Response cost: {response_cost}. team_id: {team_id}." + ) + if team_id is None: + verbose_proxy_logger.debug( + "track_cost_callback: team_id is None. Not tracking spend for team" + ) + return + if prisma_client is not None: + prisma_client.team_list_transactons[team_id] = ( + response_cost + prisma_client.team_list_transactons.get(team_id, 0) + ) + + try: + # Track spend of the team member within this team + # key is "team_id::::user_id::" + team_member_key = f"team_id::{team_id}::user_id::{user_id}" + prisma_client.team_member_list_transactons[team_member_key] = ( + response_cost + + prisma_client.team_member_list_transactons.get( + team_member_key, 0 + ) + ) + except Exception: + pass + except Exception as e: + verbose_proxy_logger.info( + f"Update Team DB failed to execute - {str(e)}\n{traceback.format_exc()}" + ) + raise e + + @staticmethod + async def _update_org_db( + response_cost: Optional[float], + org_id: Optional[str], + prisma_client: Optional[PrismaClient], + ): + try: + verbose_proxy_logger.debug( + "adding spend to org db. Response cost: {}. org_id: {}.".format( + response_cost, org_id + ) + ) + if org_id is None: + verbose_proxy_logger.debug( + "track_cost_callback: org_id is None. Not tracking spend for org" + ) + return + if prisma_client is not None: + prisma_client.org_list_transactons[org_id] = ( + response_cost + prisma_client.org_list_transactons.get(org_id, 0) + ) + except Exception as e: + verbose_proxy_logger.info( + f"Update Org DB failed to execute - {str(e)}\n{traceback.format_exc()}" + ) + raise e + + @staticmethod + async def _insert_spend_log_to_db( + kwargs: Optional[dict], + completion_response: Optional[Union[litellm.ModelResponse, Any, Exception]], + start_time: Optional[datetime], + end_time: Optional[datetime], + response_cost: Optional[float], + prisma_client: Optional[PrismaClient], + ): + try: + if prisma_client: + payload = get_logging_payload( + kwargs=kwargs, + response_obj=completion_response, + start_time=start_time, + end_time=end_time, + ) + payload["spend"] = response_cost or 0.0 + DBSpendUpdateWriter._set_spend_logs_payload( + payload=payload, + spend_logs_url=os.getenv("SPEND_LOGS_URL"), + prisma_client=prisma_client, + ) + except Exception as e: + verbose_proxy_logger.debug( + f"Update Spend Logs DB failed to execute - {str(e)}\n{traceback.format_exc()}" + ) + raise e + @staticmethod def _set_spend_logs_payload( payload: Union[dict, SpendLogsPayload],