diff --git a/enterprise/utils.py b/enterprise/utils.py
index b8f660927..baff3b1b0 100644
--- a/enterprise/utils.py
+++ b/enterprise/utils.py
@@ -122,236 +122,6 @@ async def ui_get_spend_by_tags(
     return {"spend_per_tag": ui_tags}
 
 
-async def view_spend_logs_from_clickhouse(
-    api_key=None, user_id=None, request_id=None, start_date=None, end_date=None
-):
-    verbose_logger.debug("Reading logs from Clickhouse")
-    import os
-
-    # if user has setup clickhouse
-    # TODO: Move this to be a helper function
-    # querying clickhouse for this data
-    import clickhouse_connect
-    from datetime import datetime
-
-    port = os.getenv("CLICKHOUSE_PORT")
-    if port is not None and isinstance(port, str):
-        port = int(port)
-
-    client = clickhouse_connect.get_client(
-        host=os.getenv("CLICKHOUSE_HOST"),
-        port=port,
-        username=os.getenv("CLICKHOUSE_USERNAME", ""),
-        password=os.getenv("CLICKHOUSE_PASSWORD", ""),
-    )
-    if (
-        start_date is not None
-        and isinstance(start_date, str)
-        and end_date is not None
-        and isinstance(end_date, str)
-    ):
-        # Convert the date strings to datetime objects
-        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
-        end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")
-
-        # get top spend per day
-        response = client.query(
-            f"""
-            SELECT
-                toDate(startTime) AS day,
-                sum(spend) AS total_spend
-            FROM
-                spend_logs
-            WHERE
-                toDate(startTime) BETWEEN toDate('2024-02-01') AND toDate('2024-02-29')
-            GROUP BY
-                day
-            ORDER BY
-                total_spend
-            """
-        )
-
-        results = []
-        result_rows = list(response.result_rows)
-        for response in result_rows:
-            current_row = {}
-            current_row["users"] = {"example": 0.0}
-            current_row["models"] = {}
-
-            current_row["spend"] = float(response[1])
-            current_row["startTime"] = str(response[0])
-
-            # stubbed api_key
-            current_row[""] = 0.0  # type: ignore
-            results.append(current_row)
-
-        return results
-    else:
-        # check if spend logs exist, if it does then return last 10 logs, sorted in descending order of startTime
-        response = client.query(
-            """
-            SELECT
-                *
-            FROM
-                default.spend_logs
-            ORDER BY
-                startTime DESC
-            LIMIT
-                10
-            """
-        )
-
-        # get size of spend logs
-        num_rows = client.query("SELECT count(*) FROM default.spend_logs")
-        num_rows = num_rows.result_rows[0][0]
-
-        # safely access num_rows.result_rows[0][0]
-        if num_rows is None:
-            num_rows = 0
-
-        raw_rows = list(response.result_rows)
-        response_data = {
-            "logs": raw_rows,
-            "log_count": num_rows,
-        }
-        return response_data
-
-
-def _create_clickhouse_material_views(client=None, table_names=[]):
-    # Create Materialized Views if they don't exist
-    # Materialized Views send new inserted rows to the aggregate tables
-
-    verbose_logger.debug("Clickhouse: Creating Materialized Views")
-    if "daily_aggregated_spend_per_model_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_model_mv
-            TO daily_aggregated_spend_per_model
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                model as model
-            FROM spend_logs
-            GROUP BY
-                day, model
-            """
-        )
-    if "daily_aggregated_spend_per_api_key_mv" not in table_names:
-        verbose_logger.debug(
-            "Clickhouse: Creating daily_aggregated_spend_per_api_key_mv"
-        )
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_api_key_mv
-            TO daily_aggregated_spend_per_api_key
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                api_key as api_key
-            FROM spend_logs
-            GROUP BY
-                day, api_key
-            """
-        )
-    if "daily_aggregated_spend_per_user_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_user_mv
-            TO daily_aggregated_spend_per_user
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                user as user
-            FROM spend_logs
-            GROUP BY
-                day, user
-            """
-        )
-    if "daily_aggregated_spend_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_mv
-            TO daily_aggregated_spend
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend
-            FROM spend_logs
-            GROUP BY
-                day
-            """
-        )
-
-
-def _create_clickhouse_aggregate_tables(client=None, table_names=[]):
-    # Basic Logging works without this - this is only used for low latency reporting apis
-    verbose_logger.debug("Clickhouse: Creating Aggregate Tables")
-
-    # Create Aggregeate Tables if they don't exist
-    if "daily_aggregated_spend_per_model" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_model
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `model` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, model);
-            """
-        )
-    if "daily_aggregated_spend_per_api_key" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_api_key")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_api_key
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `api_key` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, api_key);
-            """
-        )
-    if "daily_aggregated_spend_per_user" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_user
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `user` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, user);
-            """
-        )
-    if "daily_aggregated_spend" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day);
-            """
-        )
-    return
-
-
 def _forecast_daily_cost(data: list):
     import requests  # type: ignore
     from datetime import datetime, timedelta
diff --git a/litellm/integrations/clickhouse.py b/litellm/integrations/clickhouse.py
index f8b6b1bbf..eff1836ef 100644
--- a/litellm/integrations/clickhouse.py
+++ b/litellm/integrations/clickhouse.py
@@ -226,14 +226,6 @@ def _start_clickhouse():
     response = client.query("DESCRIBE default.spend_logs")
     verbose_logger.debug(f"spend logs schema ={response.result_rows}")
 
-    # RUN Enterprise Clickhouse Setup
-    # TLDR: For Enterprise - we create views / aggregate tables for low latency reporting APIs
-    from litellm.proxy.enterprise.utils import _create_clickhouse_aggregate_tables
-    from litellm.proxy.enterprise.utils import _create_clickhouse_material_views
-
-    _create_clickhouse_aggregate_tables(client=client, table_names=table_names)
-    _create_clickhouse_material_views(client=client, table_names=table_names)
-
 
 class ClickhouseLogger:
    # Class variables or attributes
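
Note on the removed reporting path: the materialized views above write partial aggregate states (sumState(spend)) into AggregateFunction(sum, Float64) columns, and those states can only be read back by finalizing them with the matching -Merge combinator (sumMerge). For reference, here is a minimal sketch of how one of the dropped tables would have been queried. The helper name get_daily_spend_per_model is hypothetical (not part of this PR or the codebase); the client setup simply mirrors the env-var wiring in the removed view_spend_logs_from_clickhouse.

    # Hypothetical sketch: reading the dropped daily_aggregated_spend_per_model
    # table. States written by sumState() must be finalized with sumMerge(),
    # and GROUP BY is still required because unmerged parts may hold several
    # partial states for the same (day, model) key.
    import os
    import clickhouse_connect

    def get_daily_spend_per_model(start_date: str, end_date: str):
        client = clickhouse_connect.get_client(
            host=os.getenv("CLICKHOUSE_HOST"),
            port=int(os.getenv("CLICKHOUSE_PORT", "8123")),
            username=os.getenv("CLICKHOUSE_USERNAME", ""),
            password=os.getenv("CLICKHOUSE_PASSWORD", ""),
        )
        response = client.query(
            """
            SELECT
                day,
                model,
                sumMerge(DailySpend) AS total_spend
            FROM daily_aggregated_spend_per_model
            WHERE day BETWEEN toDate(%(start)s) AND toDate(%(end)s)
            GROUP BY day, model
            ORDER BY day, total_spend DESC
            """,
            parameters={"start": start_date, "end": end_date},
        )
        return list(response.result_rows)

Binding the dates via parameters (rather than the f-string interpolation the removed code used, with its hardcoded '2024-02-01'..'2024-02-29' range) also avoids injecting raw user input into the SQL.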
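A further note for anyone reintroducing these tables: ClickHouse's purpose-built engine for AggregateFunction state columns is AggregatingMergeTree. The SummingMergeTree docs state that AggregateFunction-typed columns are merged the same way AggregatingMergeTree would merge them, so the removed DDL likely behaved correctly, but the intent reads more directly as the following sketch (same columns as the dropped table; client as in the sketch above):

    # Hypothetical sketch: same schema as the removed table, with the engine
    # designed for AggregateFunction state columns.
    client.command(
        """
        CREATE TABLE IF NOT EXISTS daily_aggregated_spend_per_model
        (
            `day` Date,
            `DailySpend` AggregateFunction(sum, Float64),
            `model` String
        )
        ENGINE = AggregatingMergeTree()
        ORDER BY (day, model)
        """
    )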