diff --git a/cookbook/misc/clickhouse.py b/cookbook/misc/clickhouse.py
new file mode 100644
index 000000000..b40a9346b
--- /dev/null
+++ b/cookbook/misc/clickhouse.py
@@ -0,0 +1,71 @@
+import clickhouse_connect
+import datetime
+import os
+
+client = clickhouse_connect.get_client(
+    host=os.getenv("CLICKHOUSE_HOST"),
+    port=int(os.getenv("CLICKHOUSE_PORT")),
+    username=os.getenv("CLICKHOUSE_USERNAME"),
+    password=os.getenv("CLICKHOUSE_PASSWORD"),
+)
+
+row1 = [
+    "ishaan",  # request_id
+    "GET",  # call_type
+    "api_key_123",  # api_key
+    50.00,  # spend
+    1000,  # total_tokens
+    800,  # prompt_tokens
+    200,  # completion_tokens
+    datetime.datetime.now(),  # startTime (replace with the actual timestamp)
+    datetime.datetime.now(),  # endTime (replace with the actual timestamp)
+    "gpt-3.5",  # model
+    "user123",  # user
+    '{"key": "value"}',  # metadata (replace with valid JSON)
+    "True",  # cache_hit
+    "cache_key_123",  # cache_key
+    "tag1,tag2",  # request_tags
+]
+
+row2 = [
+    "jaffer",  # request_id
+    "POST",  # call_type
+    "api_key_456",  # api_key
+    30.50,  # spend
+    800,  # total_tokens
+    600,  # prompt_tokens
+    200,  # completion_tokens
+    datetime.datetime.now(),  # startTime (replace with the actual timestamp)
+    datetime.datetime.now(),  # endTime (replace with the actual timestamp)
+    "gpt-4.0",  # model
+    "user456",  # user
+    '{"key": "value"}',  # metadata (replace with valid JSON)
+    "False",  # cache_hit
+    "cache_key_789",  # cache_key
+    "tag3,tag4",  # request_tags
+]
+
+data = [row1, row2]
+resp = client.insert(
+    "spend_logs",
+    data,
+    column_names=[
+        "request_id",
+        "call_type",
+        "api_key",
+        "spend",
+        "total_tokens",
+        "prompt_tokens",
+        "completion_tokens",
+        "startTime",
+        "endTime",
+        "model",
+        "user",
+        "metadata",
+        "cache_hit",
+        "cache_key",
+        "request_tags",
+    ],
+)
+
+print(resp)
diff --git a/litellm/integrations/clickhouse.py b/litellm/integrations/clickhouse.py
new file mode 100644
index 000000000..fde831a36
--- /dev/null
+++ 
b/litellm/integrations/clickhouse.py
@@ -0,0 +1,116 @@
+# callback to make a request to an API endpoint
+
+#### What this does ####
+# On success, logs events to Promptlayer
+import dotenv, os
+import requests
+
+from litellm.proxy._types import UserAPIKeyAuth
+from litellm.caching import DualCache
+
+from typing import Literal, Union
+
+dotenv.load_dotenv()  # Loading env variables using dotenv
+import traceback
+
+
+#### What this does ####
+# On success + failure, log events to Supabase
+
+import dotenv, os
+import requests
+
+dotenv.load_dotenv()  # Loading env variables using dotenv
+import traceback
+import datetime, subprocess, sys
+import litellm, uuid
+from litellm._logging import print_verbose, verbose_logger
+
+
+class ClickhouseLogger:
+    # Class variables or attributes
+    def __init__(self, endpoint=None, headers=None):
+        import clickhouse_connect
+
+        print_verbose(
+            f"ClickhouseLogger init, host {os.getenv('CLICKHOUSE_HOST')}, port {os.getenv('CLICKHOUSE_PORT')}, username {os.getenv('CLICKHOUSE_USERNAME')}"
+        )
+
+        port = os.getenv("CLICKHOUSE_PORT")
+        if port is not None and isinstance(port, str):
+            port = int(port)
+
+        client = clickhouse_connect.get_client(
+            host=os.getenv("CLICKHOUSE_HOST"),
+            port=port,
+            username=os.getenv("CLICKHOUSE_USERNAME"),
+            password=os.getenv("CLICKHOUSE_PASSWORD"),
+        )
+        self.client = client
+
+    # This is sync, because we run this in a separate thread. 
Running in a separate thread ensures it will never block an LLM API call
+    # Experience with s3, Langfuse shows that async logging events are complicated and can block LLM calls
+    def log_event(
+        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
+    ):
+        try:
+            verbose_logger.debug(
+                f"ClickhouseLogger Logging - Enters logging function for model {kwargs}"
+            )
+
+            # construct payload to send custom logger
+            # follows the same params as langfuse.py
+            litellm_params = kwargs.get("litellm_params", {})
+            metadata = (
+                litellm_params.get("metadata", {}) or {}
+            )  # if litellm_params['metadata'] == None
+            messages = kwargs.get("messages")
+            cost = kwargs.get("response_cost", 0.0)
+            optional_params = kwargs.get("optional_params", {})
+            call_type = kwargs.get("call_type", "litellm.completion")
+            cache_hit = kwargs.get("cache_hit", False)
+            usage = response_obj["usage"]
+            id = response_obj.get("id", str(uuid.uuid4()))
+
+            from litellm.proxy.utils import get_logging_payload
+
+            payload = get_logging_payload(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+            )
+            # Build the initial payload
+
+            # Ensure everything in the payload is converted to str
+            # for key, value in payload.items():
+            #     try:
+            #         print("key=", key, "type=", type(value))
+            #         # payload[key] = str(value)
+            #     except:
+            #         # non blocking if it can't cast to a str
+            #         pass
+
+            print_verbose(f"\nClickhouse Logger - Logging payload = {payload}")
+
+            # just get the payload items in one array and payload keys in 2nd array
+            values = []
+            keys = []
+            for key, value in payload.items():
+                keys.append(key)
+                values.append(value)
+            data = [values]
+
+            # print("logging data=", data)
+            # print("logging keys=", keys)
+
+            response = self.client.insert("spend_logs", data, column_names=keys)
+
+            # make request to endpoint with payload
+            print_verbose(
+                f"Clickhouse Logger - final response = {response}"
+            )
+        except
Exception as e:
+            traceback.print_exc()
+            verbose_logger.debug(f"Clickhouse - {str(e)}\n{traceback.format_exc()}")
+            pass
diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py
index 868977885..8defd918c 100644
--- a/litellm/proxy/utils.py
+++ b/litellm/proxy/utils.py
@@ -1369,6 +1369,7 @@ def get_logging_payload(kwargs, response_obj, start_time, end_time):
         "user": kwargs.get("user", ""),
         "metadata": metadata,
         "cache_key": cache_key,
