forked from phoenix/litellm-mirror
(feat) use clickhouse as a logging provider
This commit is contained in:
parent
13f1d7b96f
commit
8045053398
3 changed files with 64 additions and 13 deletions
|
@ -36,9 +36,13 @@ class ClickhouseLogger:
|
|||
f"ClickhouseLogger init, host {os.getenv('CLICKHOUSE_HOST')}, port {os.getenv('CLICKHOUSE_PORT')}, username {os.getenv('CLICKHOUSE_USERNAME')}"
|
||||
)
|
||||
|
||||
port = os.getenv("CLICKHOUSE_PORT")
|
||||
if port is not None and isinstance(port, str):
|
||||
port = int(port)
|
||||
|
||||
client = clickhouse_connect.get_client(
|
||||
host=os.getenv("CLICKHOUSE_HOST"),
|
||||
port=os.getenv("CLICKHOUSE_PORT"),
|
||||
port=port,
|
||||
username=os.getenv("CLICKHOUSE_USERNAME"),
|
||||
password=os.getenv("CLICKHOUSE_PASSWORD"),
|
||||
)
|
||||
|
@ -79,21 +83,34 @@ class ClickhouseLogger:
|
|||
# Build the initial payload
|
||||
|
||||
# Ensure everything in the payload is converted to str
|
||||
for key, value in payload.items():
|
||||
try:
|
||||
payload[key] = str(value)
|
||||
except:
|
||||
# non blocking if it can't cast to a str
|
||||
pass
|
||||
# for key, value in payload.items():
|
||||
# try:
|
||||
# print("key=", key, "type=", type(value))
|
||||
# # payload[key] = str(value)
|
||||
# except:
|
||||
# # non blocking if it can't cast to a str
|
||||
# pass
|
||||
|
||||
print_verbose(f"\nGeneric Logger - Logging payload = {payload}")
|
||||
print_verbose(f"\nClickhouse Logger - Logging payload = {payload}")
|
||||
|
||||
# just get the payload items in one array and payload keys in 2nd array
|
||||
values = []
|
||||
keys = []
|
||||
for key, value in payload.items():
|
||||
keys.append(key)
|
||||
values.append(value)
|
||||
data = [values]
|
||||
|
||||
# print("logging data=", data)
|
||||
# print("logging keys=", keys)
|
||||
|
||||
response = self.client.insert("spend_logs", data, column_names=keys)
|
||||
|
||||
# make request to endpoint with payload
|
||||
# print_verbose(
|
||||
# f"Generic Logger - final response status = {response_status}, response text = {response_text}"
|
||||
# )
|
||||
# return response
|
||||
print_verbose(
|
||||
f"Clickhouse Logger - final response status = {response_status}, response text = {response_text}"
|
||||
)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
verbose_logger.debug(f"Generic - {str(e)}\n{traceback.format_exc()}")
|
||||
verbose_logger.debug(f"Clickhouse - {str(e)}\n{traceback.format_exc()}")
|
||||
pass
|
||||
|
|
|
@ -1360,6 +1360,7 @@ def get_logging_payload(kwargs, response_obj, start_time, end_time):
|
|||
"user": kwargs.get("user", ""),
|
||||
"metadata": metadata,
|
||||
"cache_key": cache_key,
|
||||
"spend": kwargs.get("response_cost", 0),
|
||||
"total_tokens": usage.get("total_tokens", 0),
|
||||
"prompt_tokens": usage.get("prompt_tokens", 0),
|
||||
"completion_tokens": usage.get("completion_tokens", 0),
|
||||
|
|
|
@ -68,6 +68,7 @@ from .integrations.custom_logger import CustomLogger
|
|||
from .integrations.langfuse import LangFuseLogger
|
||||
from .integrations.dynamodb import DyanmoDBLogger
|
||||
from .integrations.s3 import S3Logger
|
||||
from .integrations.clickhouse import ClickhouseLogger
|
||||
from .integrations.litedebugger import LiteDebugger
|
||||
from .proxy._types import KeyManagementSystem
|
||||
from openai import OpenAIError as OriginalError
|
||||
|
@ -124,6 +125,7 @@ langFuseLogger = None
|
|||
dynamoLogger = None
|
||||
s3Logger = None
|
||||
genericAPILogger = None
|
||||
clickHouseLogger = None
|
||||
llmonitorLogger = None
|
||||
aispendLogger = None
|
||||
berrispendLogger = None
|
||||
|
@ -1413,6 +1415,37 @@ class Logging:
|
|||
user_id=kwargs.get("user", None),
|
||||
print_verbose=print_verbose,
|
||||
)
|
||||
if callback == "clickhouse":
|
||||
global clickHouseLogger
|
||||
verbose_logger.debug("reaches clickhouse for success logging!")
|
||||
kwargs = {}
|
||||
for k, v in self.model_call_details.items():
|
||||
if (
|
||||
k != "original_response"
|
||||
): # copy.deepcopy raises errors as this could be a coroutine
|
||||
kwargs[k] = v
|
||||
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
|
||||
if self.stream:
|
||||
verbose_logger.debug(
|
||||
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
|
||||
)
|
||||
if complete_streaming_response is None:
|
||||
break
|
||||
else:
|
||||
print_verbose(
|
||||
"reaches clickhouse for streaming logging!"
|
||||
)
|
||||
result = kwargs["complete_streaming_response"]
|
||||
if clickHouseLogger is None:
|
||||
clickHouseLogger = ClickhouseLogger()
|
||||
clickHouseLogger.log_event(
|
||||
kwargs=kwargs,
|
||||
response_obj=result,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
user_id=kwargs.get("user", None),
|
||||
print_verbose=print_verbose,
|
||||
)
|
||||
if callback == "cache" and litellm.cache is not None:
|
||||
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
|
||||
print_verbose("success_callback: reaches cache for logging!")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue