Merge pull request #2187 from BerriAI/litellm_clickhouse_logs

[FEAT] Use Logging on clickhouse
This commit is contained in:
Ishaan Jaff 2024-02-26 08:26:02 -08:00 committed by GitHub
commit 4067d4f1e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 260 additions and 0 deletions

View file

@ -0,0 +1,72 @@
"""Example script: insert two sample rows into the ClickHouse ``spend_logs`` table.

Connection settings are read from the CLICKHOUSE_* environment variables.
Fixed vs. original: removed the duplicate ``import clickhouse_connect`` and
the redundant ``import datetime as datetime``.
"""
import datetime
import os

import clickhouse_connect

# NOTE(review): int(os.getenv(...)) raises TypeError if CLICKHOUSE_PORT is
# unset — this example assumes all CLICKHOUSE_* variables are configured.
client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT")),
    username=os.getenv("CLICKHOUSE_USERNAME"),
    password=os.getenv("CLICKHOUSE_PASSWORD"),
)

row1 = [
    "ishaan",  # request_id
    "GET",  # call_type
    "api_key_123",  # api_key
    50.00,  # spend
    1000,  # total_tokens
    800,  # prompt_tokens
    200,  # completion_tokens
    datetime.datetime.now(),  # startTime (replace with the actual timestamp)
    datetime.datetime.now(),  # endTime (replace with the actual timestamp)
    "gpt-3.5",  # model
    "user123",  # user
    '{"key": "value"}',  # metadata (replace with valid JSON)
    "True",  # cache_hit
    "cache_key_123",  # cache_key
    "tag1,tag2",  # request_tags
]

row2 = [
    "jaffer",  # request_id
    "POST",  # call_type
    "api_key_456",  # api_key
    30.50,  # spend
    800,  # total_tokens
    600,  # prompt_tokens
    200,  # completion_tokens
    datetime.datetime.now(),  # startTime (replace with the actual timestamp)
    datetime.datetime.now(),  # endTime (replace with the actual timestamp)
    "gpt-4.0",  # model
    "user456",  # user
    '{"key": "value"}',  # metadata (replace with valid JSON)
    "False",  # cache_hit
    "cache_key_789",  # cache_key
    "tag3,tag4",  # request_tags
]

data = [row1, row2]
resp = client.insert(
    "spend_logs",
    data,
    column_names=[
        "request_id",
        "call_type",
        "api_key",
        "spend",
        "total_tokens",
        "prompt_tokens",
        "completion_tokens",
        "startTime",
        "endTime",
        "model",
        "user",
        "metadata",
        "cache_hit",
        "cache_key",
        "request_tags",
    ],
)
print(resp)

View file

@ -0,0 +1,116 @@
# callback to make a request to an API endpoint
#### What this does ####
# On success, logs events to Promptlayer
import dotenv, os
import requests
from litellm.proxy._types import UserAPIKeyAuth
from litellm.caching import DualCache
from typing import Literal, Union
dotenv.load_dotenv() # Loading env variables using dotenv
import traceback
#### What this does ####
# On success + failure, log events to Supabase
import dotenv, os
import requests
dotenv.load_dotenv() # Loading env variables using dotenv
import traceback
import datetime, subprocess, sys
import litellm, uuid
from litellm._logging import print_verbose, verbose_logger
class ClickhouseLogger:
    """Logs LiteLLM success events to a ClickHouse ``spend_logs`` table.

    Runs synchronously; callers invoke it from a separate thread so it never
    blocks an LLM API call. Experience with s3 and Langfuse shows that async
    logging events are complicated and can block LLM calls.
    """

    def __init__(self, endpoint=None, headers=None):
        # Imported lazily so litellm does not require clickhouse-connect
        # unless this logger is actually used.
        import clickhouse_connect

        print_verbose(
            f"ClickhouseLogger init, host {os.getenv('CLICKHOUSE_HOST')}, port {os.getenv('CLICKHOUSE_PORT')}, username {os.getenv('CLICKHOUSE_USERNAME')}"
        )

        # Env vars arrive as strings; clickhouse_connect expects an int port.
        port = os.getenv("CLICKHOUSE_PORT")
        if port is not None and isinstance(port, str):
            port = int(port)

        self.client = clickhouse_connect.get_client(
            host=os.getenv("CLICKHOUSE_HOST"),
            port=port,
            username=os.getenv("CLICKHOUSE_USERNAME"),
            password=os.getenv("CLICKHOUSE_PASSWORD"),
        )

    def log_event(
        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
    ):
        """Insert one spend-log row built from an LLM call's kwargs/response.

        The row schema mirrors ``get_logging_payload`` (same params as
        langfuse.py). Any exception is logged and swallowed so a logging
        failure can never break the LLM call itself.
        """
        try:
            verbose_logger.debug(
                f"ClickhouseLogger Logging - Enters logging function for model {kwargs}"
            )
            # Local import to avoid a circular dependency with the proxy.
            from litellm.proxy.utils import get_logging_payload

            payload = get_logging_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time,
            )
            print_verbose(f"\nClickhouse Logger - Logging payload = {payload}")

            # client.insert takes rows as a list of value-lists plus a
            # parallel list of column names; dict iteration order keeps
            # keys and values aligned.
            keys = list(payload.keys())
            data = [list(payload.values())]
            response = self.client.insert("spend_logs", data, column_names=keys)
            # BUG FIX: the original referenced undefined names
            # response_status / response_text here, raising NameError on
            # every successful insert (masked by the except below).
            print_verbose(f"Clickhouse Logger - insert response = {response}")
        except Exception as e:
            traceback.print_exc()
            verbose_logger.debug(f"Clickhouse - {str(e)}\n{traceback.format_exc()}")