+        "spend": kwargs.get("response_cost", 0),
         "total_tokens": usage.get("total_tokens", 0),
         "prompt_tokens": usage.get("prompt_tokens", 0),
         "completion_tokens": usage.get("completion_tokens", 0),
diff --git a/litellm/tests/test_clickhouse_logger.py b/litellm/tests/test_clickhouse_logger.py
new file mode 100644
index 000000000..ab9a72a38
--- /dev/null
+++ b/litellm/tests/test_clickhouse_logger.py
@@ -0,0 +1,38 @@
+import sys
+import os
+import io, asyncio
+
+# import logging
+# logging.basicConfig(level=logging.DEBUG)
+sys.path.insert(0, os.path.abspath("../.."))
+print("Modified sys.path:", sys.path)
+
+
+from litellm import completion
+import litellm
+
+litellm.num_retries = 3
+
+import time, random
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_custom_api_logging():
+    try:
+        litellm.success_callback = ["clickhouse"]
+        litellm.set_verbose = True
+        await litellm.acompletion(
+            model="gpt-3.5-turbo",
+            messages=[{"role": "user", "content": "This is a test"}],
+            max_tokens=10,
+            temperature=0.7,
+            user="ishaan-2",
+        )
+
+    except Exception as e:
+        pytest.fail(f"An exception occurred - {e}")
+    finally:
+        # post, close log file and verify
+        # Reset stdout to the original value
+        print("Passed!")
diff --git a/litellm/utils.py b/litellm/utils.py
index 38ca669fc..7de5199b4 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -68,6 +68,7 @@ from .integrations.custom_logger import CustomLogger
 from .integrations.langfuse import LangFuseLogger
 from .integrations.dynamodb import DyanmoDBLogger
 from .integrations.s3 import S3Logger
+from .integrations.clickhouse import ClickhouseLogger
 from .integrations.litedebugger import LiteDebugger
 from .proxy._types import KeyManagementSystem
 from openai import OpenAIError as OriginalError
@@ -124,6 +125,7 @@ langFuseLogger = None
 dynamoLogger = None
 s3Logger = None
 genericAPILogger = None
+clickHouseLogger = None
 llmonitorLogger = None
 aispendLogger = None
 berrispendLogger = None
@@ -1413,6 +1415,37 @@ class Logging:
                         user_id=kwargs.get("user", None),
                         print_verbose=print_verbose,
                     )
+                if callback == "clickhouse":
+                    global clickHouseLogger
+                    verbose_logger.debug("reaches clickhouse for success logging!")
+                    kwargs = {}
+                    for k, v in self.model_call_details.items():
+                        if (
+                            k != "original_response"
+                        ):  # copy.deepcopy raises errors as this could be a coroutine
+                            kwargs[k] = v
+                    # this only logs streaming once, complete_streaming_response exists i.e when stream ends
+                    if self.stream:
+                        verbose_logger.debug(
+                            f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
+                        )
+                        if complete_streaming_response is None:
+                            break
+                        else:
+                            print_verbose(
+                                "reaches clickhouse for streaming logging!"
+                            )
+                            result = kwargs["complete_streaming_response"]
+                    if clickHouseLogger is None:
+                        clickHouseLogger = ClickhouseLogger()
+                    clickHouseLogger.log_event(
+                        kwargs=kwargs,
+                        response_obj=result,
+                        start_time=start_time,
+                        end_time=end_time,
+                        user_id=kwargs.get("user", None),
+                        print_verbose=print_verbose,
+                    )
                 if callback == "cache" and litellm.cache is not None:
                     # this only logs streaming once, complete_streaming_response exists i.e when stream ends
                     print_verbose("success_callback: reaches cache for logging!")