Merge pull request #2722 from BerriAI/litellm_db_perf_improvement

feat(proxy/utils.py): enable updating db in a separate server
This commit is contained in:
Krish Dholakia 2024-03-28 14:56:14 -07:00 committed by GitHub
commit 934a9ac2b4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 868 additions and 62 deletions

View file

@ -97,7 +97,6 @@ from litellm.proxy.utils import (
_is_projected_spend_over_limit,
_get_projected_spend_over_limit,
update_spend,
monitor_spend_list,
)
from litellm.proxy.secret_managers.google_kms import load_google_kms
from litellm.proxy.secret_managers.aws_secret_manager import load_aws_secret_manager
@ -118,6 +117,7 @@ from litellm.proxy.auth.auth_checks import (
allowed_routes_check,
get_actual_routes,
)
from litellm.llms.custom_httpx.httpx_handler import HTTPHandler
try:
from litellm._version import version
@ -304,6 +304,8 @@ proxy_logging_obj = ProxyLogging(user_api_key_cache=user_api_key_cache)
async_result = None
celery_app_conn = None
celery_fn = None # Redis Queue for handling requests
### DB WRITER ###
db_writer_client: Optional[HTTPHandler] = None
### logger ###
@ -1249,10 +1251,11 @@ async def update_database(
user_ids.append(litellm_proxy_budget_name)
### KEY CHANGE ###
for _id in user_ids:
prisma_client.user_list_transactons[_id] = (
response_cost
+ prisma_client.user_list_transactons.get(_id, 0)
)
if _id is not None:
prisma_client.user_list_transactons[_id] = (
response_cost
+ prisma_client.user_list_transactons.get(_id, 0)
)
if end_user_id is not None:
prisma_client.end_user_list_transactons[end_user_id] = (
response_cost
@ -1380,7 +1383,16 @@ async def update_database(
)
payload["spend"] = response_cost
if prisma_client is not None:
if (
os.getenv("SPEND_LOGS_URL", None) is not None
and prisma_client is not None
):
if isinstance(payload["startTime"], datetime):
payload["startTime"] = payload["startTime"].isoformat()
if isinstance(payload["endTime"], datetime):
payload["endTime"] = payload["endTime"].isoformat()
prisma_client.spend_log_transactons.append(payload)
elif prisma_client is not None:
await prisma_client.insert_data(data=payload, table_name="spend")
except Exception as e:
verbose_proxy_logger.debug(
@ -2707,7 +2719,7 @@ def on_backoff(details):
@router.on_event("startup")
async def startup_event():
global prisma_client, master_key, use_background_health_checks, llm_router, llm_model_list, general_settings, proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time, litellm_proxy_admin_name
global prisma_client, master_key, use_background_health_checks, llm_router, llm_model_list, general_settings, proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time, litellm_proxy_admin_name, db_writer_client
import json
### LOAD MASTER KEY ###
@ -2740,6 +2752,8 @@ async def startup_event():
## COST TRACKING ##
cost_tracking()
db_writer_client = HTTPHandler()
proxy_logging_obj._init_litellm_callbacks() # INITIALIZE LITELLM CALLBACKS ON SERVER STARTUP <- do this to catch any logging errors on startup, not when calls are being made
## JWT AUTH ##
@ -2850,7 +2864,7 @@ async def startup_event():
update_spend,
"interval",
seconds=batch_writing_interval,
args=[prisma_client],
args=[prisma_client, db_writer_client],
)
scheduler.start()
@ -8060,6 +8074,8 @@ async def shutdown_event():
await jwt_handler.close()
if db_writer_client is not None:
await db_writer_client.close()
## RESET CUSTOM VARIABLES ##
cleanup_router_config_variables()