use asyncio lock for updating PrismaClient txs

This commit is contained in:
Ishaan Jaff 2025-03-28 19:51:15 -07:00
parent 19f35ada82
commit 96812a7da0
2 changed files with 32 additions and 23 deletions

View file

@ -80,30 +80,35 @@ class RedisUpdateBuffer:
"redis_cache is None, skipping store_in_memory_spend_updates_in_redis" "redis_cache is None, skipping store_in_memory_spend_updates_in_redis"
) )
return return
db_spend_update_transactions: DBSpendUpdateTransactions = ( async with prisma_client.in_memory_transaction_lock:
DBSpendUpdateTransactions( db_spend_update_transactions: DBSpendUpdateTransactions = (
user_list_transactions=prisma_client.user_list_transactions, DBSpendUpdateTransactions(
end_user_list_transactions=prisma_client.end_user_list_transactions, user_list_transactions=prisma_client.user_list_transactions,
key_list_transactions=prisma_client.key_list_transactions, end_user_list_transactions=prisma_client.end_user_list_transactions,
team_list_transactions=prisma_client.team_list_transactions, key_list_transactions=prisma_client.key_list_transactions,
team_member_list_transactions=prisma_client.team_member_list_transactions, team_list_transactions=prisma_client.team_list_transactions,
org_list_transactions=prisma_client.org_list_transactions, team_member_list_transactions=prisma_client.team_member_list_transactions,
org_list_transactions=prisma_client.org_list_transactions,
)
) )
)
# only store in redis if there are any updates to commit # only store in redis if there are any updates to commit
if ( if (
self._number_of_transactions_to_store_in_redis(db_spend_update_transactions) self._number_of_transactions_to_store_in_redis(
== 0 db_spend_update_transactions
): )
return == 0
):
return
list_of_transactions = [safe_dumps(db_spend_update_transactions)] list_of_transactions = [safe_dumps(db_spend_update_transactions)]
await self.redis_cache.async_rpush( await self.redis_cache.async_rpush(
key=REDIS_UPDATE_BUFFER_KEY, key=REDIS_UPDATE_BUFFER_KEY,
values=list_of_transactions, values=list_of_transactions,
) )
self._clear_all_in_memory_spend_updates(prisma_client)
# clear the in-memory spend updates
RedisUpdateBuffer._clear_all_in_memory_spend_updates(prisma_client)
@staticmethod @staticmethod
def _number_of_transactions_to_store_in_redis( def _number_of_transactions_to_store_in_redis(
@ -201,12 +206,15 @@ class RedisUpdateBuffer:
@staticmethod @staticmethod
def _parse_list_of_transactions( def _parse_list_of_transactions(
list_of_transactions: List[str], list_of_transactions: Union[Any, List[Any]],
) -> List[DBSpendUpdateTransactions]: ) -> List[DBSpendUpdateTransactions]:
""" """
Parses the list of transactions from Redis Parses the list of transactions from Redis
""" """
return [json.loads(transaction) for transaction in list_of_transactions] if isinstance(list_of_transactions, list):
return [json.loads(transaction) for transaction in list_of_transactions]
else:
return [json.loads(list_of_transactions)]
@staticmethod @staticmethod
def _combine_list_of_transactions( def _combine_list_of_transactions(

View file

@ -1121,6 +1121,7 @@ class PrismaClient:
self.iam_token_db_auth: Optional[bool] = str_to_bool( self.iam_token_db_auth: Optional[bool] = str_to_bool(
os.getenv("IAM_TOKEN_DB_AUTH") os.getenv("IAM_TOKEN_DB_AUTH")
) )
self.in_memory_transaction_lock = asyncio.Lock()
verbose_proxy_logger.debug("Creating Prisma Client..") verbose_proxy_logger.debug("Creating Prisma Client..")
try: try:
from prisma import Prisma # type: ignore from prisma import Prisma # type: ignore