Merge remote-tracking branch 'origin/main'

This commit is contained in:
Ishaan Jaff 2024-10-22 17:37:42 +05:30
commit 666741dbad
9 changed files with 25 additions and 604 deletions

View file

@@ -1,72 +0,0 @@
import datetime
import os

import clickhouse_connect

client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT")),
    username=os.getenv("CLICKHOUSE_USERNAME"),
    password=os.getenv("CLICKHOUSE_PASSWORD"),
)

row1 = [
    "ishaan",  # request_id
    "GET",  # call_type
    "api_key_123",  # api_key
    50.00,  # spend
    1000,  # total_tokens
    800,  # prompt_tokens
    200,  # completion_tokens
    datetime.datetime.now(),  # startTime (replace with the actual timestamp)
    datetime.datetime.now(),  # endTime (replace with the actual timestamp)
    "gpt-3.5",  # model
    "user123",  # user
    '{"key": "value"}',  # metadata (replace with valid JSON)
    "True",  # cache_hit
    "cache_key_123",  # cache_key
    "tag1,tag2",  # request_tags
]

row2 = [
    "jaffer",  # request_id
    "POST",  # call_type
    "api_key_456",  # api_key
    30.50,  # spend
    800,  # total_tokens
    600,  # prompt_tokens
    200,  # completion_tokens
    datetime.datetime.now(),  # startTime (replace with the actual timestamp)
    datetime.datetime.now(),  # endTime (replace with the actual timestamp)
    "gpt-4.0",  # model
    "user456",  # user
    '{"key": "value"}',  # metadata (replace with valid JSON)
    "False",  # cache_hit
    "cache_key_789",  # cache_key
    "tag3,tag4",  # request_tags
]

data = [row1, row2]
resp = client.insert(
    "spend_logs",
    data,
    column_names=[
        "request_id",
        "call_type",
        "api_key",
        "spend",
        "total_tokens",
        "prompt_tokens",
        "completion_tokens",
        "startTime",
        "endTime",
        "model",
        "user",
        "metadata",
        "cache_hit",
        "cache_key",
        "request_tags",
    ],
)
print(resp)
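
# a quick sanity check (a sketch, assuming the insert above succeeded):
# read the rows back with clickhouse_connect's query API
check = client.query(
    "SELECT request_id, spend FROM spend_logs WHERE request_id IN ('ishaan', 'jaffer')"
)
print(check.result_rows)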

View file

@@ -1,39 +0,0 @@
# insert data into clickhouse
# response = client.command(
#     """
#     CREATE TEMPORARY TABLE temp_spend_logs AS (
#         SELECT
#             generateUUIDv4() AS request_id,
#             arrayElement(['TypeA', 'TypeB', 'TypeC'], rand() % 3 + 1) AS call_type,
#             'ishaan' as api_key,
#             rand() * 1000 AS spend,
#             rand() * 100 AS total_tokens,
#             rand() * 50 AS prompt_tokens,
#             rand() * 50 AS completion_tokens,
#             toDate('2024-02-01') + toIntervalDay(rand() % 27) AS startTime,
#             now() AS endTime,
#             arrayElement(['azure/gpt-4', 'gpt-3.5', 'vertexai/gemini-pro', 'mistral/mistral-small', 'ollama/llama2'], rand() % 5 + 1) AS model,
#             'ishaan-insert-rand' as user,
#             'data' as metadata,
#             'true' AS cache_hit,
#             'ishaan' as cache_key,
#             '{"tag1": "value1", "tag2": "value2"}' AS request_tags
#         FROM numbers(1, 1000000)
#     );
#     """
# )

# client.command(
#     """
#     -- Insert data into spend_logs table
#     INSERT INTO spend_logs
#     SELECT * FROM temp_spend_logs;
#     """
# )

# client.command(
#     """
#     DROP TABLE IF EXISTS temp_spend_logs;
#     """
# )
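
# an equivalent Python-side seeding sketch (hypothetical values): build synthetic
# rows matching the 15-column spend_logs layout and bulk-insert them with
# clickhouse_connect, instead of the temporary-table SQL above
import datetime
import os
import random
import uuid

import clickhouse_connect

client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT")),
    username=os.getenv("CLICKHOUSE_USERNAME"),
    password=os.getenv("CLICKHOUSE_PASSWORD"),
)

rows = []
for _ in range(1000):
    prompt_tokens = random.randint(0, 50)
    completion_tokens = random.randint(0, 50)
    rows.append([
        str(uuid.uuid4()),                           # request_id
        random.choice(["TypeA", "TypeB", "TypeC"]),  # call_type
        "ishaan",                                    # api_key
        random.random() * 1000,                      # spend
        prompt_tokens + completion_tokens,           # total_tokens
        prompt_tokens,                               # prompt_tokens
        completion_tokens,                           # completion_tokens
        datetime.datetime.now(),                     # startTime
        datetime.datetime.now(),                     # endTime
        random.choice(["azure/gpt-4", "gpt-3.5"]),   # model
        "ishaan-insert-rand",                        # user
        "data",                                      # metadata
        "true",                                      # cache_hit
        "ishaan",                                    # cache_key
        '{"tag1": "value1", "tag2": "value2"}',      # request_tags
    ])

client.insert(
    "spend_logs",
    rows,
    column_names=[
        "request_id", "call_type", "api_key", "spend", "total_tokens",
        "prompt_tokens", "completion_tokens", "startTime", "endTime",
        "model", "user", "metadata", "cache_hit", "cache_key", "request_tags",
    ],
)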

View file

@@ -777,7 +777,6 @@ general_settings:
| public_routes | List[str] | (Enterprise Feature) Control list of public routes |
| alert_types | List[str] | Control list of alert types to send to slack ([Doc on alert types](./alerting.md)) |
| enforced_params | List[str] | (Enterprise Feature) List of params that must be included in all requests to the proxy |
| enable_oauth2_auth | boolean | (Enterprise Feature) If true, enables oauth2.0 authentication |
| use_x_forwarded_for | str | If true, uses the X-Forwarded-For header to get the client IP address |
| service_account_settings | List[Dict[str, Any]] | Set `service_account_settings` if you want to create settings that only apply to service account keys ([Doc on service accounts](./service_accounts.md)) |
| image_generation_model | str | The default model to use for image generation - ignores model set in request |
@@ -898,10 +897,6 @@ router_settings:
| BRAINTRUST_API_KEY | API key for Braintrust integration |
| CIRCLE_OIDC_TOKEN | OpenID Connect token for CircleCI |
| CIRCLE_OIDC_TOKEN_V2 | Version 2 of the OpenID Connect token for CircleCI |
| CLICKHOUSE_HOST | Host for ClickHouse database |
| CLICKHOUSE_PASSWORD | Password for ClickHouse authentication |
| CLICKHOUSE_PORT | Port for ClickHouse database connection |
| CLICKHOUSE_USERNAME | Username for ClickHouse authentication |
| CONFIG_FILE_PATH | File path for configuration file |
| CUSTOM_TIKTOKEN_CACHE_DIR | Custom directory for Tiktoken cache |
| DATABASE_HOST | Hostname for the database server |

View file

@@ -84,21 +84,3 @@ model_cost = {
        "output_cost_per_token": 0.000015,
    },
}


class BerriSpendLogger:
    # Class variables or attributes
    def __init__(self):
        # Instance variables
        self.account_id = os.getenv("BERRISPEND_ACCOUNT_ID")

    def price_calculator(self, model, response_obj, start_time, end_time):
        return

    def log_event(
        self, model, messages, response_obj, start_time, end_time, print_verbose
    ):
        """
        This integration is not implemented yet.
        """
        return

View file

@@ -1,334 +0,0 @@
# callback to make a request to an API endpoint

#### What this does ####
# On success + failure, logs events to ClickHouse
import datetime
import json
import os
import traceback
from typing import Literal, Optional, Union

import dotenv
import requests

import litellm
from litellm._logging import verbose_logger
from litellm.caching.caching import DualCache
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.utils import StandardLoggingPayload


def create_client():
    try:
        import clickhouse_connect

        clickhouse_host = os.getenv("CLICKHOUSE_HOST")
        if clickhouse_host is not None:
            verbose_logger.debug("setting up clickhouse")
            port = os.getenv("CLICKHOUSE_PORT")
            if port is not None and isinstance(port, str):
                port = int(port)
            host: Optional[str] = os.getenv("CLICKHOUSE_HOST")
            if host is None:
                raise ValueError("CLICKHOUSE_HOST is not set")
            username: Optional[str] = os.getenv("CLICKHOUSE_USERNAME")
            if username is None:
                raise ValueError("CLICKHOUSE_USERNAME is not set")
            password: Optional[str] = os.getenv("CLICKHOUSE_PASSWORD")
            if password is None:
                raise ValueError("CLICKHOUSE_PASSWORD is not set")
            if port is None:
                raise ValueError("CLICKHOUSE_PORT is not set")
            client = clickhouse_connect.get_client(
                host=host,
                port=port,
                username=username,
                password=password,
            )
            return client
        else:
            raise Exception("Clickhouse: Clickhouse host not set")
    except Exception as e:
        raise ValueError(f"Clickhouse: {e}")
def build_daily_metrics():
    click_house_client = create_client()

    # get daily spend
    daily_spend = click_house_client.query_df(
        """
        SELECT sumMerge(DailySpend) as daily_spend, day FROM daily_aggregated_spend GROUP BY day
        """
    )

    # get daily spend per model
    daily_spend_per_model = click_house_client.query_df(
        """
        SELECT sumMerge(DailySpend) as daily_spend, day, model FROM daily_aggregated_spend_per_model GROUP BY day, model
        """
    )
    new_df = daily_spend_per_model.to_dict(orient="records")
    import pandas as pd

    df = pd.DataFrame(new_df)
    # Group by 'day' and build a {model: spend} dict for each day
    result_dict = {}
    for day, group in df.groupby("day"):
        models = group["model"].tolist()
        spend = group["daily_spend"].tolist()
        spend_per_model = {model: spend for model, spend in zip(models, spend)}
        result_dict[day] = spend_per_model

    # get daily spend per API key (top 5 keys per day)
    daily_spend_per_api_key = click_house_client.query_df(
        """
        SELECT
            daily_spend,
            day,
            api_key
        FROM (
            SELECT
                sumMerge(DailySpend) as daily_spend,
                day,
                api_key,
                RANK() OVER (PARTITION BY day ORDER BY sumMerge(DailySpend) DESC) as spend_rank
            FROM
                daily_aggregated_spend_per_api_key
            GROUP BY
                day,
                api_key
        ) AS ranked_api_keys
        WHERE
            spend_rank <= 5
            AND day IS NOT NULL
        ORDER BY
            day,
            daily_spend DESC
        """
    )
    new_df = daily_spend_per_api_key.to_dict(orient="records")
    df = pd.DataFrame(new_df)
    # Group by 'day' and build an {api_key: spend} dict for each day
    api_key_result_dict = {}
    for day, group in df.groupby("day"):
        api_keys = group["api_key"].tolist()
        spend = group["daily_spend"].tolist()
        spend_per_api_key = {api_key: spend for api_key, spend in zip(api_keys, spend)}
        api_key_result_dict[day] = spend_per_api_key

    # Calculate total spend across all days
    total_spend = daily_spend["daily_spend"].sum()

    # Identify top models and top API keys with the highest spend across all days
    top_models = {}
    top_api_keys = {}

    for day, spend_per_model in result_dict.items():
        for model, model_spend in spend_per_model.items():
            if model not in top_models or model_spend > top_models[model]:
                top_models[model] = model_spend

    for day, spend_per_api_key in api_key_result_dict.items():
        for api_key, api_key_spend in spend_per_api_key.items():
            if api_key not in top_api_keys or api_key_spend > top_api_keys[api_key]:
                top_api_keys[api_key] = api_key_spend

    # for each day in daily spend, attach the per-model and per-api-key breakdowns
    result = []
    for index, row in daily_spend.iterrows():
        day = row["day"]
        data_day = row.to_dict()

        # Look up in result_dict
        if day in result_dict:
            data_day["spend_per_model"] = result_dict[day]
        # Look up in api_key_result_dict
        if day in api_key_result_dict:
            data_day["spend_per_api_key"] = api_key_result_dict[day]

        result.append(data_day)

    data_to_return = {}
    data_to_return["daily_spend"] = result
    data_to_return["total_spend"] = total_spend
    data_to_return["top_models"] = top_models
    data_to_return["top_api_keys"] = top_api_keys
    return data_to_return


# build_daily_metrics()
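# a minimal usage sketch (assumes the daily_aggregated_spend* materialized
# views queried above exist in ClickHouse):
#
#   metrics = build_daily_metrics()
#   print(metrics["total_spend"], metrics["top_models"], metrics["top_api_keys"])
#   for day_row in metrics["daily_spend"]:
#       print(day_row["day"], day_row.get("spend_per_model", {}))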
def _start_clickhouse():
    import clickhouse_connect

    clickhouse_host = os.getenv("CLICKHOUSE_HOST")
    if clickhouse_host is not None:
        verbose_logger.debug("setting up clickhouse")
        port = os.getenv("CLICKHOUSE_PORT")
        if port is not None and isinstance(port, str):
            port = int(port)
        host: Optional[str] = os.getenv("CLICKHOUSE_HOST")
        if host is None:
            raise ValueError("CLICKHOUSE_HOST is not set")
        username: Optional[str] = os.getenv("CLICKHOUSE_USERNAME")
        if username is None:
            raise ValueError("CLICKHOUSE_USERNAME is not set")
        password: Optional[str] = os.getenv("CLICKHOUSE_PASSWORD")
        if password is None:
            raise ValueError("CLICKHOUSE_PASSWORD is not set")
        if port is None:
            raise ValueError("CLICKHOUSE_PORT is not set")
        client = clickhouse_connect.get_client(
            host=host,
            port=port,
            username=username,
            password=password,
        )
        # view all tables in DB
        response = client.query("SHOW TABLES")
        verbose_logger.debug(
            f"checking if litellm spend logs exists, all tables={response.result_rows}"
        )
        # all tables are returned like this: [('new_table',), ('spend_logs',)]
        # check if spend_logs is among them
        table_names = [all_tables[0] for all_tables in response.result_rows]
        if "spend_logs" not in table_names:
            verbose_logger.debug(
                "Clickhouse: spend logs table does not exist... creating it"
            )
            response = client.command(
                """
                CREATE TABLE default.spend_logs
                (
                    `request_id` String,
                    `call_type` String,
                    `api_key` String,
                    `spend` Float64,
                    `total_tokens` Int256,
                    `prompt_tokens` Int256,
                    `completion_tokens` Int256,
                    `startTime` DateTime,
                    `endTime` DateTime,
                    `model` String,
                    `user` String,
                    `metadata` String,
                    `cache_hit` String,
                    `cache_key` String,
                    `request_tags` String
                )
                ENGINE = MergeTree
                ORDER BY tuple();
                """
            )
        else:
            # the spend_logs table already exists, so log its schema
            response = client.query("DESCRIBE default.spend_logs")
            verbose_logger.debug(f"spend logs schema ={response.result_rows}")
class ClickhouseLogger:
    # Class variables or attributes
    def __init__(self, endpoint=None, headers=None):
        import clickhouse_connect

        _start_clickhouse()
        verbose_logger.debug(
            f"ClickhouseLogger init, host {os.getenv('CLICKHOUSE_HOST')}, port {os.getenv('CLICKHOUSE_PORT')}, username {os.getenv('CLICKHOUSE_USERNAME')}"
        )
        port = os.getenv("CLICKHOUSE_PORT")
        if port is not None and isinstance(port, str):
            port = int(port)
        host: Optional[str] = os.getenv("CLICKHOUSE_HOST")
        if host is None:
            raise ValueError("CLICKHOUSE_HOST is not set")
        username: Optional[str] = os.getenv("CLICKHOUSE_USERNAME")
        if username is None:
            raise ValueError("CLICKHOUSE_USERNAME is not set")
        password: Optional[str] = os.getenv("CLICKHOUSE_PASSWORD")
        if password is None:
            raise ValueError("CLICKHOUSE_PASSWORD is not set")
        if port is None:
            raise ValueError("CLICKHOUSE_PORT is not set")
        client = clickhouse_connect.get_client(
            host=host,
            port=port,
            username=username,
            password=password,
        )
        self.client = client

    # This is sync, because we run this in a separate thread. Running in a separate thread ensures it will never block an LLM API call
    # Experience with s3, Langfuse shows that async logging events are complicated and can block LLM calls
    def log_event(
        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
    ):
        try:
            verbose_logger.debug(
                f"ClickhouseLogger Logging - Enters logging function for model {kwargs}"
            )
            # follows the same params as langfuse.py
            payload: Optional[StandardLoggingPayload] = kwargs.get(
                "standard_logging_object"
            )
            if payload is None:
                return
            # Build the initial payload
            verbose_logger.debug(f"\nClickhouse Logger - Logging payload = {payload}")
            # collect the payload values in one array and the payload keys in a second array
            values = []
            keys = []
            for key, value in payload.items():
                keys.append(key)
                values.append(value)
            data = [values]

            response = self.client.insert("default.spend_logs", data, column_names=keys)
            # make request to endpoint with payload
            verbose_logger.debug(f"Clickhouse Logger - final response = {response}")
        except Exception as e:
            verbose_logger.debug(f"Clickhouse - {str(e)}\n{traceback.format_exc()}")
            pass
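
# a minimal usage sketch (hypothetical values); in the proxy this is driven by
# litellm's callback machinery rather than called directly:
#
#   logger = ClickhouseLogger()  # reads CLICKHOUSE_* env vars, creates spend_logs if needed
#   logger.log_event(
#       kwargs={"standard_logging_object": payload},  # a StandardLoggingPayload dict
#       response_obj=response,
#       start_time=start,
#       end_time=end,
#       user_id="user123",
#       print_verbose=print,
#   )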

View file

@@ -61,9 +61,7 @@ from litellm.utils import (
from ..integrations.aispend import AISpendLogger
from ..integrations.argilla import ArgillaLogger
from ..integrations.athina import AthinaLogger
from ..integrations.berrispend import BerriSpendLogger
from ..integrations.braintrust_logging import BraintrustLogger
from ..integrations.clickhouse import ClickhouseLogger
from ..integrations.datadog.datadog import DataDogLogger
from ..integrations.dynamodb import DyanmoDBLogger
from ..integrations.galileo import GalileoObserve
@@ -122,13 +120,10 @@ prometheusLogger = None
dynamoLogger = None
s3Logger = None
genericAPILogger = None
clickHouseLogger = None
greenscaleLogger = None
lunaryLogger = None
aispendLogger = None
berrispendLogger = None
supabaseClient = None
liteDebuggerClient = None
callback_list: Optional[List[str]] = []
user_logger_fn = None
additional_details: Optional[Dict[str, str]] = {}
@@ -191,7 +186,7 @@ in_memory_dynamic_logger_cache = DynamicLoggingCache()
class Logging:
    global supabaseClient, liteDebuggerClient, promptLayerLogger, weightsBiasesLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, logfireLogger, prometheusLogger, slack_app
    global supabaseClient, promptLayerLogger, weightsBiasesLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, logfireLogger, prometheusLogger, slack_app
    custom_pricing: bool = False
    stream_options = None
@@ -970,22 +965,6 @@
                ):
                    print_verbose("no-log request, skipping logging")
                    continue
                if callback == "lite_debugger" and liteDebuggerClient is not None:
                    print_verbose("reaches lite_debugger for logging!")
                    print_verbose(f"liteDebuggerClient: {liteDebuggerClient}")
                    print_verbose(
                        f"liteDebuggerClient details function {self.call_type} and stream set to {self.stream}"
                    )
                    liteDebuggerClient.log_event(
                        end_user=kwargs.get("user", "default"),
                        response_obj=result,
                        start_time=start_time,
                        end_time=end_time,
                        litellm_call_id=self.litellm_call_id,
                        print_verbose=print_verbose,
                        call_type=self.call_type,
                        stream=self.stream,
                    )
                if callback == "promptlayer" and promptLayerLogger is not None:
                    print_verbose("reaches promptlayer for logging!")
                    promptLayerLogger.log_event(
@@ -1248,37 +1227,6 @@
                        user_id=kwargs.get("user", None),
                        print_verbose=print_verbose,
                    )
                if callback == "clickhouse":
                    global clickHouseLogger
                    verbose_logger.debug("reaches clickhouse for success logging!")
                    kwargs = {}
                    for k, v in self.model_call_details.items():
                        if (
                            k != "original_response"
                        ):  # copy.deepcopy raises errors as this could be a coroutine
                            kwargs[k] = v
                    # this only logs streaming once, complete_streaming_response exists i.e when stream ends
                    if self.stream:
                        verbose_logger.debug(
                            f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
                        )
                        if complete_streaming_response is None:
                            continue
                        else:
                            print_verbose(
                                "reaches clickhouse for streaming logging!"
                            )
                            result = kwargs["complete_streaming_response"]
                    if clickHouseLogger is None:
                        clickHouseLogger = ClickhouseLogger()
                    clickHouseLogger.log_event(
                        kwargs=kwargs,
                        response_obj=result,
                        start_time=start_time,
                        end_time=end_time,
                        user_id=kwargs.get("user", None),
                        print_verbose=print_verbose,
                    )
                if callback == "greenscale" and greenscaleLogger is not None:
                    kwargs = {}
                    for k, v in self.model_call_details.items():
@@ -1874,9 +1822,7 @@
        )
        for callback in callbacks:
            try:
                if callback == "lite_debugger" and liteDebuggerClient is not None:
                    pass
                elif callback == "lunary" and lunaryLogger is not None:
                if callback == "lunary" and lunaryLogger is not None:
                    print_verbose("reaches lunary for logging error!")
                    model = self.model
@@ -2195,7 +2141,7 @@ def set_callbacks(callback_list, function_id=None):  # noqa: PLR0915
    """
    Globally sets the callback client
    """
    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
    global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, supabaseClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger

    try:
        for callback in callback_list:
@@ -2277,24 +2223,12 @@
                logfireLogger = LogfireLogger()
            elif callback == "aispend":
                aispendLogger = AISpendLogger()
            elif callback == "berrispend":
                berrispendLogger = BerriSpendLogger()
            elif callback == "supabase":
                print_verbose("instantiating supabase")
                supabaseClient = Supabase()
            elif callback == "greenscale":
                greenscaleLogger = GreenscaleLogger()
                print_verbose("Initialized Greenscale Logger")
            elif callback == "lite_debugger":
                print_verbose("instantiating lite_debugger")
                if function_id:
                    liteDebuggerClient = LiteDebugger(email=function_id)
                elif litellm.token:
                    liteDebuggerClient = LiteDebugger(email=litellm.token)
                elif litellm.email:
                    liteDebuggerClient = LiteDebugger(email=litellm.email)
                else:
                    liteDebuggerClient = LiteDebugger(email=str(uuid.uuid4()))
            elif callable(callback):
                customLogger = CustomLogger()
    except Exception as e:

View file

@@ -15,27 +15,28 @@ def get_response_headers(_response_headers: Optional[dict] = None) -> dict:
        dict: _response_headers with OpenAI headers and llm_provider-{header}
    """
    if _response_headers is not None:
        openai_headers = {}
        if "x-ratelimit-limit-requests" in _response_headers:
            openai_headers["x-ratelimit-limit-requests"] = _response_headers[
                "x-ratelimit-limit-requests"
            ]
        if "x-ratelimit-remaining-requests" in _response_headers:
            openai_headers["x-ratelimit-remaining-requests"] = _response_headers[
                "x-ratelimit-remaining-requests"
            ]
        if "x-ratelimit-limit-tokens" in _response_headers:
            openai_headers["x-ratelimit-limit-tokens"] = _response_headers[
                "x-ratelimit-limit-tokens"
            ]
        if "x-ratelimit-remaining-tokens" in _response_headers:
            openai_headers["x-ratelimit-remaining-tokens"] = _response_headers[
                "x-ratelimit-remaining-tokens"
            ]
        llm_provider_headers = _get_llm_provider_headers(_response_headers)
        return {**llm_provider_headers, **openai_headers}
    return {}
    if _response_headers is None:
        return {}
    openai_headers = {}
    if "x-ratelimit-limit-requests" in _response_headers:
        openai_headers["x-ratelimit-limit-requests"] = _response_headers[
            "x-ratelimit-limit-requests"
        ]
    if "x-ratelimit-remaining-requests" in _response_headers:
        openai_headers["x-ratelimit-remaining-requests"] = _response_headers[
            "x-ratelimit-remaining-requests"
        ]
    if "x-ratelimit-limit-tokens" in _response_headers:
        openai_headers["x-ratelimit-limit-tokens"] = _response_headers[
            "x-ratelimit-limit-tokens"
        ]
    if "x-ratelimit-remaining-tokens" in _response_headers:
        openai_headers["x-ratelimit-remaining-tokens"] = _response_headers[
            "x-ratelimit-remaining-tokens"
        ]
    llm_provider_headers = _get_llm_provider_headers(_response_headers)
    return {**llm_provider_headers, **openai_headers}
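
# a quick illustration (hypothetical header values, assuming
# _get_llm_provider_headers mirrors every header under an "llm_provider-"
# prefix, per the docstring above):
#
#   get_response_headers({"x-ratelimit-limit-requests": "100", "x-served-by": "abc"})
#   # -> {"llm_provider-x-ratelimit-limit-requests": "100",
#   #     "llm_provider-x-served-by": "abc",
#   #     "x-ratelimit-limit-requests": "100"}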
def _get_llm_provider_headers(response_headers: dict) -> dict:

View file

@@ -213,13 +213,10 @@ prometheusLogger = None
dynamoLogger = None
s3Logger = None
genericAPILogger = None
clickHouseLogger = None
greenscaleLogger = None
lunaryLogger = None
aispendLogger = None
berrispendLogger = None
supabaseClient = None
liteDebuggerClient = None
callback_list: Optional[List[str]] = []
user_logger_fn = None
additional_details: Optional[Dict[str, str]] = {}
@@ -609,7 +606,6 @@ def function_setup( # noqa: PLR0915
def client(original_function): # noqa: PLR0915
    global liteDebuggerClient
    rules_obj = Rules()

    def check_coroutine(value) -> bool:

View file

@@ -1,42 +0,0 @@
import sys
import os
import io, asyncio

# import logging
# logging.basicConfig(level=logging.DEBUG)
sys.path.insert(0, os.path.abspath("../.."))
print("Modified sys.path:", sys.path)
from litellm import completion
import litellm
from litellm._logging import verbose_logger
import logging

litellm.num_retries = 3
import time, random
import pytest


@pytest.mark.asyncio
@pytest.mark.skip(reason="beta test - this is a new feature")
async def test_custom_api_logging():
    try:
        litellm.success_callback = ["clickhouse"]
        litellm.set_verbose = True
        verbose_logger.setLevel(logging.DEBUG)

        await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "This is a test"}],
            max_tokens=10,
            temperature=0.7,
            user="ishaan-2",
        )
    except Exception as e:
        pytest.fail(f"An exception occurred - {e}")
    finally:
        # post, close log file and verify
        # Reset stdout to the original value
        print("Passed!")