forked from phoenix/litellm-mirror
* fix: fix type-checking errors * fix: fix additional type-checking errors * fix: additional type-checking error fixes * fix: fix additional type-checking errors * fix: additional type-check fixes * fix: fix all type-checking errors + add pyright to ci/cd * fix: fix incorrect import * ci(config.yml): use mypy on ci/cd * fix: fix type-checking errors in utils.py * fix: fix all type-checking errors on main.py * fix: fix mypy linting errors * fix(anthropic/cost_calculator.py): fix linting errors * fix: fix mypy linting errors * fix: fix linting errors
218 lines
6.5 KiB
Python
218 lines
6.5 KiB
Python
# Enterprise Proxy Util Endpoints
|
|
from typing import Optional, List
|
|
from litellm._logging import verbose_logger
|
|
from litellm.proxy.proxy_server import PrismaClient, HTTPException
|
|
import collections
|
|
from datetime import datetime
|
|
|
|
|
|
async def get_spend_by_tags(
|
|
prisma_client: PrismaClient, start_date=None, end_date=None
|
|
):
|
|
response = await prisma_client.db.query_raw(
|
|
"""
|
|
SELECT
|
|
jsonb_array_elements_text(request_tags) AS individual_request_tag,
|
|
COUNT(*) AS log_count,
|
|
SUM(spend) AS total_spend
|
|
FROM "LiteLLM_SpendLogs"
|
|
GROUP BY individual_request_tag;
|
|
"""
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
async def ui_get_spend_by_tags(
|
|
start_date: str,
|
|
end_date: str,
|
|
prisma_client: Optional[PrismaClient] = None,
|
|
tags_str: Optional[str] = None,
|
|
):
|
|
"""
|
|
Should cover 2 cases:
|
|
1. When user is getting spend for all_tags. "all_tags" in tags_list
|
|
2. When user is getting spend for specific tags.
|
|
"""
|
|
|
|
# tags_str is a list of strings csv of tags
|
|
# tags_str = tag1,tag2,tag3
|
|
# convert to list if it's not None
|
|
tags_list: Optional[List[str]] = None
|
|
if tags_str is not None and len(tags_str) > 0:
|
|
tags_list = tags_str.split(",")
|
|
|
|
if prisma_client is None:
|
|
raise HTTPException(status_code=500, detail={"error": "No db connected"})
|
|
|
|
response = None
|
|
if tags_list is None or (isinstance(tags_list, list) and "all-tags" in tags_list):
|
|
# Get spend for all tags
|
|
sql_query = """
|
|
SELECT
|
|
individual_request_tag,
|
|
spend_date,
|
|
log_count,
|
|
total_spend
|
|
FROM DailyTagSpend
|
|
WHERE spend_date >= $1::date AND spend_date <= $2::date
|
|
ORDER BY total_spend DESC;
|
|
"""
|
|
response = await prisma_client.db.query_raw(
|
|
sql_query,
|
|
start_date,
|
|
end_date,
|
|
)
|
|
else:
|
|
# filter by tags list
|
|
sql_query = """
|
|
SELECT
|
|
individual_request_tag,
|
|
SUM(log_count) AS log_count,
|
|
SUM(total_spend) AS total_spend
|
|
FROM DailyTagSpend
|
|
WHERE spend_date >= $1::date AND spend_date <= $2::date
|
|
AND individual_request_tag = ANY($3::text[])
|
|
GROUP BY individual_request_tag
|
|
ORDER BY total_spend DESC;
|
|
"""
|
|
response = await prisma_client.db.query_raw(
|
|
sql_query,
|
|
start_date,
|
|
end_date,
|
|
tags_list,
|
|
)
|
|
|
|
# print("tags - spend")
|
|
# print(response)
|
|
# Bar Chart 1 - Spend per tag - Top 10 tags by spend
|
|
total_spend_per_tag: collections.defaultdict = collections.defaultdict(float)
|
|
total_requests_per_tag: collections.defaultdict = collections.defaultdict(int)
|
|
for row in response:
|
|
tag_name = row["individual_request_tag"]
|
|
tag_spend = row["total_spend"]
|
|
|
|
total_spend_per_tag[tag_name] += tag_spend
|
|
total_requests_per_tag[tag_name] += row["log_count"]
|
|
|
|
sorted_tags = sorted(total_spend_per_tag.items(), key=lambda x: x[1], reverse=True)
|
|
# convert to ui format
|
|
ui_tags = []
|
|
for tag in sorted_tags:
|
|
current_spend = tag[1]
|
|
if current_spend is not None and isinstance(current_spend, float):
|
|
current_spend = round(current_spend, 4)
|
|
ui_tags.append(
|
|
{
|
|
"name": tag[0],
|
|
"spend": current_spend,
|
|
"log_count": total_requests_per_tag[tag[0]],
|
|
}
|
|
)
|
|
|
|
return {"spend_per_tag": ui_tags}
|
|
|
|
|
|
def _forecast_daily_cost(data: list):
|
|
import requests # type: ignore
|
|
from datetime import datetime, timedelta
|
|
|
|
if len(data) == 0:
|
|
return {
|
|
"response": [],
|
|
"predicted_spend": "Current Spend = $0, Predicted = $0",
|
|
}
|
|
first_entry = data[0]
|
|
last_entry = data[-1]
|
|
|
|
# get the date today
|
|
today_date = datetime.today().date()
|
|
|
|
today_day_month = today_date.month
|
|
|
|
# Parse the date from the first entry
|
|
first_entry_date = datetime.strptime(first_entry["date"], "%Y-%m-%d").date()
|
|
last_entry_date = datetime.strptime(last_entry["date"], "%Y-%m-%d")
|
|
|
|
print("last entry date", last_entry_date)
|
|
|
|
# Assuming today_date is a datetime object
|
|
today_date = datetime.now()
|
|
|
|
# Calculate the last day of the month
|
|
last_day_of_todays_month = datetime(
|
|
today_date.year, today_date.month % 12 + 1, 1
|
|
) - timedelta(days=1)
|
|
|
|
# Calculate the remaining days in the month
|
|
remaining_days = (last_day_of_todays_month - last_entry_date).days
|
|
|
|
current_spend_this_month = 0
|
|
series = {}
|
|
for entry in data:
|
|
date = entry["date"]
|
|
spend = entry["spend"]
|
|
series[date] = spend
|
|
|
|
# check if the date is in this month
|
|
if datetime.strptime(date, "%Y-%m-%d").month == today_day_month:
|
|
current_spend_this_month += spend
|
|
|
|
if len(series) < 10:
|
|
num_items_to_fill = 11 - len(series)
|
|
|
|
# avg spend for all days in series
|
|
avg_spend = sum(series.values()) / len(series)
|
|
for i in range(num_items_to_fill):
|
|
# go backwards from the first entry
|
|
date = first_entry_date - timedelta(days=i)
|
|
series[date.strftime("%Y-%m-%d")] = avg_spend
|
|
series[date.strftime("%Y-%m-%d")] = avg_spend
|
|
|
|
payload = {"series": series, "count": remaining_days}
|
|
print("Prediction Data:", payload)
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
response = requests.post(
|
|
url="https://trend-api-production.up.railway.app/forecast",
|
|
json=payload,
|
|
headers=headers,
|
|
)
|
|
# check the status code
|
|
response.raise_for_status()
|
|
|
|
json_response = response.json()
|
|
forecast_data = json_response["forecast"]
|
|
|
|
# print("Forecast Data:", forecast_data)
|
|
|
|
response_data = []
|
|
total_predicted_spend = current_spend_this_month
|
|
for date in forecast_data:
|
|
spend = forecast_data[date]
|
|
entry = {
|
|
"date": date,
|
|
"predicted_spend": spend,
|
|
}
|
|
total_predicted_spend += spend
|
|
response_data.append(entry)
|
|
|
|
# get month as a string, Jan, Feb, etc.
|
|
today_month = today_date.strftime("%B")
|
|
predicted_spend = (
|
|
f"Predicted Spend for { today_month } 2024, ${total_predicted_spend}"
|
|
)
|
|
return {"response": response_data, "predicted_spend": predicted_spend}
|
|
|
|
# print(f"Date: {entry['date']}, Spend: {entry['spend']}, Response: {response.text}")
|
|
|
|
|
|
# _forecast_daily_cost(
|
|
# [
|
|
# {"date": "2022-01-01", "spend": 100},
|
|
|
|
# ]
|
|
# )
|