From 9ad8b7c8fb003662c5c95549f97e7edaae94ee2c Mon Sep 17 00:00:00 2001
From: ishaan-jaff
Date: Tue, 27 Feb 2024 19:41:01 -0800
Subject: [PATCH] (chore) move clickhouse to correct directory

---
 enterprise/utils.py                | 135 ++++++++++++++++++++++++++++
 litellm/integrations/clickhouse.py | 138 +----------------------------
 2 files changed, 138 insertions(+), 135 deletions(-)

diff --git a/enterprise/utils.py b/enterprise/utils.py
index f4916b689..3b5a90fc0 100644
--- a/enterprise/utils.py
+++ b/enterprise/utils.py
@@ -110,3 +110,138 @@ async def view_spend_logs_from_clickhouse(
         "log_count": num_rows,
     }
     return response_data
+
+
+def _create_clickhouse_material_views(client=None, table_names=[]):
+    # Create Materialized Views if they don't exist
+    # Materialized Views send new inserted rows to the aggregate tables
+
+    verbose_logger.debug("Clickhouse: Creating Materialized Views")
+    if "daily_aggregated_spend_per_model_mv" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model_mv")
+        client.command(
+            """
+            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_model_mv
+            TO daily_aggregated_spend_per_model
+            AS
+            SELECT
+                toDate(startTime) as day,
+                sumState(spend) AS DailySpend,
+                model as model
+            FROM spend_logs
+            GROUP BY
+                day, model
+            """
+        )
+    if "daily_aggregated_spend_per_api_key_mv" not in table_names:
+        verbose_logger.debug(
+            "Clickhouse: Creating daily_aggregated_spend_per_api_key_mv"
+        )
+        client.command(
+            """
+            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_api_key_mv
+            TO daily_aggregated_spend_per_api_key
+            AS
+            SELECT
+                toDate(startTime) as day,
+                sumState(spend) AS DailySpend,
+                api_key as api_key
+            FROM spend_logs
+            GROUP BY
+                day, api_key
+            """
+        )
+    if "daily_aggregated_spend_per_user_mv" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user_mv")
+        client.command(
+            """
+            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_user_mv
+            TO daily_aggregated_spend_per_user
+            AS
+            SELECT
+                toDate(startTime) as day,
+                sumState(spend) AS DailySpend,
+                user as user
+            FROM spend_logs
+            GROUP BY
+                day, user
+            """
+        )
+    if "daily_aggregated_spend_mv" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_mv")
+        client.command(
+            """
+            CREATE MATERIALIZED VIEW daily_aggregated_spend_mv
+            TO daily_aggregated_spend
+            AS
+            SELECT
+                toDate(startTime) as day,
+                sumState(spend) AS DailySpend
+            FROM spend_logs
+            GROUP BY
+                day
+            """
+        )
+
+
+def _create_clickhouse_aggregate_tables(client=None, table_names=[]):
+    # Basic Logging works without this - this is only used for low latency reporting apis
+    verbose_logger.debug("Clickhouse: Creating Aggregate Tables")
+
+    # Create Aggregeate Tables if they don't exist
+    if "daily_aggregated_spend_per_model" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model")
+        client.command(
+            """
+            CREATE TABLE daily_aggregated_spend_per_model
+            (
+                `day` Date,
+                `DailySpend` AggregateFunction(sum, Float64),
+                `model` String
+            )
+            ENGINE = SummingMergeTree()
+            ORDER BY (day, model);
+            """
+        )
+    if "daily_aggregated_spend_per_api_key" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_api_key")
+        client.command(
+            """
+            CREATE TABLE daily_aggregated_spend_per_api_key
+            (
+                `day` Date,
+                `DailySpend` AggregateFunction(sum, Float64),
+                `api_key` String
+            )
+            ENGINE = SummingMergeTree()
+            ORDER BY (day, api_key);
+            """
+        )
+    if "daily_aggregated_spend_per_user" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user")
+        client.command(
+            """
+            CREATE TABLE daily_aggregated_spend_per_user
+            (
+                `day` Date,
+                `DailySpend` AggregateFunction(sum, Float64),
+                `user` String
+            )
+            ENGINE = SummingMergeTree()
+            ORDER BY (day, user);
+            """
+        )
+    if "daily_aggregated_spend" not in table_names:
+        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend")
+        client.command(
+            """
+            CREATE TABLE daily_aggregated_spend
+            (
+                `day` Date,
+                `DailySpend` AggregateFunction(sum, Float64),
+            )
+            ENGINE = SummingMergeTree()
+            ORDER BY (day);
+            """
+        )
+    return
diff --git a/litellm/integrations/clickhouse.py b/litellm/integrations/clickhouse.py
index 82ecd3256..a2a0b800e 100644
--- a/litellm/integrations/clickhouse.py
+++ b/litellm/integrations/clickhouse.py
@@ -27,141 +27,6 @@ import litellm, uuid
 from litellm._logging import print_verbose, verbose_logger
 
 
-def _create_clickhouse_material_views(client=None, table_names=[]):
-    # Create Materialized Views if they don't exist
-    # Materialized Views send new inserted rows to the aggregate tables
-
-    verbose_logger.debug("Clickhouse: Creating Materialized Views")
-    if "daily_aggregated_spend_per_model_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_model_mv
-            TO daily_aggregated_spend_per_model
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                model as model
-            FROM spend_logs
-            GROUP BY
-                day, model
-            """
-        )
-    if "daily_aggregated_spend_per_api_key_mv" not in table_names:
-        verbose_logger.debug(
-            "Clickhouse: Creating daily_aggregated_spend_per_api_key_mv"
-        )
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_api_key_mv
-            TO daily_aggregated_spend_per_api_key
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                api_key as api_key
-            FROM spend_logs
-            GROUP BY
-                day, api_key
-            """
-        )
-    if "daily_aggregated_spend_per_user_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_per_user_mv
-            TO daily_aggregated_spend_per_user
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend,
-                user as user
-            FROM spend_logs
-            GROUP BY
-                day, user
-            """
-        )
-    if "daily_aggregated_spend_mv" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_mv")
-        client.command(
-            """
-            CREATE MATERIALIZED VIEW daily_aggregated_spend_mv
-            TO daily_aggregated_spend
-            AS
-            SELECT
-                toDate(startTime) as day,
-                sumState(spend) AS DailySpend
-            FROM spend_logs
-            GROUP BY
-                day
-            """
-        )
-
-
-def _create_clickhouse_aggregate_tables(client=None, table_names=[]):
-    # Basic Logging works without this - this is only used for low latency reporting apis
-    verbose_logger.debug("Clickhouse: Creating Aggregate Tables")
-
-    # Create Aggregeate Tables if they don't exist
-    if "daily_aggregated_spend_per_model" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_model")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_model
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `model` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, model);
-            """
-        )
-    if "daily_aggregated_spend_per_api_key" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_api_key")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_api_key
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `api_key` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, api_key);
-            """
-        )
-    if "daily_aggregated_spend_per_user" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend_per_user")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend_per_user
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-                `user` String
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day, user);
-            """
-        )
-    if "daily_aggregated_spend" not in table_names:
-        verbose_logger.debug("Clickhouse: Creating daily_aggregated_spend")
-        client.command(
-            """
-            CREATE TABLE daily_aggregated_spend
-            (
-                `day` Date,
-                `DailySpend` AggregateFunction(sum, Float64),
-            )
-            ENGINE = SummingMergeTree()
-            ORDER BY (day);
-            """
-        )
-    return
-
-
 def _start_clickhouse():
     import clickhouse_connect
 
@@ -223,6 +88,9 @@ def _start_clickhouse():
 
     # RUN Enterprise Clickhouse Setup
     # TLDR: For Enterprise - we create views / aggregate tables for low latency reporting APIs
+    from litellm.proxy.enterprise.utils import _create_clickhouse_aggregate_tables
+    from litellm.proxy.enterprise.utils import _create_clickhouse_material_views
+
    _create_clickhouse_aggregate_tables(client=client, table_names=table_names)
    _create_clickhouse_material_views(client=client, table_names=table_names)
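Note on reading these tables (not part of the commit above): the DailySpend column is stored as an AggregateFunction(sum, Float64) state, written by the sumState(...) calls in the materialized views, so reporting queries must finalize it with the sumMerge combinator rather than a plain sum. Below is a minimal sketch of such a read using clickhouse_connect (the same client library the patch imports); the host and credentials are placeholders, not values taken from litellm, and the table only exists after the setup code above has run.

    import clickhouse_connect

    # Placeholder connection details - substitute your own deployment's values.
    client = clickhouse_connect.get_client(
        host="localhost",
        port=8123,
        username="default",
        password="",
    )

    # DailySpend holds aggregate state, so finalize it with sumMerge.
    result = client.query(
        """
        SELECT
            day,
            model,
            sumMerge(DailySpend) AS daily_spend
        FROM daily_aggregated_spend_per_model
        GROUP BY day, model
        ORDER BY day
        """
    )
    for day, model, daily_spend in result.result_rows:
        print(day, model, daily_spend)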