diff --git a/litellm/integrations/clickhouse.py b/litellm/integrations/clickhouse.py index 3b9fea9a7..fde831a36 100644 --- a/litellm/integrations/clickhouse.py +++ b/litellm/integrations/clickhouse.py @@ -36,9 +36,13 @@ class ClickhouseLogger: f"ClickhouseLogger init, host {os.getenv('CLICKHOUSE_HOST')}, port {os.getenv('CLICKHOUSE_PORT')}, username {os.getenv('CLICKHOUSE_USERNAME')}" ) + port = os.getenv("CLICKHOUSE_PORT") + if port is not None and isinstance(port, str): + port = int(port) + client = clickhouse_connect.get_client( host=os.getenv("CLICKHOUSE_HOST"), - port=os.getenv("CLICKHOUSE_PORT"), + port=port, username=os.getenv("CLICKHOUSE_USERNAME"), password=os.getenv("CLICKHOUSE_PASSWORD"), ) @@ -79,21 +83,34 @@ class ClickhouseLogger: # Build the initial payload # Ensure everything in the payload is converted to str - for key, value in payload.items(): - try: - payload[key] = str(value) - except: - # non blocking if it can't cast to a str - pass + # for key, value in payload.items(): + # try: + # print("key=", key, "type=", type(value)) + # # payload[key] = str(value) + # except: + # # non blocking if it can't cast to a str + # pass - print_verbose(f"\nGeneric Logger - Logging payload = {payload}") + print_verbose(f"\nClickhouse Logger - Logging payload = {payload}") + + # just get the payload items in one array and payload keys in 2nd array + values = [] + keys = [] + for key, value in payload.items(): + keys.append(key) + values.append(value) + data = [values] + + # print("logging data=", data) + # print("logging keys=", keys) + + response = self.client.insert("spend_logs", data, column_names=keys) # make request to endpoint with payload - # print_verbose( - # f"Generic Logger - final response status = {response_status}, response text = {response_text}" - # ) - # return response + print_verbose( + f"Clickhouse Logger - final response status = {response_status}, response text = {response_text}" + ) except Exception as e: traceback.print_exc() - verbose_logger.debug(f"Generic - {str(e)}\n{traceback.format_exc()}") + verbose_logger.debug(f"Clickhouse - {str(e)}\n{traceback.format_exc()}") pass diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 9f9774412..f684c3f6f 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -1360,6 +1360,7 @@ def get_logging_payload(kwargs, response_obj, start_time, end_time): "user": kwargs.get("user", ""), "metadata": metadata, "cache_key": cache_key, + "spend": kwargs.get("response_cost", 0), "total_tokens": usage.get("total_tokens", 0), "prompt_tokens": usage.get("prompt_tokens", 0), "completion_tokens": usage.get("completion_tokens", 0), diff --git a/litellm/utils.py b/litellm/utils.py index 0f54a6ba5..d33f29d5c 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -68,6 +68,7 @@ from .integrations.custom_logger import CustomLogger from .integrations.langfuse import LangFuseLogger from .integrations.dynamodb import DyanmoDBLogger from .integrations.s3 import S3Logger +from .integrations.clickhouse import ClickhouseLogger from .integrations.litedebugger import LiteDebugger from .proxy._types import KeyManagementSystem from openai import OpenAIError as OriginalError @@ -124,6 +125,7 @@ langFuseLogger = None dynamoLogger = None s3Logger = None genericAPILogger = None +clickHouseLogger = None llmonitorLogger = None aispendLogger = None berrispendLogger = None @@ -1413,6 +1415,37 @@ class Logging: user_id=kwargs.get("user", None), print_verbose=print_verbose, ) + if callback == "clickhouse": + global clickHouseLogger + verbose_logger.debug("reaches clickhouse for success logging!") + kwargs = {} + for k, v in self.model_call_details.items(): + if ( + k != "original_response" + ): # copy.deepcopy raises errors as this could be a coroutine + kwargs[k] = v + # this only logs streaming once, complete_streaming_response exists i.e when stream ends + if self.stream: + verbose_logger.debug( + f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" + ) + if complete_streaming_response is None: + break + else: + print_verbose( + "reaches clickhouse for streaming logging!" + ) + result = kwargs["complete_streaming_response"] + if clickHouseLogger is None: + clickHouseLogger = ClickhouseLogger() + clickHouseLogger.log_event( + kwargs=kwargs, + response_obj=result, + start_time=start_time, + end_time=end_time, + user_id=kwargs.get("user", None), + print_verbose=print_verbose, + ) if callback == "cache" and litellm.cache is not None: # this only logs streaming once, complete_streaming_response exists i.e when stream ends print_verbose("success_callback: reaches cache for logging!")