handle commit updates to the DB

This commit is contained in:
Ishaan Jaff 2025-03-27 22:55:36 -07:00
parent 0cb6cb6c66
commit 666cd8d4e3
2 changed files with 42 additions and 29 deletions

View file

@ -47,14 +47,9 @@ class DBSpendUpdateWriter:
self, self,
redis_cache: Optional[RedisCache] = None, redis_cache: Optional[RedisCache] = None,
): ):
from litellm.proxy.proxy_server import prisma_client
self.redis_cache = redis_cache self.redis_cache = redis_cache
self.redis_update_buffer = RedisUpdateBuffer(redis_cache=self.redis_cache) self.redis_update_buffer = RedisUpdateBuffer(redis_cache=self.redis_cache)
self.pod_leader_manager = PodLockManager( self.pod_leader_manager = PodLockManager(cronjob_id=DB_SPEND_UPDATE_JOB_NAME)
cronjob_id=DB_SPEND_UPDATE_JOB_NAME,
prisma_client=prisma_client,
)
@staticmethod @staticmethod
async def update_database( async def update_database(
@ -411,6 +406,9 @@ class DBSpendUpdateWriter:
# Only commit from redis to db if this pod is the leader # Only commit from redis to db if this pod is the leader
if await self.pod_leader_manager.acquire_lock(): if await self.pod_leader_manager.acquire_lock():
verbose_proxy_logger.debug("acquired lock for spend updates")
try:
db_spend_update_transactions = ( db_spend_update_transactions = (
await self.redis_update_buffer.get_all_update_transactions_from_redis() await self.redis_update_buffer.get_all_update_transactions_from_redis()
) )
@ -421,6 +419,9 @@ class DBSpendUpdateWriter:
proxy_logging_obj=proxy_logging_obj, proxy_logging_obj=proxy_logging_obj,
db_spend_update_transactions=db_spend_update_transactions, db_spend_update_transactions=db_spend_update_transactions,
) )
except Exception as e:
verbose_proxy_logger.error(f"Error committing spend updates: {e}")
finally:
await self.pod_leader_manager.release_lock() await self.pod_leader_manager.release_lock()
else: else:
db_spend_update_transactions = DBSpendUpdateTransactions( db_spend_update_transactions = DBSpendUpdateTransactions(
@ -456,7 +457,10 @@ class DBSpendUpdateWriter:
### UPDATE USER TABLE ### ### UPDATE USER TABLE ###
user_list_transactions = db_spend_update_transactions["user_list_transactions"] user_list_transactions = db_spend_update_transactions["user_list_transactions"]
if len(user_list_transactions.keys()) > 0: if (
user_list_transactions is not None
and len(user_list_transactions.keys()) > 0
):
for i in range(n_retry_times + 1): for i in range(n_retry_times + 1):
start_time = time.time() start_time = time.time()
try: try:
@ -501,7 +505,10 @@ class DBSpendUpdateWriter:
end_user_list_transactions = db_spend_update_transactions[ end_user_list_transactions = db_spend_update_transactions[
"end_user_list_transactions" "end_user_list_transactions"
] ]
if len(end_user_list_transactions.keys()) > 0: if (
end_user_list_transactions is not None
and len(end_user_list_transactions.keys()) > 0
):
await ProxyUpdateSpend.update_end_user_spend( await ProxyUpdateSpend.update_end_user_spend(
n_retry_times=n_retry_times, n_retry_times=n_retry_times,
prisma_client=prisma_client, prisma_client=prisma_client,
@ -510,9 +517,9 @@ class DBSpendUpdateWriter:
### UPDATE KEY TABLE ### ### UPDATE KEY TABLE ###
key_list_transactions = db_spend_update_transactions["key_list_transactions"] key_list_transactions = db_spend_update_transactions["key_list_transactions"]
verbose_proxy_logger.debug( verbose_proxy_logger.debug(
"KEY Spend transactions: {}".format(len(key_list_transactions.keys())) "KEY Spend transactions: {}".format(key_list_transactions)
) )
if len(key_list_transactions.keys()) > 0: if key_list_transactions is not None and len(key_list_transactions.keys()) > 0:
for i in range(n_retry_times + 1): for i in range(n_retry_times + 1):
start_time = time.time() start_time = time.time()
try: try:
@ -555,7 +562,10 @@ class DBSpendUpdateWriter:
) )
) )
team_list_transactions = db_spend_update_transactions["team_list_transactions"] team_list_transactions = db_spend_update_transactions["team_list_transactions"]
if len(team_list_transactions.keys()) > 0: if (
team_list_transactions is not None
and len(team_list_transactions.keys()) > 0
):
for i in range(n_retry_times + 1): for i in range(n_retry_times + 1):
start_time = time.time() start_time = time.time()
try: try:
@ -600,7 +610,10 @@ class DBSpendUpdateWriter:
team_member_list_transactions = db_spend_update_transactions[ team_member_list_transactions = db_spend_update_transactions[
"team_member_list_transactions" "team_member_list_transactions"
] ]
if len(team_member_list_transactions.keys()) > 0: if (
team_member_list_transactions is not None
and len(team_member_list_transactions.keys()) > 0
):
for i in range(n_retry_times + 1): for i in range(n_retry_times + 1):
start_time = time.time() start_time = time.time()
try: try:
@ -642,7 +655,7 @@ class DBSpendUpdateWriter:
### UPDATE ORG TABLE ### ### UPDATE ORG TABLE ###
org_list_transactions = db_spend_update_transactions["org_list_transactions"] org_list_transactions = db_spend_update_transactions["org_list_transactions"]
if len(org_list_transactions.keys()) > 0: if org_list_transactions is not None and len(org_list_transactions.keys()) > 0:
for i in range(n_retry_times + 1): for i in range(n_retry_times + 1):
start_time = time.time() start_time = time.time()
try: try:

View file

@ -17,12 +17,12 @@ else:
class DBSpendUpdateTransactions(TypedDict): class DBSpendUpdateTransactions(TypedDict):
user_list_transactions: Dict[str, float] user_list_transactions: Optional[Dict[str, float]]
end_user_list_transactions: Dict[str, float] end_user_list_transactions: Optional[Dict[str, float]]
key_list_transactions: Dict[str, float] key_list_transactions: Optional[Dict[str, float]]
team_list_transactions: Dict[str, float] team_list_transactions: Optional[Dict[str, float]]
team_member_list_transactions: Dict[str, float] team_member_list_transactions: Optional[Dict[str, float]]
org_list_transactions: Dict[str, float] org_list_transactions: Optional[Dict[str, float]]
class RedisUpdateBuffer: class RedisUpdateBuffer: