forked from phoenix/litellm-mirror
fix - remove deprecated clickhouse utils
This commit is contained in:
parent f2b3770869
commit 2f9aa308f1
2 changed files with 0 additions and 238 deletions
@@ -122,236 +122,6 @@ async def ui_get_spend_by_tags(
     return {"spend_per_tag": ui_tags}
 
 
-async def view_spend_logs_from_clickhouse(
-    api_key=None, user_id=None, request_id=None, start_date=None, end_date=None
-):
-    verbose_logger.debug("Reading logs from Clickhouse")
-    import os
-
-    # if the user has set up ClickHouse
-    # TODO: Move this to be a helper function
-    # querying clickhouse for this data
-    import clickhouse_connect
-    from datetime import datetime
-
-    port = os.getenv("CLICKHOUSE_PORT")
-    if port is not None and isinstance(port, str):
-        port = int(port)
-
-    client = clickhouse_connect.get_client(
-        host=os.getenv("CLICKHOUSE_HOST"),
-        port=port,
-        username=os.getenv("CLICKHOUSE_USERNAME", ""),
-        password=os.getenv("CLICKHOUSE_PASSWORD", ""),
-    )
-    if (
-        start_date is not None
-        and isinstance(start_date, str)
-        and end_date is not None
-        and isinstance(end_date, str)
-    ):
-        # Convert the date strings to datetime objects
-        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
-        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")
-
-        # get top spend per day
-        response = client.query(
-            f"""
-            SELECT
-                toDate(startTime) AS day,
-                sum(spend) AS total_spend
-            FROM
-                spend_logs
-            WHERE
-                toDate(startTime) BETWEEN toDate('2024-02-01') AND toDate('2024-02-29')
-            GROUP BY
-                day
-            ORDER BY
-                total_spend
-            """
-        )
-
-        results = []
-        result_rows = list(response.result_rows)
-        for response in result_rows:
-            current_row = {}
-            current_row["users"] = {"example": 0.0}
-            current_row["models"] = {}
-
-            current_row["spend"] = float(response[1])
-            current_row["startTime"] = str(response[0])
-
-            # stubbed api_key
-            current_row[""] = 0.0  # type: ignore
-            results.append(current_row)
-
-        return results
-    else:
-        # check if spend logs exist; if they do, return the last 10 logs sorted by startTime descending
-        response = client.query(
-            """
-            SELECT
-                *
-            FROM
-                default.spend_logs
-            ORDER BY
-                startTime DESC
-            LIMIT
-                10
-            """
-        )
-
-        # get the number of spend log rows
-        num_rows = client.query("SELECT count(*) FROM default.spend_logs")
-        num_rows = num_rows.result_rows[0][0]
-
-        # safely access num_rows.result_rows[0][0]
-        if num_rows is None:
-            num_rows = 0
-
-        raw_rows = list(response.result_rows)
-        response_data = {
-            "logs": raw_rows,
-            "log_count": num_rows,
-        }
-        return response_data
-
-
-def _create_clickhouse_material_views(client=None, table_names=[]):
-    # Create Materialized Views if they don't exist
-    # Materialized Views send new inserted rows to the aggregate tables
-
-    verbose_logger.debug("Clickhouse: Creating Materialized Views")
-    if "daily_aggregated_spend_per_model_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_model_mv
-            TO daily_aggregated_spend_per_model
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                model as model
-            FROM spend_logs
-            GROUP BY
-                day, model
-            """
-        )
-    if "daily_aggregated_spend_per_api_key_mv" not in table_names:
-        verbose_logger.debug(
-            "Clickhouse: Creating daily_aggregated_spend_per_api_key_mv"
-        )
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_api_key_mv
-            TO daily_aggregated_spend_per_api_key
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                api_key as api_key
-            FROM spend_logs
-            GROUP BY
-                day, api_key
-            """
-        )
-    if "daily_aggregated_spend_per_user_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_user_mv
-            TO daily_aggregated_spend_per_user
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                user as user
-            FROM spend_logs
-            GROUP BY
-                day, user
-            """
-        )
-    if "daily_aggregated_spend_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_mv
-            TO daily_aggregated_spend
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend
-            FROM spend_logs
-            GROUP BY
-                day
-            """
-        )
-
-
-def _create_clickhouse_aggregate_tables(client=None, table_names=[]):
-    # Basic logging works without this - it is only used for the low-latency reporting APIs
-    verbose_logger.debug("Clickhouse: Creating Aggregate Tables")
-
-    # Create Aggregate Tables if they don't exist
-    if "daily_aggregated_spend_per_model" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_model
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `model` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, model);
-            """
-        )
-    if "daily_aggregated_spend_per_api_key" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_api_key")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_api_key
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `api_key` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, api_key);
-            """
-        )
-    if "daily_aggregated_spend_per_user" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_user
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `user` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, user);
-            """
-        )
-    if "daily_aggregated_spend" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64)
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day);
-            """
-        )
-    return
-
-
 def _forecast_daily_cost(data: list):
     import requests  # type: ignore
     from datetime import datetime, timedelta
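Two details of the removed code are worth flagging. The date-filtered branch parses start_date and end_date into start_date_obj and end_date_obj but never uses them; the query hardcodes February 2024. And because the materialized views write sumState(spend) into AggregateFunction(sum, Float64) columns, the stored values are partial aggregate states, so reading them back requires sumMerge(). Below is a minimal sketch of such a read. It is not part of this commit, and the 8123 fallback port is an assumption.

import os

import clickhouse_connect

# Connect the same way the removed helper did; 8123 is an assumed default port.
client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT", "8123")),
    username=os.getenv("CLICKHOUSE_USERNAME", ""),
    password=os.getenv("CLICKHOUSE_PASSWORD", ""),
)

# sumState() stores partial aggregate states, so the read side must finalize
# them with sumMerge() to get the actual per-day totals.
rows = client.query(
    """
    SELECT day, model, sumMerge(DailySpend) AS daily_spend
    FROM daily_aggregated_spend_per_model
    GROUP BY day, model
    ORDER BY day
    """
).result_rows
for day, model, daily_spend in rows:
    print(day, model, daily_spend)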

@@ -226,14 +226,6 @@ def _start_clickhouse():
     response = client.query("DESCRIBE default.spend_logs")
     verbose_logger.debug(f"spend logs schema ={response.result_rows}")
-
-    # RUN Enterprise Clickhouse Setup
-    # TLDR: For Enterprise - we create views / aggregate tables for low latency reporting APIs
-    from litellm.proxy.enterprise.utils import _create_clickhouse_aggregate_tables
-    from litellm.proxy.enterprise.utils import _create_clickhouse_material_views
-
-    _create_clickhouse_aggregate_tables(client=client, table_names=table_names)
-    _create_clickhouse_material_views(client=client, table_names=table_names)
 
 
 class ClickhouseLogger:
     # Class variables or attributes
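For context, a sketch of the wiring that _start_clickhouse() performed before this commit. The order of the two calls matters: each materialized view is declared with a TO <table> clause and fails to create if its target table does not exist yet. How table_names was populated is an assumption here (a SHOW TABLES lookup), and the imports no longer resolve after this commit because the helpers were deleted.

import os

import clickhouse_connect

# Pre-commit sketch only: these helpers were removed by this change.
from litellm.proxy.enterprise.utils import _create_clickhouse_aggregate_tables
from litellm.proxy.enterprise.utils import _create_clickhouse_material_views

client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT", "8123")),
    username=os.getenv("CLICKHOUSE_USERNAME", ""),
    password=os.getenv("CLICKHOUSE_PASSWORD", ""),
)

# Assumption: table_names is the list of existing tables, e.g. from SHOW TABLES.
table_names = [row[0] for row in client.query("SHOW TABLES").result_rows]

# Aggregate target tables first; the materialized views reference them via TO <table>.
_create_clickhouse_aggregate_tables(client=client, table_names=table_names)
_create_clickhouse_material_views(client=client, table_names=table_names)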