Mirror of https://github.com/BerriAI/litellm.git, synced 2025-04-26 03:04:13 +00:00
refactor daily spend updates to use new Queue DS

commit 4c14550721 (parent 655ce2e745)
5 changed files with 365 additions and 237 deletions
@@ -19,6 +19,7 @@ DEFAULT_IMAGE_HEIGHT = 300
 MAX_SIZE_PER_ITEM_IN_MEMORY_CACHE_IN_KB = 1024  # 1MB = 1024KB
 SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD = 1000  # Minimum number of requests to consider "reasonable traffic". Used for single-deployment cooldown logic.
 REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
+REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
 MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
 #### RELIABILITY ####
 REPEATED_STREAMING_CHUNK_LIMIT = 100  # catch if model starts looping the same chunk while streaming. Uses high default to prevent false positives.
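The new constant pairs with the two existing ones above it: each pod buffers daily-spend payloads in a Redis list under REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, and the leader drains it in chunks of MAX_REDIS_BUFFER_DEQUEUE_COUNT (both usages appear in the redis_update_buffer hunks below). A minimal sketch of that list-as-buffer pattern, assuming a plain redis-py asyncio client rather than litellm's RedisCache wrapper:

```python
import asyncio
import json

import redis.asyncio as redis  # assumption: plain redis-py client, not litellm's RedisCache

REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100


async def main() -> None:
    r = redis.Redis()  # assumes a local Redis; the count arg to lpop needs Redis >= 6.2

    # Producer (any pod): append one JSON-encoded batch of daily spend updates.
    await r.rpush(
        REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
        json.dumps({"user1_2025-04-26_key1_gpt-4o_openai": {"spend": 1.2}}),
    )

    # Consumer (leader pod): drain up to MAX_REDIS_BUFFER_DEQUEUE_COUNT items at once.
    raw = await r.lpop(REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, MAX_REDIS_BUFFER_DEQUEUE_COUNT)
    if raw is not None:
        print([json.loads(item) for item in raw])


asyncio.run(main())
```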
@@ -10,7 +10,7 @@ import os
 import time
 import traceback
 from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, Optional, Union
 
 import litellm
 from litellm._logging import verbose_proxy_logger
@@ -18,6 +18,7 @@ from litellm.caching import DualCache, RedisCache
 from litellm.constants import DB_SPEND_UPDATE_JOB_NAME
 from litellm.proxy._types import (
     DB_CONNECTION_ERROR_TYPES,
+    DailyUserSpendTransaction,
     DBSpendUpdateTransactions,
     Litellm_EntityType,
     LiteLLM_UserTable,
@@ -26,7 +27,7 @@ from litellm.proxy._types import (
 )
 from litellm.proxy.db.pod_lock_manager import PodLockManager
 from litellm.proxy.db.redis_update_buffer import RedisUpdateBuffer
-from litellm.proxy.db.spend_update_queue import SpendUpdateQueue
+from litellm.proxy.db.spend_update_queue import DailySpendUpdateQueue, SpendUpdateQueue
 
 if TYPE_CHECKING:
     from litellm.proxy.utils import PrismaClient, ProxyLogging
@@ -51,6 +52,7 @@ class DBSpendUpdateWriter:
         self.redis_update_buffer = RedisUpdateBuffer(redis_cache=self.redis_cache)
         self.pod_lock_manager = PodLockManager(cronjob_id=DB_SPEND_UPDATE_JOB_NAME)
         self.spend_update_queue = SpendUpdateQueue()
+        self.daily_spend_update_queue = DailySpendUpdateQueue()
 
     async def update_database(
         # LiteLLM management object fields
@@ -119,7 +121,7 @@ class DBSpendUpdateWriter:
                 )
             )
         if disable_spend_logs is False:
-            await DBSpendUpdateWriter._insert_spend_log_to_db(
+            await self._insert_spend_log_to_db(
                 kwargs=kwargs,
                 completion_response=completion_response,
                 start_time=start_time,
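The hunk above changes a call site from `DBSpendUpdateWriter._insert_spend_log_to_db(...)` to `self._insert_spend_log_to_db(...)`; the hunks below make the matching definition changes, dropping `@staticmethod` and adding `self`. The point of the conversion is that these methods can now reach per-instance state, in particular the new `self.daily_spend_update_queue`. A minimal sketch of the pattern (names here are illustrative, not from the diff):

```python
class Writer:
    def __init__(self) -> None:
        # Per-instance state, playing the role of daily_spend_update_queue.
        # Previously the equivalent state lived on a PrismaClient class attribute,
        # because a @staticmethod has no access to self.
        self.queue: list = []

    async def insert(self, item: str) -> None:
        # As an instance method, this can hand work to its own writer's queue.
        self.queue.append(item)
```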
@@ -278,8 +280,8 @@ class DBSpendUpdateWriter:
             )
             raise e
 
-    @staticmethod
     async def _insert_spend_log_to_db(
+        self,
         kwargs: Optional[dict],
         completion_response: Optional[Union[litellm.ModelResponse, Any, Exception]],
         start_time: Optional[datetime],
@@ -300,7 +302,7 @@ class DBSpendUpdateWriter:
                 end_time=end_time,
             )
             payload["spend"] = response_cost or 0.0
-            DBSpendUpdateWriter._set_spend_logs_payload(
+            await self._set_spend_logs_payload(
                 payload=payload,
                 spend_logs_url=os.getenv("SPEND_LOGS_URL"),
                 prisma_client=prisma_client,
@@ -311,8 +313,8 @@ class DBSpendUpdateWriter:
             )
             raise e
 
-    @staticmethod
-    def _set_spend_logs_payload(
+    async def _set_spend_logs_payload(
+        self,
         payload: Union[dict, SpendLogsPayload],
         prisma_client: PrismaClient,
         spend_logs_url: Optional[str] = None,
@@ -331,8 +333,9 @@ class DBSpendUpdateWriter:
         elif prisma_client is not None:
             prisma_client.spend_log_transactions.append(payload)
 
-        prisma_client.add_spend_log_transaction_to_daily_user_transaction(
-            payload.copy()
+        await self.add_spend_log_transaction_to_daily_user_transaction(
+            payload=payload.copy(),
+            prisma_client=prisma_client,
         )
         return prisma_client
 
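One detail worth calling out in the hunk above: the daily-spend path receives `payload=payload.copy()`, while the original `payload` dict has just been appended to `prisma_client.spend_log_transactions`. The aggregation helper added later in this diff mutates transactions in place, so passing the dict itself would alias the two consumers. A small illustration of the hazard the copy avoids (illustrative dicts, not litellm code):

```python
# One dict shared by two consumers: in-place aggregation on the alias
# silently corrupts the spend-log entry as well.
payload = {"spend": 1.0}
spend_log_transactions = [payload]   # consumer 1: raw spend logs
daily_transaction = payload          # consumer 2: aliased, no copy
daily_transaction["spend"] += 2.0    # in-place aggregation
print(spend_log_transactions[0]["spend"])  # 3.0 -- the log entry changed too

# With .copy(), each consumer owns its own data.
payload = {"spend": 1.0}
spend_log_transactions = [payload]
daily_transaction = payload.copy()
daily_transaction["spend"] += 2.0
print(spend_log_transactions[0]["spend"])  # 1.0 -- unaffected
```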
@@ -390,6 +393,7 @@ class DBSpendUpdateWriter:
         """
         await self.redis_update_buffer.store_in_memory_spend_updates_in_redis(
             spend_update_queue=self.spend_update_queue,
+            daily_spend_update_queue=self.daily_spend_update_queue,
         )
 
         # Only commit from redis to db if this pod is the leader
@@ -400,6 +404,9 @@ class DBSpendUpdateWriter:
             db_spend_update_transactions = (
                 await self.redis_update_buffer.get_all_update_transactions_from_redis_buffer()
             )
+            daily_spend_update_transactions = (
+                await self.redis_update_buffer.get_all_daily_spend_update_transactions_from_redis_buffer()
+            )
             if db_spend_update_transactions is not None:
                 await self._commit_spend_updates_to_db(
                     prisma_client=prisma_client,
@@ -407,6 +414,13 @@ class DBSpendUpdateWriter:
                     proxy_logging_obj=proxy_logging_obj,
                     db_spend_update_transactions=db_spend_update_transactions,
                 )
+            if daily_spend_update_transactions is not None:
+                await DBSpendUpdateWriter.update_daily_user_spend(
+                    n_retry_times=n_retry_times,
+                    prisma_client=prisma_client,
+                    proxy_logging_obj=proxy_logging_obj,
+                    daily_spend_transactions=daily_spend_update_transactions,
+                )
         except Exception as e:
             verbose_proxy_logger.error(f"Error committing spend updates: {e}")
         finally:
@@ -428,6 +442,9 @@ class DBSpendUpdateWriter:
         db_spend_update_transactions = (
             await self.spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions()
         )
+        daily_spend_update_transactions = (
+            await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
+        )
         await self._commit_spend_updates_to_db(
             prisma_client=prisma_client,
             n_retry_times=n_retry_times,
@@ -435,6 +452,13 @@ class DBSpendUpdateWriter:
             db_spend_update_transactions=db_spend_update_transactions,
         )
 
+        await DBSpendUpdateWriter.update_daily_user_spend(
+            n_retry_times=n_retry_times,
+            prisma_client=prisma_client,
+            proxy_logging_obj=proxy_logging_obj,
+            daily_spend_transactions=daily_spend_update_transactions,
+        )
+
     async def _commit_spend_updates_to_db(  # noqa: PLR0915
         self,
         prisma_client: PrismaClient,
@@ -679,3 +703,192 @@ class DBSpendUpdateWriter:
             _raise_failed_update_spend_exception(
                 e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
             )
+
+    @staticmethod
+    async def update_daily_user_spend(
+        n_retry_times: int,
+        prisma_client: PrismaClient,
+        proxy_logging_obj: ProxyLogging,
+        daily_spend_transactions: Dict[str, DailyUserSpendTransaction],
+    ):
+        """
+        Batch job to update LiteLLM_DailyUserSpend table using in-memory daily_spend_transactions
+        """
+        from litellm.proxy.utils import _raise_failed_update_spend_exception
+
+        ### UPDATE DAILY USER SPEND ###
+        verbose_proxy_logger.debug(
+            "Daily User Spend transactions: {}".format(len(daily_spend_transactions))
+        )
+        BATCH_SIZE = (
+            100  # Number of aggregated records to update in each database operation
+        )
+        start_time = time.time()
+
+        try:
+            for i in range(n_retry_times + 1):
+                try:
+                    # Get transactions to process
+                    transactions_to_process = dict(
+                        list(daily_spend_transactions.items())[:BATCH_SIZE]
+                    )
+
+                    if len(transactions_to_process) == 0:
+                        verbose_proxy_logger.debug(
+                            "No new transactions to process for daily spend update"
+                        )
+                        break
+
+                    # Update DailyUserSpend table in batches
+                    async with prisma_client.db.batch_() as batcher:
+                        for _, transaction in transactions_to_process.items():
+                            user_id = transaction.get("user_id")
+                            if not user_id:  # Skip if no user_id
+                                continue
+
+                            batcher.litellm_dailyuserspend.upsert(
+                                where={
+                                    "user_id_date_api_key_model_custom_llm_provider": {
+                                        "user_id": user_id,
+                                        "date": transaction["date"],
+                                        "api_key": transaction["api_key"],
+                                        "model": transaction["model"],
+                                        "custom_llm_provider": transaction.get(
+                                            "custom_llm_provider"
+                                        ),
+                                    }
+                                },
+                                data={
+                                    "create": {
+                                        "user_id": user_id,
+                                        "date": transaction["date"],
+                                        "api_key": transaction["api_key"],
+                                        "model": transaction["model"],
+                                        "model_group": transaction.get("model_group"),
+                                        "custom_llm_provider": transaction.get(
+                                            "custom_llm_provider"
+                                        ),
+                                        "prompt_tokens": transaction["prompt_tokens"],
+                                        "completion_tokens": transaction[
+                                            "completion_tokens"
+                                        ],
+                                        "spend": transaction["spend"],
+                                        "api_requests": transaction["api_requests"],
+                                        "successful_requests": transaction[
+                                            "successful_requests"
+                                        ],
+                                        "failed_requests": transaction[
+                                            "failed_requests"
+                                        ],
+                                    },
+                                    "update": {
+                                        "prompt_tokens": {
+                                            "increment": transaction["prompt_tokens"]
+                                        },
+                                        "completion_tokens": {
+                                            "increment": transaction[
+                                                "completion_tokens"
+                                            ]
+                                        },
+                                        "spend": {"increment": transaction["spend"]},
+                                        "api_requests": {
+                                            "increment": transaction["api_requests"]
+                                        },
+                                        "successful_requests": {
+                                            "increment": transaction[
+                                                "successful_requests"
+                                            ]
+                                        },
+                                        "failed_requests": {
+                                            "increment": transaction["failed_requests"]
+                                        },
+                                    },
+                                },
+                            )
+
+                    verbose_proxy_logger.info(
+                        f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
+                    )
+
+                    # Remove processed transactions
+                    for key in transactions_to_process.keys():
+                        daily_spend_transactions.pop(key, None)
+
+                    verbose_proxy_logger.debug(
+                        f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
+                    )
+                    break
+
+                except DB_CONNECTION_ERROR_TYPES as e:
+                    if i >= n_retry_times:
+                        _raise_failed_update_spend_exception(
+                            e=e,
+                            start_time=start_time,
+                            proxy_logging_obj=proxy_logging_obj,
+                        )
+                    await asyncio.sleep(2**i)  # Exponential backoff
+
+        except Exception as e:
+            # Remove processed transactions even if there was an error
+            if "transactions_to_process" in locals():
+                for key in transactions_to_process.keys():  # type: ignore
+                    daily_spend_transactions.pop(key, None)
+            _raise_failed_update_spend_exception(
+                e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
+            )
+
+    async def add_spend_log_transaction_to_daily_user_transaction(
+        self,
+        payload: Union[dict, SpendLogsPayload],
+        prisma_client: PrismaClient,
+    ):
+        """
+        Add a spend log transaction to the daily user transaction list
+
+        Key = @@unique([user_id, date, api_key, model, custom_llm_provider])
+
+        If key exists, update the transaction with the new spend and usage
+        """
+        expected_keys = ["user", "startTime", "api_key", "model", "custom_llm_provider"]
+
+        if not all(key in payload for key in expected_keys):
+            verbose_proxy_logger.debug(
+                f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
+            )
+            return
+
+        request_status = prisma_client.get_request_status(payload)
+        verbose_proxy_logger.info(f"Logged request status: {request_status}")
+        if isinstance(payload["startTime"], datetime):
+            start_time = payload["startTime"].isoformat()
+            date = start_time.split("T")[0]
+        elif isinstance(payload["startTime"], str):
+            date = payload["startTime"].split("T")[0]
+        else:
+            verbose_proxy_logger.debug(
+                f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
+            )
+            return
+        try:
+            daily_transaction_key = f"{payload['user']}_{date}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
+            daily_transaction = DailyUserSpendTransaction(
+                user_id=payload["user"],
+                date=date,
+                api_key=payload["api_key"],
+                model=payload["model"],
+                model_group=payload["model_group"],
+                custom_llm_provider=payload["custom_llm_provider"],
+                prompt_tokens=payload["prompt_tokens"],
+                completion_tokens=payload["completion_tokens"],
+                spend=payload["spend"],
+                api_requests=1,
+                successful_requests=1 if request_status == "success" else 0,
+                failed_requests=1 if request_status != "success" else 0,
+            )
+
+            await self.daily_spend_update_queue.add_update(
+                update={daily_transaction_key: daily_transaction}
+            )
+
+        except Exception as e:
+            raise e
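To make the keying above concrete: `add_spend_log_transaction_to_daily_user_transaction` folds each spend log into one queue entry whose key mirrors the `@@unique([user_id, date, api_key, model, custom_llm_provider])` constraint. A sketch with hypothetical payload values (the values are illustrative, not from the diff):

```python
from datetime import datetime

# Hypothetical spend-log payload; the field names follow the expected_keys
# check above ("user", "startTime", "api_key", "model", "custom_llm_provider").
payload = {
    "user": "u-123",
    "startTime": datetime(2025, 4, 26, 3, 4, 13),
    "api_key": "hashed-key-abc",
    "model": "gpt-4o",
    "custom_llm_provider": "openai",
}

date = payload["startTime"].isoformat().split("T")[0]  # "2025-04-26"

# Mirrors the f-string in the method above: one row per
# (user_id, date, api_key, model, custom_llm_provider).
daily_transaction_key = (
    f"{payload['user']}_{date}_{payload['api_key']}"
    f"_{payload['model']}_{payload['custom_llm_provider']}"
)
print(daily_transaction_key)
# u-123_2025-04-26_hashed-key-abc_gpt-4o_openai
```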
@@ -9,10 +9,14 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
 from litellm._logging import verbose_proxy_logger
 from litellm.caching import RedisCache
-from litellm.constants import MAX_REDIS_BUFFER_DEQUEUE_COUNT, REDIS_UPDATE_BUFFER_KEY
+from litellm.constants import (
+    MAX_REDIS_BUFFER_DEQUEUE_COUNT,
+    REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
+    REDIS_UPDATE_BUFFER_KEY,
+)
 from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
-from litellm.proxy._types import DBSpendUpdateTransactions
-from litellm.proxy.db.spend_update_queue import SpendUpdateQueue
+from litellm.proxy._types import DailyUserSpendTransaction, DBSpendUpdateTransactions
+from litellm.proxy.db.spend_update_queue import DailySpendUpdateQueue, SpendUpdateQueue
 from litellm.secret_managers.main import str_to_bool
 
 if TYPE_CHECKING:
@@ -56,10 +60,16 @@ class RedisUpdateBuffer:
     async def store_in_memory_spend_updates_in_redis(
         self,
         spend_update_queue: SpendUpdateQueue,
+        daily_spend_update_queue: DailySpendUpdateQueue,
     ):
         """
         Stores the in-memory spend updates to Redis
+
+        Stores the following in memory data structures in Redis:
+        - SpendUpdateQueue - Key, User, Team, TeamMember, Org, EndUser Spend updates
+        - DailySpendUpdateQueue - Daily Spend updates Aggregate view
+
+        For SpendUpdateQueue:
         Each transaction is a dict stored as following:
         - key is the entity id
         - value is the spend amount
@@ -73,6 +83,28 @@ class RedisUpdateBuffer:
             "0929880203": 0.001,
         ]
         ```
+
+        For DailySpendUpdateQueue:
+        Each transaction is a Dict[str, DailyUserSpendTransaction] stored as following:
+        - key is the daily_transaction_key
+        - value is the DailyUserSpendTransaction
+
+        ```
+        Redis List:
+        daily_spend_update_transactions:
+        [
+            {
+                "user_keyhash_1_model_1": {
+                    "spend": 1.2,
+                    "prompt_tokens": 1000,
+                    "completion_tokens": 1000,
+                    "api_requests": 1000,
+                    "successful_requests": 1000,
+                },
+            }
+        ]
+        ```
         """
         if self.redis_cache is None:
             verbose_proxy_logger.debug(
@@ -86,6 +118,12 @@ class RedisUpdateBuffer:
         verbose_proxy_logger.debug(
             "ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions
         )
+        daily_spend_update_transactions = (
+            await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
+        )
+        verbose_proxy_logger.debug(
+            "ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions
+        )
 
         # only store in redis if there are any updates to commit
         if (
@@ -100,6 +138,14 @@ class RedisUpdateBuffer:
             values=list_of_transactions,
         )
 
+        list_of_daily_spend_update_transactions = [
+            safe_dumps(daily_spend_update_transactions)
+        ]
+        await self.redis_cache.async_rpush(
+            key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
+            values=list_of_daily_spend_update_transactions,
+        )
+
     @staticmethod
     def _number_of_transactions_to_store_in_redis(
         db_spend_update_transactions: DBSpendUpdateTransactions,
@@ -180,6 +226,27 @@ class RedisUpdateBuffer:
 
         return combined_transaction
 
+    async def get_all_daily_spend_update_transactions_from_redis_buffer(
+        self,
+    ) -> Optional[Dict[str, DailyUserSpendTransaction]]:
+        """
+        Gets all the daily spend update transactions from Redis
+        """
+        if self.redis_cache is None:
+            return None
+        list_of_transactions = await self.redis_cache.async_lpop(
+            key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
+            count=MAX_REDIS_BUFFER_DEQUEUE_COUNT,
+        )
+        if list_of_transactions is None:
+            return None
+        list_of_daily_spend_update_transactions = [
+            json.loads(transaction) for transaction in list_of_transactions
+        ]
+        return DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
+            list_of_daily_spend_update_transactions
+        )
+
     @staticmethod
     def _parse_list_of_transactions(
         list_of_transactions: Union[Any, List[Any]],
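Because every pod `rpush`es its own JSON blob and the leader `lpop`s up to MAX_REDIS_BUFFER_DEQUEUE_COUNT of them at once, the same daily_transaction_key can appear in several blobs; that is why the reader above finishes with `get_aggregated_daily_spend_update_transactions`. A standalone sketch of that merge, reduced to two counters (illustrative data, stdlib only):

```python
import json
from typing import Dict, List

# Two blobs as two different pods might have pushed them; they share one key.
blob_a = json.dumps({"u1_2025-04-26_k1_gpt-4o_openai": {"spend": 1.0, "api_requests": 2}})
blob_b = json.dumps({"u1_2025-04-26_k1_gpt-4o_openai": {"spend": 0.5, "api_requests": 1}})


def merge(updates: List[Dict[str, dict]]) -> Dict[str, dict]:
    # Same shape as DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions,
    # reduced to the two counters used in this sketch.
    merged: Dict[str, dict] = {}
    for update in updates:
        for key, txn in update.items():
            if key in merged:
                merged[key]["spend"] += txn["spend"]
                merged[key]["api_requests"] += txn["api_requests"]
            else:
                merged[key] = txn
    return merged


print(merge([json.loads(blob_a), json.loads(blob_b)]))
# {'u1_2025-04-26_k1_gpt-4o_openai': {'spend': 1.5, 'api_requests': 3}}
```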
@@ -1,8 +1,9 @@
 import asyncio
-from typing import TYPE_CHECKING, Any, List
+from typing import TYPE_CHECKING, Any, Dict, List
 
 from litellm._logging import verbose_proxy_logger
 from litellm.proxy._types import (
+    DailyUserSpendTransaction,
     DBSpendUpdateTransactions,
     Litellm_EntityType,
     SpendUpdateQueueItem,
@@ -130,3 +131,61 @@ class SpendUpdateQueue:
             transactions_dict[entity_id] += response_cost or 0
 
         return db_spend_update_transactions
+
+
+class DailySpendUpdateQueue:
+    def __init__(
+        self,
+    ):
+        self.update_queue: asyncio.Queue[
+            Dict[str, DailyUserSpendTransaction]
+        ] = asyncio.Queue()
+
+    async def add_update(self, update: Dict[str, DailyUserSpendTransaction]) -> None:
+        """Enqueue an update. Each update is a dict mapping a daily_transaction_key to its DailyUserSpendTransaction."""
+        verbose_proxy_logger.debug("Adding update to queue: %s", update)
+        await self.update_queue.put(update)
+
+    async def flush_all_updates_from_in_memory_queue(
+        self,
+    ) -> List[Dict[str, DailyUserSpendTransaction]]:
+        """Get all updates from the queue."""
+        updates: List[Dict[str, DailyUserSpendTransaction]] = []
+        while not self.update_queue.empty():
+            updates.append(await self.update_queue.get())
+        return updates
+
+    async def flush_and_get_aggregated_daily_spend_update_transactions(
+        self,
+    ) -> Dict[str, DailyUserSpendTransaction]:
+        """Get all updates from the queue and return all updates aggregated by daily_transaction_key."""
+        updates = await self.flush_all_updates_from_in_memory_queue()
+        return DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
+            updates
+        )
+
+    @staticmethod
+    def get_aggregated_daily_spend_update_transactions(
+        updates: List[Dict[str, DailyUserSpendTransaction]]
+    ) -> Dict[str, DailyUserSpendTransaction]:
+        """Aggregate updates by daily_transaction_key."""
+        aggregated_daily_spend_update_transactions: Dict[
+            str, DailyUserSpendTransaction
+        ] = {}
+        for _update in updates:
+            for _key, payload in _update.items():
+                if _key in aggregated_daily_spend_update_transactions:
+                    daily_transaction = aggregated_daily_spend_update_transactions[_key]
+                    daily_transaction["spend"] += payload["spend"]
+                    daily_transaction["prompt_tokens"] += payload["prompt_tokens"]
+                    daily_transaction["completion_tokens"] += payload[
+                        "completion_tokens"
+                    ]
+                    daily_transaction["api_requests"] += payload["api_requests"]
+                    daily_transaction["successful_requests"] += payload[
+                        "successful_requests"
+                    ]
+                    daily_transaction["failed_requests"] += payload["failed_requests"]
+                else:
+                    aggregated_daily_spend_update_transactions[_key] = payload
+        return aggregated_daily_spend_update_transactions
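A minimal end-to-end use of the new queue: two updates land on the same daily_transaction_key, and the flush returns one aggregated transaction. Plain dicts stand in for `DailyUserSpendTransaction` here (the aggregator only treats it as a mapping), and the import path follows this diff:

```python
import asyncio

from litellm.proxy.db.spend_update_queue import DailySpendUpdateQueue


async def demo() -> None:
    queue = DailySpendUpdateQueue()
    key = "u1_2025-04-26_k1_gpt-4o_openai"

    # Two requests hit the same (user, date, api_key, model, provider) bucket.
    await queue.add_update(update={key: {
        "spend": 0.25, "prompt_tokens": 100, "completion_tokens": 20,
        "api_requests": 1, "successful_requests": 1, "failed_requests": 0,
    }})
    await queue.add_update(update={key: {
        "spend": 0.5, "prompt_tokens": 40, "completion_tokens": 10,
        "api_requests": 1, "successful_requests": 0, "failed_requests": 1,
    }})

    aggregated = await queue.flush_and_get_aggregated_daily_spend_update_transactions()
    print(aggregated[key]["spend"])            # 0.75
    print(aggregated[key]["api_requests"])     # 2
    print(aggregated[key]["failed_requests"])  # 1


asyncio.run(demo())
```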
@@ -25,7 +25,6 @@ from typing import (
 from litellm.proxy._types import (
     DB_CONNECTION_ERROR_TYPES,
     CommonProxyErrors,
-    DailyUserSpendTransaction,
     ProxyErrorTypes,
     ProxyException,
     SpendLogsMetadata,
@@ -1112,7 +1111,6 @@ def jsonify_object(data: dict) -> dict:
 
 class PrismaClient:
     spend_log_transactions: List = []
-    daily_user_spend_transactions: Dict[str, DailyUserSpendTransaction] = {}
 
     def __init__(
         self,
@@ -1185,74 +1183,6 @@ class PrismaClient:
         # Default to success if metadata parsing fails
         return "success"
 
-    def add_spend_log_transaction_to_daily_user_transaction(
-        self, payload: Union[dict, SpendLogsPayload]
-    ):
-        """
-        Add a spend log transaction to the daily user transaction list
-
-        Key = @@unique([user_id, date, api_key, model, custom_llm_provider])
-
-        If key exists, update the transaction with the new spend and usage
-        """
-        expected_keys = ["user", "startTime", "api_key", "model", "custom_llm_provider"]
-
-        if not all(key in payload for key in expected_keys):
-            verbose_proxy_logger.debug(
-                f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
-            )
-            return
-
-        request_status = self.get_request_status(payload)
-        verbose_proxy_logger.info(f"Logged request status: {request_status}")
-        if isinstance(payload["startTime"], datetime):
-            start_time = payload["startTime"].isoformat()
-            date = start_time.split("T")[0]
-        elif isinstance(payload["startTime"], str):
-            date = payload["startTime"].split("T")[0]
-        else:
-            verbose_proxy_logger.debug(
-                f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
-            )
-            return
-        try:
-            daily_transaction_key = f"{payload['user']}_{date}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
-
-            if daily_transaction_key in self.daily_user_spend_transactions:
-                daily_transaction = self.daily_user_spend_transactions[
-                    daily_transaction_key
-                ]
-                daily_transaction["spend"] += payload["spend"]
-                daily_transaction["prompt_tokens"] += payload["prompt_tokens"]
-                daily_transaction["completion_tokens"] += payload["completion_tokens"]
-                daily_transaction["api_requests"] += 1
-
-                if request_status == "success":
-                    daily_transaction["successful_requests"] += 1
-                else:
-                    daily_transaction["failed_requests"] += 1
-            else:
-                daily_transaction = DailyUserSpendTransaction(
-                    user_id=payload["user"],
-                    date=date,
-                    api_key=payload["api_key"],
-                    model=payload["model"],
-                    model_group=payload["model_group"],
-                    custom_llm_provider=payload["custom_llm_provider"],
-                    prompt_tokens=payload["prompt_tokens"],
-                    completion_tokens=payload["completion_tokens"],
-                    spend=payload["spend"],
-                    api_requests=1,
-                    successful_requests=1 if request_status == "success" else 0,
-                    failed_requests=1 if request_status != "success" else 0,
-                )
-
-                self.daily_user_spend_transactions[
-                    daily_transaction_key
-                ] = daily_transaction
-        except Exception as e:
-            raise e
-
     def hash_token(self, token: str):
         # Hash the string using SHA-256
         hashed_token = hashlib.sha256(token.encode()).hexdigest()
@@ -2588,134 +2518,6 @@ class ProxyUpdateSpend:
                 e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
             )
 
-    @staticmethod
-    async def update_daily_user_spend(
-        n_retry_times: int,
-        prisma_client: PrismaClient,
-        proxy_logging_obj: ProxyLogging,
-    ):
-        """
-        Batch job to update LiteLLM_DailyUserSpend table using in-memory daily_spend_transactions
-        """
-        BATCH_SIZE = (
-            100  # Number of aggregated records to update in each database operation
-        )
-        start_time = time.time()
-
-        try:
-            for i in range(n_retry_times + 1):
-                try:
-                    # Get transactions to process
-                    transactions_to_process = dict(
-                        list(prisma_client.daily_user_spend_transactions.items())[
-                            :BATCH_SIZE
-                        ]
-                    )
-
-                    if len(transactions_to_process) == 0:
-                        verbose_proxy_logger.debug(
-                            "No new transactions to process for daily spend update"
-                        )
-                        break
-
-                    # Update DailyUserSpend table in batches
-                    async with prisma_client.db.batch_() as batcher:
-                        for _, transaction in transactions_to_process.items():
-                            user_id = transaction.get("user_id")
-                            if not user_id:  # Skip if no user_id
-                                continue
-
-                            batcher.litellm_dailyuserspend.upsert(
-                                where={
-                                    "user_id_date_api_key_model_custom_llm_provider": {
-                                        "user_id": user_id,
-                                        "date": transaction["date"],
-                                        "api_key": transaction["api_key"],
-                                        "model": transaction["model"],
-                                        "custom_llm_provider": transaction.get(
-                                            "custom_llm_provider"
-                                        ),
-                                    }
-                                },
-                                data={
-                                    "create": {
-                                        "user_id": user_id,
-                                        "date": transaction["date"],
-                                        "api_key": transaction["api_key"],
-                                        "model": transaction["model"],
-                                        "model_group": transaction.get("model_group"),
-                                        "custom_llm_provider": transaction.get(
-                                            "custom_llm_provider"
-                                        ),
-                                        "prompt_tokens": transaction["prompt_tokens"],
-                                        "completion_tokens": transaction[
-                                            "completion_tokens"
-                                        ],
-                                        "spend": transaction["spend"],
-                                        "api_requests": transaction["api_requests"],
-                                        "successful_requests": transaction[
-                                            "successful_requests"
-                                        ],
-                                        "failed_requests": transaction[
-                                            "failed_requests"
-                                        ],
-                                    },
-                                    "update": {
-                                        "prompt_tokens": {
-                                            "increment": transaction["prompt_tokens"]
-                                        },
-                                        "completion_tokens": {
-                                            "increment": transaction[
-                                                "completion_tokens"
-                                            ]
-                                        },
-                                        "spend": {"increment": transaction["spend"]},
-                                        "api_requests": {
-                                            "increment": transaction["api_requests"]
-                                        },
-                                        "successful_requests": {
-                                            "increment": transaction[
-                                                "successful_requests"
-                                            ]
-                                        },
-                                        "failed_requests": {
-                                            "increment": transaction["failed_requests"]
-                                        },
-                                    },
-                                },
-                            )
-
-                    verbose_proxy_logger.info(
-                        f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
-                    )
-
-                    # Remove processed transactions
-                    for key in transactions_to_process.keys():
-                        prisma_client.daily_user_spend_transactions.pop(key, None)
-
-                    verbose_proxy_logger.debug(
-                        f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
-                    )
-                    break
-
-                except DB_CONNECTION_ERROR_TYPES as e:
-                    if i >= n_retry_times:
-                        _raise_failed_update_spend_exception(
-                            e=e,
-                            start_time=start_time,
-                            proxy_logging_obj=proxy_logging_obj,
-                        )
-                    await asyncio.sleep(2**i)  # Exponential backoff
-
-        except Exception as e:
-            # Remove processed transactions even if there was an error
-            if "transactions_to_process" in locals():
-                for key in transactions_to_process.keys():  # type: ignore
-                    prisma_client.daily_user_spend_transactions.pop(key, None)
-            _raise_failed_update_spend_exception(
-                e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
-            )
-
     @staticmethod
     def disable_spend_updates() -> bool:
         """
@@ -2765,20 +2567,6 @@ async def update_spend(  # noqa: PLR0915
         db_writer_client=db_writer_client,
     )
 
-    ### UPDATE DAILY USER SPEND ###
-    verbose_proxy_logger.debug(
-        "Daily User Spend transactions: {}".format(
-            len(prisma_client.daily_user_spend_transactions)
-        )
-    )
-
-    if len(prisma_client.daily_user_spend_transactions) > 0:
-        await ProxyUpdateSpend.update_daily_user_spend(
-            n_retry_times=n_retry_times,
-            prisma_client=prisma_client,
-            proxy_logging_obj=proxy_logging_obj,
-        )
-
 
 def _raise_failed_update_spend_exception(
     e: Exception, start_time: float, proxy_logging_obj: ProxyLogging