basic structure for commit update txs to redis

This commit is contained in:
Ishaan Jaff 2025-03-27 18:21:33 -07:00
parent 894306141e
commit ad72078167
2 changed files with 69 additions and 3 deletions

View file

@ -29,6 +29,7 @@ from litellm.proxy.utils import (
_raise_failed_update_spend_exception, _raise_failed_update_spend_exception,
hash_token, hash_token,
) )
from litellm.secret_managers.main import str_to_bool
class DBSpendUpdateWriter: class DBSpendUpdateWriter:
@ -360,7 +361,7 @@ class DBSpendUpdateWriter:
return prisma_client return prisma_client
@staticmethod @staticmethod
async def commit_update_transactions_to_db( # noqa: PLR0915 async def db_spend_transaction_handler(
prisma_client: PrismaClient, prisma_client: PrismaClient,
n_retry_times: int, n_retry_times: int,
proxy_logging_obj: ProxyLogging, proxy_logging_obj: ProxyLogging,
@ -368,12 +369,77 @@ class DBSpendUpdateWriter:
""" """
Handles commiting update spend transactions to db Handles commiting update spend transactions to db
UPDATES can lead to deadlocks, hence we handle them separately `UPDATES` can lead to deadlocks, hence we handle them separately
Args: Args:
prisma_client: PrismaClient object prisma_client: PrismaClient object
n_retry_times: int, number of retry times n_retry_times: int, number of retry times
proxy_logging_obj: ProxyLogging object proxy_logging_obj: ProxyLogging object
How this works:
- Check `general_settings.use_redis_transaction_buffer`
- If enabled, write in-memory transactions to Redis
- Check if this Pod should read from the DB
else:
- Regular flow of this method
"""
if DBSpendUpdateWriter._should_commit_spend_updates_to_redis():
pass
if DBSpendUpdateWriter._should_commit_spend_updates_to_db():
await DBSpendUpdateWriter._commit_spend_updates_to_db(
prisma_client=prisma_client,
n_retry_times=n_retry_times,
proxy_logging_obj=proxy_logging_obj,
)
pass
@staticmethod
def _should_commit_spend_updates_to_redis() -> bool:
"""
Checks if the Pod should commit spend updates to Redis
This setting enables buffering database transactions in Redis
to improve reliability and reduce database contention
"""
from litellm.proxy.proxy_server import general_settings
_use_redis_transaction_buffer: Optional[Union[bool, str]] = (
general_settings.get("use_redis_transaction_buffer", False)
)
if isinstance(_use_redis_transaction_buffer, str):
_use_redis_transaction_buffer = str_to_bool(_use_redis_transaction_buffer)
if _use_redis_transaction_buffer is None:
return False
return _use_redis_transaction_buffer
@staticmethod
async def _commit_spend_updates_to_redis(
prisma_client: PrismaClient,
):
"""
Commits all the spend updates to Redis for each entity type
once committed, the transactions are cleared from the in-memory variables
"""
pass
@staticmethod
def _should_commit_spend_updates_to_db() -> bool:
"""
Checks if the Pod should commit spend updates to the Database
"""
return False
@staticmethod
async def _commit_spend_updates_to_db( # noqa: PLR0915
prisma_client: PrismaClient,
n_retry_times: int,
proxy_logging_obj: ProxyLogging,
):
"""
Commits all the spend updates to the Database
""" """
### UPDATE USER TABLE ### ### UPDATE USER TABLE ###
if len(prisma_client.user_list_transactons.keys()) > 0: if len(prisma_client.user_list_transactons.keys()) > 0:

View file

@ -2675,7 +2675,7 @@ async def update_spend( # noqa: PLR0915
spend_logs: list, spend_logs: list,
""" """
n_retry_times = 3 n_retry_times = 3
await DBSpendUpdateWriter.commit_update_transactions_to_db( await DBSpendUpdateWriter._commit_spend_updates_to_db(
prisma_client=prisma_client, prisma_client=prisma_client,
n_retry_times=n_retry_times, n_retry_times=n_retry_times,
proxy_logging_obj=proxy_logging_obj, proxy_logging_obj=proxy_logging_obj,