feat: update enduser spend and budget reset date based on budget duration

This commit is contained in:
Laurien Lummer 2025-02-10 12:59:36 +01:00
parent bcfa641b81
commit 0b6d941dc2
3 changed files with 129 additions and 4 deletions

View file

@ -8,7 +8,7 @@ import smtplib
import threading
import time
import traceback
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union, overload
@ -482,7 +482,6 @@ class ProxyLogging:
try:
for callback in litellm.callbacks:
_callback = None
if isinstance(callback, str):
_callback = litellm.litellm_core_utils.litellm_logging.get_custom_logger_compatible_class(
@ -1290,6 +1289,8 @@ class PrismaClient:
"key",
"config",
"spend",
"enduser",
"budget",
"team",
"user_notification",
"combined_view",
@ -1304,6 +1305,7 @@ class PrismaClient:
] = None, # pagination, number of rows to getch when find_all==True
parent_otel_span: Optional[Span] = None,
proxy_logging_obj: Optional[ProxyLogging] = None,
budget_id_list: Optional[List[str]] = None,
):
args_passed_in = locals()
start_time = time.time()
@ -1482,6 +1484,21 @@ class PrismaClient:
order={"startTime": "desc"},
)
return response
elif table_name == "budget" and reset_at is not None:
if query_type == "find_all":
response = await self.db.litellm_budgettable.find_many(
where={ # type:ignore
"budget_reset_at": {"lt": reset_at}
}
)
return response
elif table_name == "enduser" and budget_id_list is not None:
if query_type == "find_all":
response = await self.db.litellm_endusertable.find_many(
where={"budget_id": {"in": budget_id_list}}
)
return response
elif table_name == "team":
if query_type == "find_unique":
response = await self.db.litellm_teamtable.find_unique(
@ -1799,7 +1816,9 @@ class PrismaClient:
user_id: Optional[str] = None,
team_id: Optional[str] = None,
query_type: Literal["update", "update_many"] = "update",
table_name: Optional[Literal["user", "key", "config", "spend", "team"]] = None,
table_name: Optional[
Literal["user", "key", "config", "spend", "team", "enduser", "budget"]
] = None,
update_key_values: Optional[dict] = None,
update_key_values_custom_query: Optional[dict] = None,
):
@ -1966,6 +1985,68 @@ class PrismaClient:
verbose_proxy_logger.info(
"\033[91m" + "DB User Table Batch update succeeded" + "\033[0m"
)
elif (
table_name is not None
and table_name == "enduser"
and query_type == "update_many"
and data_list is not None
and isinstance(data_list, list)
):
"""
Batch write update queries
"""
batcher = self.db.batch_()
for enduser in data_list:
try:
data_json = self.jsonify_object(
data=enduser.model_dump(exclude_none=True)
)
except Exception:
data_json = self.jsonify_object(data=enduser.dict())
batcher.litellm_endusertable.upsert(
where={"user_id": enduser.user_id}, # type: ignore
data={
"create": {**data_json}, # type: ignore
"update": {
**data_json # type: ignore
}, # just update end-user-specified values, if it already exists
},
)
await batcher.commit()
verbose_proxy_logger.info(
"\033[91m" + "DB End User Table Batch update succeeded" + "\033[0m"
)
elif (
table_name is not None
and table_name == "budget"
and query_type == "update_many"
and data_list is not None
and isinstance(data_list, list)
):
"""
Batch write update queries
"""
batcher = self.db.batch_()
for budget in data_list:
try:
data_json = self.jsonify_object(
data=budget.model_dump(exclude_none=True)
)
except Exception:
data_json = self.jsonify_object(data=budget.dict())
batcher.litellm_budgettable.upsert(
where={"budget_id": budget.budget_id}, # type: ignore
data={
"create": {**data_json}, # type: ignore
"update": {
**data_json # type: ignore
}, # just update end-user-specified values, if it already exists
},
)
await batcher.commit()
verbose_proxy_logger.info(
"\033[91m" + "DB Budget Table Batch update succeeded" + "\033[0m"
)
elif (
table_name is not None
and table_name == "team"
@ -2408,6 +2489,39 @@ async def reset_budget(prisma_client: PrismaClient):
query_type="update_many", data_list=users_to_reset, table_name="user"
)
## Reset End-User (Customer) Spend and corresponding Budget duration
now = datetime.now(timezone.utc)
budgets_to_reset = await prisma_client.get_data(
table_name="budget", query_type="find_all", reset_at=now
)
budget_id_list_to_reset_enduser = []
if budgets_to_reset is not None and len(budgets_to_reset) > 0:
for budget in budgets_to_reset:
budget_id_list_to_reset_enduser.append(budget.budget_id)
duration_s = duration_in_seconds(duration=budget.budget_duration)
budget.budget_reset_at = now + timedelta(seconds=duration_s)
await prisma_client.update_data(
query_type="update_many",
data_list=budgets_to_reset,
table_name="budget",
)
endusers_to_reset = await prisma_client.get_data(
table_name="enduser",
query_type="find_all",
budget_id_list=budget_id_list_to_reset_enduser,
)
if endusers_to_reset is not None and len(endusers_to_reset) > 0:
for enduser in endusers_to_reset:
enduser.spend = 0.0
await prisma_client.update_data(
query_type="update_many",
data_list=endusers_to_reset,
table_name="enduser",
)
## Reset Team Budget
now = datetime.utcnow()
teams_to_reset = await prisma_client.get_data(