View file

@ -1369,6 +1369,7 @@ def get_logging_payload(kwargs, response_obj, start_time, end_time):
"user": kwargs.get("user", ""), "user": kwargs.get("user", ""),
"metadata": metadata, "metadata": metadata,
"cache_key": cache_key, "cache_key": cache_key,
"spend": kwargs.get("response_cost", 0),
"total_tokens": usage.get("total_tokens", 0), "total_tokens": usage.get("total_tokens", 0),
"prompt_tokens": usage.get("prompt_tokens", 0), "prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0), "completion_tokens": usage.get("completion_tokens", 0),

View file

@ -0,0 +1,38 @@
import asyncio
import io
import os
import sys

# Make the repository root importable when running from the tests directory.
sys.path.insert(0, os.path.abspath("../.."))
print("Modified sys.path:", sys.path)

import litellm
from litellm import completion

# Retry transient provider failures a few times before giving up.
litellm.num_retries = 3

import pytest
import random
import time
@pytest.mark.asyncio
async def test_custom_api_logging():
    """Smoke test: a completion with the 'clickhouse' success callback
    registered must complete without raising.

    Fixed vs. original: dropped the pointless f-string prefix on a literal
    with no placeholders.
    """
    try:
        litellm.success_callback = ["clickhouse"]
        litellm.set_verbose = True
        await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "This is a test"}],
            max_tokens=10,
            temperature=0.7,
            user="ishaan-2",
        )
    except Exception as e:
        pytest.fail(f"An exception occurred - {e}")
    finally:
        # NOTE(review): this runs even after pytest.fail, so the message does
        # not guarantee the test actually passed.
        print("Passed!")

View file

@ -68,6 +68,7 @@ from .integrations.custom_logger import CustomLogger
from .integrations.langfuse import LangFuseLogger from .integrations.langfuse import LangFuseLogger
from .integrations.dynamodb import DyanmoDBLogger from .integrations.dynamodb import DyanmoDBLogger
from .integrations.s3 import S3Logger from .integrations.s3 import S3Logger
from .integrations.clickhouse import ClickhouseLogger
from .integrations.litedebugger import LiteDebugger from .integrations.litedebugger import LiteDebugger
from .proxy._types import KeyManagementSystem from .proxy._types import KeyManagementSystem
from openai import OpenAIError as OriginalError from openai import OpenAIError as OriginalError
@ -124,6 +125,7 @@ langFuseLogger = None
dynamoLogger = None dynamoLogger = None
s3Logger = None s3Logger = None
genericAPILogger = None genericAPILogger = None
clickHouseLogger = None
llmonitorLogger = None llmonitorLogger = None
aispendLogger = None aispendLogger = None
berrispendLogger = None berrispendLogger = None
@ -1413,6 +1415,37 @@ class Logging:
user_id=kwargs.get("user", None), user_id=kwargs.get("user", None),
print_verbose=print_verbose, print_verbose=print_verbose,
) )
if callback == "clickhouse":
global clickHouseLogger
verbose_logger.debug("reaches clickhouse for success logging!")
kwargs = {}
for k, v in self.model_call_details.items():
if (
k != "original_response"
): # copy.deepcopy raises errors as this could be a coroutine
kwargs[k] = v
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if self.stream:
verbose_logger.debug(
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
)
if complete_streaming_response is None:
break
else:
print_verbose(
"reaches clickhouse for streaming logging!"
)
result = kwargs["complete_streaming_response"]
if clickHouseLogger is None:
clickHouseLogger = ClickhouseLogger()
clickHouseLogger.log_event(
kwargs=kwargs,
response_obj=result,
start_time=start_time,
end_time=end_time,
user_id=kwargs.get("user", None),
print_verbose=print_verbose,
)
if callback == "cache" and litellm.cache is not None: if callback == "cache" and litellm.cache is not None:
# this only logs streaming once, complete_streaming_response exists i.e when stream ends # this only logs streaming once, complete_streaming_response exists i.e when stream ends
print_verbose("success_callback: reaches cache for logging!") print_verbose("success_callback: reaches cache for logging!")