Merge pull request #4830 from BerriAI/litellm_braintrust_integration

Braintrust logging integration
This commit is contained in:
Krish Dholakia 2024-07-22 22:40:47 -07:00 committed by GitHub
commit b69b7503db
8 changed files with 586 additions and 3 deletions


@@ -0,0 +1,147 @@
import Image from '@theme/IdealImage';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# ⚡️ Braintrust - Evals + Logging
[Braintrust](https://www.braintrust.dev/) manages evaluations, logging, prompt playground, and data management for AI products.
## Quick Start
```python
# pip install litellm
import litellm
import os
# set env
os.environ["BRAINTRUST_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
# set braintrust as a callback, litellm will send the data to braintrust
litellm.callbacks = ["braintrust"]
# openai call
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "Hi 👋 - i'm openai"}
]
)
```
## OpenAI Proxy Usage
1. Add keys to env
```env
BRAINTRUST_API_KEY=""
```
2. Add braintrust to callbacks
```yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: gpt-3.5-turbo
api_key: os.environ/OPENAI_API_KEY
litellm_settings:
callbacks: ["braintrust"]
```
3. Test it!
```bash
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{ "role": "system", "content": "Use your tools smartly"},
{ "role": "user", "content": "What time is it now? Use your tool"}
]
}'
```
## Advanced - pass Project ID
<Tabs>
<TabItem value="sdk" label="SDK">
```python
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "Hi 👋 - i'm openai"}
],
metadata={
"project_id": "my-special-project"
}
)
```
</TabItem>
<TabItem value="proxy" label="PROXY">
**Curl**
```bash
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{ "role": "system", "content": "Use your tools smartly"},
{ "role": "user", "content": "What time is it now? Use your tool"}
],
"metadata": {
"project_id": "my-special-project"
}
}'
```
**OpenAI SDK**
```python
import openai
client = openai.OpenAI(
api_key="anything",
base_url="http://0.0.0.0:4000"
)
# request sent to model set on litellm proxy, `litellm --model`
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages = [
{
"role": "user",
"content": "this is a test request, write a short poem"
}
],
extra_body={ # pass in any provider-specific param, if not supported by openai, https://docs.litellm.ai/docs/completion/input#provider-specific-params
"metadata": { # 👈 use for logging additional params (e.g. to langfuse)
"project_id": "my-special-project"
}
}
)
print(response)
```
For more examples, [**Click Here**](../proxy/user_keys.md#chatcompletions)
</TabItem>
</Tabs>
## Full API Spec
Here's everything you can pass in `metadata` for a Braintrust request:

- `braintrust_*` - any metadata field starting with `braintrust_` is passed as metadata on the logging request.
- `project_id` - set the project id for a Braintrust call. Defaults to `litellm`.
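For example, here's a minimal sketch of passing both fields in a single SDK call. The `my-special-project` id and the `braintrust_run_id` key are hypothetical values for illustration:
```python
import os
import litellm

os.environ["BRAINTRUST_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""

# route logs to braintrust
litellm.callbacks = ["braintrust"]

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
    metadata={
        # log under this Braintrust project instead of the default `litellm` project
        "project_id": "my-special-project",
        # hypothetical example: `braintrust_`-prefixed fields are forwarded as metadata
        "braintrust_run_id": "run-123",
    },
)
```
On the proxy, the same fields can be sent in the request body's `metadata`, as shown in the examples above.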


@@ -1,4 +1,4 @@
# 🧠 Helicone - OSS LLM Observability Platform
# 🧊 Helicone - OSS LLM Observability Platform
:::tip


@@ -199,9 +199,10 @@ const sidebars = {
"observability/raw_request_response",
"observability/custom_callback",
"observability/scrub_data",
"observability/helicone_integration",
"observability/braintrust",
"observability/sentry",
"observability/lago",
"observability/helicone_integration",
"observability/openmeter",
"observability/promptlayer_integration",
"observability/wandb_integration",


@@ -44,6 +44,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
"dynamic_rate_limiter",
"langsmith",
"galileo",
"braintrust",
"arize",
]
_known_custom_logger_compatible_callbacks: List = list(


@@ -0,0 +1,369 @@
# What is this?
## Log success + failure events to Braintrust
import copy
import json
import os
import threading
import traceback
import uuid
from typing import Literal, Optional
import dotenv
import httpx
import litellm
from litellm import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
from litellm.utils import get_formatted_prompt
global_braintrust_http_handler = AsyncHTTPHandler()
global_braintrust_sync_http_handler = HTTPHandler()
API_BASE = "https://api.braintrustdata.com/v1"
def get_utc_datetime():
import datetime as dt
from datetime import datetime
if hasattr(dt, "UTC"):
return datetime.now(dt.UTC) # type: ignore
else:
return datetime.utcnow() # type: ignore
class BraintrustLogger(CustomLogger):
def __init__(
self, api_key: Optional[str] = None, api_base: Optional[str] = None
) -> None:
super().__init__()
self.validate_environment(api_key=api_key)
self.api_base = api_base or API_BASE
self.default_project_id = None
self.api_key: str = api_key or os.getenv("BRAINTRUST_API_KEY") # type: ignore
self.headers = {
"Authorization": "Bearer " + self.api_key,
"Content-Type": "application/json",
}
def validate_environment(self, api_key: Optional[str]):
"""
Expects
BRAINTRUST_API_KEY
in the environment
"""
missing_keys = []
if api_key is None and os.getenv("BRAINTRUST_API_KEY", None) is None:
missing_keys.append("BRAINTRUST_API_KEY")
if len(missing_keys) > 0:
raise Exception("Missing keys={} in environment.".format(missing_keys))
@staticmethod
def add_metadata_from_header(litellm_params: dict, metadata: dict) -> dict:
"""
Adds metadata from proxy request headers to Braintrust logging if keys start with "braintrust"
and overwrites litellm_params.metadata if already included.
For example, to attach extra metadata via header, send
`headers: { ..., braintrust_custom_field: your-value }` via the proxy request; the
"braintrust" prefix is stripped from the key before it is logged.
"""
if litellm_params is None:
return metadata
if litellm_params.get("proxy_server_request") is None:
return metadata
if metadata is None:
metadata = {}
proxy_headers = (
litellm_params.get("proxy_server_request", {}).get("headers", {}) or {}
)
for metadata_param_key in proxy_headers:
if metadata_param_key.startswith("braintrust"):
trace_param_key = metadata_param_key.replace("braintrust", "", 1)
if trace_param_key in metadata:
verbose_logger.warning(
f"Overwriting Braintrust `{trace_param_key}` from request header"
)
else:
verbose_logger.debug(
f"Found Braintrust `{trace_param_key}` in request header"
)
metadata[trace_param_key] = proxy_headers.get(metadata_param_key)
return metadata
async def create_default_project_and_experiment(self):
project = await global_braintrust_http_handler.post(
f"{self.api_base}/project", headers=self.headers, json={"name": "litellm"}
)
project_dict = project.json()
self.default_project_id = project_dict["id"]
def create_sync_default_project_and_experiment(self):
project = global_braintrust_sync_http_handler.post(
f"{self.api_base}/project", headers=self.headers, json={"name": "litellm"}
)
project_dict = project.json()
self.default_project_id = project_dict["id"]
def log_success_event(self, kwargs, response_obj, start_time, end_time):
verbose_logger.debug("REACHES BRAINTRUST SUCCESS")
try:
litellm_call_id = kwargs.get("litellm_call_id")
project_id = kwargs.get("project_id", None)
if project_id is None:
if self.default_project_id is None:
self.create_sync_default_project_and_experiment()
project_id = self.default_project_id
prompt = {"messages": kwargs.get("messages")}
if response_obj is not None and (
kwargs.get("call_type", None) == "embedding"
or isinstance(response_obj, litellm.EmbeddingResponse)
):
input = prompt
output = None
elif response_obj is not None and isinstance(
response_obj, litellm.ModelResponse
):
input = prompt
output = response_obj["choices"][0]["message"].json()
elif response_obj is not None and isinstance(
response_obj, litellm.TextCompletionResponse
):
input = prompt
output = response_obj.choices[0].text
elif response_obj is not None and isinstance(
response_obj, litellm.ImageResponse
):
input = prompt
output = response_obj["data"]
litellm_params = kwargs.get("litellm_params", {})
metadata = (
litellm_params.get("metadata", {}) or {}
) # if litellm_params['metadata'] == None
metadata = self.add_metadata_from_header(litellm_params, metadata)
clean_metadata = {}
try:
metadata = copy.deepcopy(
metadata
) # Avoid modifying the original metadata
except Exception:  # some metadata values may not be deep-copyable
new_metadata = {}
for key, value in metadata.items():
if (
isinstance(value, list)
or isinstance(value, dict)
or isinstance(value, str)
or isinstance(value, int)
or isinstance(value, float)
):
new_metadata[key] = copy.deepcopy(value)
metadata = new_metadata
tags = []
if isinstance(metadata, dict):
for key, value in metadata.items():
# generate tags - default tag keys configured on the LiteLLM Proxy (reuses litellm._langfuse_default_tags)
if (
litellm._langfuse_default_tags is not None
and isinstance(litellm._langfuse_default_tags, list)
and key in litellm._langfuse_default_tags
):
tags.append(f"{key}:{value}")
# clean litellm metadata before logging
if key in [
"headers",
"endpoint",
"caching_groups",
"previous_models",
]:
continue
else:
clean_metadata[key] = value
cost = kwargs.get("response_cost", None)
if cost is not None:
clean_metadata["litellm_response_cost"] = cost
metrics: Optional[dict] = None
if (
response_obj is not None
and hasattr(response_obj, "usage")
and isinstance(response_obj.usage, litellm.Usage)
):
metrics = {
"prompt_tokens": response_obj.usage.prompt_tokens,
"completion_tokens": response_obj.usage.completion_tokens,
"total_tokens": response_obj.usage.total_tokens,
"total_cost": cost,
}
request_data = {
"id": litellm_call_id,
"input": prompt,
"output": output,
"metadata": clean_metadata,
"tags": tags,
}
if metrics is not None:
request_data["metrics"] = metrics
try:
global_braintrust_sync_http_handler.post(
url=f"{self.api_base}/project_logs/{project_id}/insert",
json={"events": [request_data]},
headers=self.headers,
)
except httpx.HTTPStatusError as e:
raise Exception(e.response.text)
except Exception as e:
verbose_logger.error(
"Error logging to braintrust - Exception received - {}\n{}".format(
str(e), traceback.format_exc()
)
)
raise e
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
verbose_logger.debug("REACHES BRAINTRUST SUCCESS")
try:
litellm_call_id = kwargs.get("litellm_call_id")
project_id = kwargs.get("project_id", None)
if project_id is None:
if self.default_project_id is None:
await self.create_default_project_and_experiment()
project_id = self.default_project_id
prompt = {"messages": kwargs.get("messages")}
if response_obj is not None and (
kwargs.get("call_type", None) == "embedding"
or isinstance(response_obj, litellm.EmbeddingResponse)
):
input = prompt
output = None
elif response_obj is not None and isinstance(
response_obj, litellm.ModelResponse
):
input = prompt
output = response_obj["choices"][0]["message"].json()
elif response_obj is not None and isinstance(
response_obj, litellm.TextCompletionResponse
):
input = prompt
output = response_obj.choices[0].text
elif response_obj is not None and isinstance(
response_obj, litellm.ImageResponse
):
input = prompt
output = response_obj["data"]
litellm_params = kwargs.get("litellm_params", {})
metadata = (
litellm_params.get("metadata", {}) or {}
) # if litellm_params['metadata'] == None
metadata = self.add_metadata_from_header(litellm_params, metadata)
clean_metadata = {}
try:
metadata = copy.deepcopy(
metadata
) # Avoid modifying the original metadata
except Exception:  # some metadata values may not be deep-copyable
new_metadata = {}
for key, value in metadata.items():
if (
isinstance(value, list)
or isinstance(value, dict)
or isinstance(value, str)
or isinstance(value, int)
or isinstance(value, float)
):
new_metadata[key] = copy.deepcopy(value)
metadata = new_metadata
tags = []
if isinstance(metadata, dict):
for key, value in metadata.items():
# generate tags - default tag keys configured on the LiteLLM Proxy (reuses litellm._langfuse_default_tags)
if (
litellm._langfuse_default_tags is not None
and isinstance(litellm._langfuse_default_tags, list)
and key in litellm._langfuse_default_tags
):
tags.append(f"{key}:{value}")
# clean litellm metadata before logging
if key in [
"headers",
"endpoint",
"caching_groups",
"previous_models",
]:
continue
else:
clean_metadata[key] = value
cost = kwargs.get("response_cost", None)
if cost is not None:
clean_metadata["litellm_response_cost"] = cost
metrics: Optional[dict] = None
if (
response_obj is not None
and hasattr(response_obj, "usage")
and isinstance(response_obj.usage, litellm.Usage)
):
metrics = {
"prompt_tokens": response_obj.usage.prompt_tokens,
"completion_tokens": response_obj.usage.completion_tokens,
"total_tokens": response_obj.usage.total_tokens,
"total_cost": cost,
}
request_data = {
"id": litellm_call_id,
"input": prompt,
"output": output,
"metadata": clean_metadata,
"tags": tags,
}
if metrics is not None:
request_data["metrics"] = metrics
try:
await global_braintrust_http_handler.post(
url=f"{self.api_base}/project_logs/{project_id}/insert",
json={"events": [request_data]},
headers=self.headers,
)
except httpx.HTTPStatusError as e:
raise Exception(e.response.text)
except Exception as e:
verbose_logger.error(
"Error logging to braintrust - Exception received - {}\n{}".format(
str(e), traceback.format_exc()
)
)
raise e
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
return super().log_failure_event(kwargs, response_obj, start_time, end_time)


@@ -53,6 +53,7 @@ from litellm.utils import (
from ..integrations.aispend import AISpendLogger
from ..integrations.athina import AthinaLogger
from ..integrations.berrispend import BerriSpendLogger
from ..integrations.braintrust_logging import BraintrustLogger
from ..integrations.clickhouse import ClickhouseLogger
from ..integrations.custom_logger import CustomLogger
from ..integrations.datadog import DataDogLogger
@@ -1945,7 +1946,14 @@ def _init_custom_logger_compatible_class(
_openmeter_logger = OpenMeterLogger()
_in_memory_loggers.append(_openmeter_logger)
return _openmeter_logger # type: ignore
elif logging_integration == "braintrust":
for callback in _in_memory_loggers:
if isinstance(callback, BraintrustLogger):
return callback # type: ignore
braintrust_logger = BraintrustLogger()
_in_memory_loggers.append(braintrust_logger)
return braintrust_logger # type: ignore
elif logging_integration == "langsmith":
for callback in _in_memory_loggers:
if isinstance(callback, LangsmithLogger):
@@ -2056,6 +2064,10 @@ def get_custom_logger_compatible_class(
for callback in _in_memory_loggers:
if isinstance(callback, OpenMeterLogger):
return callback
elif logging_integration == "braintrust":
for callback in _in_memory_loggers:
if isinstance(callback, BraintrustLogger):
return callback
elif logging_integration == "galileo":
for callback in _in_memory_loggers:
if isinstance(callback, GalileoObserve):


@@ -6,4 +6,4 @@ model_list:
litellm_settings:
callbacks: ["logfire"]
redact_user_api_key_info: true
redact_user_api_key_info: true


@@ -0,0 +1,53 @@
# What is this?
## This tests the braintrust integration
import asyncio
import os
import random
import sys
import time
import traceback
from datetime import datetime
from dotenv import load_dotenv
from fastapi import Request
load_dotenv()
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import asyncio
import logging
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import litellm
import litellm.integrations.braintrust_logging  # ensure the submodule is loaded before patching
from litellm.llms.custom_httpx.http_handler import HTTPHandler
def test_braintrust_logging():
import litellm
http_client = HTTPHandler()
setattr(
litellm.integrations.braintrust_logging,
"global_braintrust_sync_http_handler",
http_client,
)
with patch.object(http_client, "post", new=MagicMock()) as mock_client:
# set braintrust as a callback, litellm will send the data to braintrust
litellm.callbacks = ["braintrust"]
# openai call
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
)
mock_client.assert_called()