diff --git a/litellm/integrations/clickhouse.py b/litellm/integrations/clickhouse.py
index 82ecd3256..700423664 100644
--- a/litellm/integrations/clickhouse.py
+++ b/litellm/integrations/clickhouse.py
@@ -27,6 +27,126 @@ import litellm, uuid
 from litellm._logging import print_verbose, verbose_logger


+def create_client():
+    try:
+        import clickhouse_connect
+
+        port = os.getenv("CLICKHOUSE_PORT")
+        clickhouse_host = os.getenv("CLICKHOUSE_HOST")
+        if clickhouse_host is not None:
+            verbose_logger.debug("setting up clickhouse")
+            if port is not None and isinstance(port, str):
+                port = int(port)
+
+            client = clickhouse_connect.get_client(
+                host=clickhouse_host,
+                port=port,
+                username=os.getenv("CLICKHOUSE_USERNAME"),
+                password=os.getenv("CLICKHOUSE_PASSWORD"),
+            )
+            return client
+        else:
+            raise Exception("Clickhouse: Clickhouse host not set")
+    except Exception as e:
+        raise ValueError(f"Clickhouse: {e}")
+
+
+def build_daily_metrics():
+    click_house_client = create_client()
+
+    # total spend per day
+    daily_spend = click_house_client.query_df(
+        """
+        SELECT sumMerge(DailySpend) as daily_spend, day FROM daily_aggregated_spend GROUP BY day
+        """
+    )
+
+    # spend per day, broken down by model
+    daily_spend_per_model = click_house_client.query_df(
+        """
+        SELECT sumMerge(DailySpend) as daily_spend, day, model FROM daily_aggregated_spend_per_model GROUP BY day, model
+        """
+    )
+
+    # group the per-model rows by day: {day: {model: spend}}
+    result_dict = {}
+    for day, group in daily_spend_per_model.groupby("day"):
+        models = group["model"].tolist()
+        spend = group["daily_spend"].tolist()
+        result_dict[day] = {model: cost for model, cost in zip(models, spend)}
+
+    # top 5 api keys by spend, for each day
+    daily_spend_per_api_key = click_house_client.query_df(
+        """
+        SELECT
+            daily_spend,
+            day,
+            api_key
+        FROM (
+            SELECT
+                sumMerge(DailySpend) as daily_spend,
+                day,
+                api_key,
+                RANK() OVER (PARTITION BY day ORDER BY sumMerge(DailySpend) DESC) as spend_rank
+            FROM
+                daily_aggregated_spend_per_api_key
+            GROUP BY
+                day,
+                api_key
+        ) AS ranked_api_keys
+        WHERE
+            spend_rank <= 5
+            AND day IS NOT NULL
+        ORDER BY
+            day,
+            daily_spend DESC
+        """
+    )
+
+    # group the per-api-key rows by day: {day: {api_key: spend}}
+    api_key_result_dict = {}
+    for day, group in daily_spend_per_api_key.groupby("day"):
+        api_keys = group["api_key"].tolist()
+        spend = group["daily_spend"].tolist()
+        api_key_result_dict[day] = {
+            api_key: cost for api_key, cost in zip(api_keys, spend)
+        }
+
+    # for each day in daily_spend, attach the per-model and per-api-key breakdowns
+    result = []
+    for _, row in daily_spend.iterrows():
+        day = row["day"]
+        data_day = row.to_dict()
+
+        if day in result_dict:
+            data_day["spend_per_model"] = result_dict[day]
+
+        if day in api_key_result_dict:
+            data_day["spend_per_api_key"] = api_key_result_dict[day]
+
+        result.append(data_day)
result) + return result + + def _create_clickhouse_material_views(client=None, table_names=[]): # Create Materialized Views if they don't exist # Materialized Views send new inserted rows to the aggregate tables diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index 6b9e3f644..d4b261817 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -3991,6 +3991,62 @@ async def view_spend_logs( ) +@router.get( + "/daily_metrics", + summary="Get daily spend metrics", + tags=["budget & spend Tracking"], + dependencies=[Depends(user_api_key_auth)], +) +async def view_daily_metrics( + start_date: Optional[str] = fastapi.Query( + default=None, + description="Time from which to start viewing key spend", + ), + end_date: Optional[str] = fastapi.Query( + default=None, + description="Time till which to view key spend", + ), + metric_type: Optional[Literal["per_api_key", "per_user", "per_model"]] = None, +): + """ """ + try: + if os.getenv("CLICKHOUSE_HOST") is not None: + # gettting spend logs from clickhouse + from litellm.integrations import clickhouse + + return clickhouse.build_daily_metrics() + + # create a response object + """ + { + "date": "2022-01-01", + "spend": 0.0, + "users": {}, + "models": {} + } + """ + else: + raise Exception( + "Clickhouse: Clickhouse host not set. Required for viewing /daily/metrics" + ) + except Exception as e: + if isinstance(e, HTTPException): + raise ProxyException( + message=getattr(e, "detail", f"/spend/logs Error({str(e)})"), + type="internal_error", + param=getattr(e, "param", "None"), + code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR), + ) + elif isinstance(e, ProxyException): + raise e + raise ProxyException( + message="/spend/logs Error" + str(e), + type="internal_error", + param=getattr(e, "param", "None"), + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + #### USER MANAGEMENT #### @router.post( "/user/new",