Merge pull request #4758 from BerriAI/litellm_langsmith_async_support

[Feat] Use Async Httpx client for langsmith logging
This commit is contained in:
Ishaan Jaff 2024-07-17 16:54:40 -07:00 committed by GitHub
commit ee53b9093b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 249 additions and 111 deletions

View file

@ -14,7 +14,7 @@ https://github.com/BerriAI/litellm
An all-in-one developer platform for every step of the application lifecycle
https://smith.langchain.com/
<Image img={require('../../img/langsmith.png')} />
<Image img={require('../../img/langsmith_new.png')} />
:::info
We want to learn how we can make the callbacks better! Meet the LiteLLM [founders](https://calendly.com/d/4mp-gd3-k5k/berriai-1-1-onboarding-litellm-hosted-version) or

View file

@ -5,6 +5,7 @@ Log Proxy input, output, and exceptions using:
- Langfuse
- OpenTelemetry
- Custom Callbacks
- Langsmith
- DataDog
- DynamoDB
- s3 Bucket
@ -1086,6 +1087,50 @@ litellm_settings:
Start the LiteLLM Proxy and make a test request to verify the logs reached your callback API
## Logging LLM IO to Langsmith
1. Set `success_callback: ["langsmith"]` on litellm config.yaml
If you're using a custom LangSmith instance, you can set the
`LANGSMITH_BASE_URL` environment variable to point to your instance.
```yaml
litellm_settings:
success_callback: ["langsmith"]
environment_variables:
LANGSMITH_API_KEY: "lsv2_pt_xxxxxxxx"
LANGSMITH_PROJECT: "litellm-proxy"
LANGSMITH_BASE_URL: "https://api.smith.langchain.com" # (Optional - only needed if you have a custom Langsmith instance)
```
2. Start Proxy
```
litellm --config /path/to/config.yaml
```
3. Test it!
```bash
curl --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--data ' {
"model": "fake-openai-endpoint",
"messages": [
{
"role": "user",
"content": "Hello, Claude gm!"
}
],
}
'
```
Expect to see your log on Langfuse
<Image img={require('../../img/langsmith_new.png')} />
## Logging LLM IO to Galileo
[BETA]

Binary file not shown.

After

Width:  |  Height:  |  Size: 353 KiB

View file

@ -38,7 +38,7 @@ success_callback: List[Union[str, Callable]] = []
failure_callback: List[Union[str, Callable]] = []
service_callback: List[Union[str, Callable]] = []
_custom_logger_compatible_callbacks_literal = Literal[
"lago", "openmeter", "logfire", "dynamic_rate_limiter"
"lago", "openmeter", "logfire", "dynamic_rate_limiter", "langsmith", "galileo"
]
callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] = []
_langfuse_default_tags: Optional[

View file

@ -5,12 +5,17 @@ import os
import traceback
import types
from datetime import datetime
from typing import Any, List, Optional
from typing import Any, List, Optional, Union
import dotenv # type: ignore
import requests # type: ignore
from pydantic import BaseModel # type: ignore
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
class LangsmithInputs(BaseModel):
model: Optional[str] = None
@ -24,7 +29,7 @@ class LangsmithInputs(BaseModel):
custom_llm_provider: Optional[str] = None
input: Optional[List[Any]] = None
log_event_type: Optional[str] = None
original_response: Optional[str] = None
original_response: Optional[Any] = None
response_cost: Optional[float] = None
# LiteLLM Virtual Key specific fields
@ -43,7 +48,7 @@ def is_serializable(value):
return not isinstance(value, non_serializable_types)
class LangsmithLogger:
class LangsmithLogger(CustomLogger):
# Class variables or attributes
def __init__(self):
self.langsmith_api_key = os.getenv("LANGSMITH_API_KEY")
@ -54,84 +59,116 @@ class LangsmithLogger:
self.langsmith_base_url = os.getenv(
"LANGSMITH_BASE_URL", "https://api.smith.langchain.com"
)
self.async_httpx_client = AsyncHTTPHandler()
def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
# Method definition
# inspired by Langsmith http api here: https://github.com/langchain-ai/langsmith-cookbook/blob/main/tracing-examples/rest/rest.ipynb
metadata = (
kwargs.get("litellm_params", {}).get("metadata", {}) or {}
) # if metadata is None
def _prepare_log_data(self, kwargs, response_obj, start_time, end_time):
import datetime
from datetime import timezone
metadata = kwargs.get("litellm_params", {}).get("metadata", {}) or {}
# set user_api_key, user_team_id, user_api_key_user_id
kwargs["user_api_key"] = metadata.get("user_api_key", None)
kwargs["user_api_key_user_id"] = metadata.get("user_api_key_user_id", None)
kwargs["user_api_key_team_alias"] = metadata.get(
"user_api_key_team_alias", None
)
# set project name and run_name for langsmith logging
# users can pass project_name and run name to litellm.completion()
# Example: litellm.completion(model, messages, metadata={"project_name": "my-litellm-project", "run_name": "my-langsmith-run"})
# if not set litellm will fallback to the environment variable LANGSMITH_PROJECT, then to the default project_name = litellm-completion, run_name = LLMRun
project_name = metadata.get("project_name", self.langsmith_project)
run_name = metadata.get("run_name", self.langsmith_default_run_name)
run_id = metadata.get("id", None)
print_verbose(
verbose_logger.debug(
f"Langsmith Logging - project_name: {project_name}, run_name {run_name}"
)
langsmith_base_url = os.getenv(
"LANGSMITH_BASE_URL", "https://api.smith.langchain.com"
)
try:
print_verbose(
f"Langsmith Logging - Enters logging function for model {kwargs}"
)
import datetime
from datetime import timezone
start_time = kwargs["start_time"].astimezone(timezone.utc).isoformat()
end_time = kwargs["end_time"].astimezone(timezone.utc).isoformat()
except:
start_time = datetime.datetime.utcnow().isoformat()
end_time = datetime.datetime.utcnow().isoformat()
import requests
# filter out kwargs to not include any dicts, langsmith throws an erros when trying to log kwargs
logged_kwargs = LangsmithInputs(**kwargs)
kwargs = logged_kwargs.model_dump()
new_kwargs = {}
for key in kwargs:
value = kwargs[key]
if key == "start_time" or key == "end_time" or value is None:
pass
elif key == "original_response" and not isinstance(value, str):
new_kwargs[key] = str(value)
elif type(value) == datetime.datetime:
new_kwargs[key] = value.isoformat()
elif type(value) != dict and is_serializable(value=value):
new_kwargs[key] = value
elif not is_serializable(value=value):
continue
if isinstance(response_obj, BaseModel):
try:
start_time = kwargs["start_time"].astimezone(timezone.utc).isoformat()
end_time = kwargs["end_time"].astimezone(timezone.utc).isoformat()
response_obj = response_obj.model_dump()
except:
start_time = datetime.datetime.utcnow().isoformat()
end_time = datetime.datetime.utcnow().isoformat()
response_obj = response_obj.dict() # type: ignore
# filter out kwargs to not include any dicts, langsmith throws an erros when trying to log kwargs
logged_kwargs = LangsmithInputs(**kwargs)
kwargs = logged_kwargs.model_dump()
data = {
"name": run_name,
"run_type": "llm", # this should always be llm, since litellm always logs llm calls. Langsmith allow us to log "chain"
"inputs": new_kwargs,
"outputs": response_obj,
"session_name": project_name,
"start_time": start_time,
"end_time": end_time,
}
new_kwargs = {}
for key in kwargs:
value = kwargs[key]
if key == "start_time" or key == "end_time" or value is None:
pass
elif type(value) == datetime.datetime:
new_kwargs[key] = value.isoformat()
elif type(value) != dict and is_serializable(value=value):
new_kwargs[key] = value
if run_id:
data["id"] = run_id
if isinstance(response_obj, BaseModel):
try:
response_obj = response_obj.model_dump()
except:
response_obj = response_obj.dict() # type: ignore
verbose_logger.debug("Langsmith Logging data on langsmith: %s", data)
data = {
"name": run_name,
"run_type": "llm", # this should always be llm, since litellm always logs llm calls. Langsmith allow us to log "chain"
"inputs": new_kwargs,
"outputs": response_obj,
"session_name": project_name,
"start_time": start_time,
"end_time": end_time,
"id": run_id,
}
return data
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
"Langsmith Async Layer Logging - kwargs: %s, response_obj: %s",
kwargs,
response_obj,
)
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
url = f"{self.langsmith_base_url}/runs"
verbose_logger.debug(f"Langsmith Logging - About to send data to {url} ...")
headers = {"x-api-key": self.langsmith_api_key}
response = await self.async_httpx_client.post(
url=url, json=data, headers=headers
)
if response.status_code >= 300:
verbose_logger.error(
f"Langmsith Error: {response.status_code} - {response.text}"
)
else:
verbose_logger.debug(
"Run successfully created, response=%s", response.text
)
verbose_logger.debug(
f"Langsmith Layer Logging - final response object: {response_obj}. Response text from langsmith={response.text}"
)
except:
verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
def log_success_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
"Langsmith Sync Layer Logging - kwargs: %s, response_obj: %s",
kwargs,
response_obj,
)
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
url = f"{self.langsmith_base_url}/runs"
verbose_logger.debug(f"Langsmith Logging - About to send data to {url} ...")
url = f"{langsmith_base_url}/runs"
print_verbose(f"Langsmith Logging - About to send data to {url} ...")
response = requests.post(
url=url,
json=data,
@ -139,16 +176,14 @@ class LangsmithLogger:
)
if response.status_code >= 300:
print_verbose(f"Error: {response.status_code}")
verbose_logger.error(f"Error: {response.status_code} - {response.text}")
else:
print_verbose("Run successfully created")
print_verbose(
verbose_logger.debug("Run successfully created")
verbose_logger.debug(
f"Langsmith Layer Logging - final response object: {response_obj}. Response text from langsmith={response.text}"
)
return
except:
print_verbose(f"Langsmith Layer Error - {traceback.format_exc()}")
pass
verbose_logger.error(f"Langsmith Layer Error - {traceback.format_exc()}")
def get_run_by_id(self, run_id):

View file

@ -39,7 +39,6 @@ from litellm.utils import (
add_breadcrumb,
capture_exception,
customLogger,
langsmithLogger,
liteDebuggerClient,
logfireLogger,
lunaryLogger,
@ -89,7 +88,6 @@ alerts_channel = None
heliconeLogger = None
athinaLogger = None
promptLayerLogger = None
langsmithLogger = None
logfireLogger = None
weightsBiasesLogger = None
customLogger = None
@ -136,7 +134,7 @@ in_memory_trace_id_cache = ServiceTraceIDCache()
class Logging:
global supabaseClient, liteDebuggerClient, promptLayerLogger, weightsBiasesLogger, langsmithLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, logfireLogger, prometheusLogger, slack_app
global supabaseClient, liteDebuggerClient, promptLayerLogger, weightsBiasesLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, logfireLogger, prometheusLogger, slack_app
custom_pricing: bool = False
stream_options = None
@ -738,23 +736,6 @@ class Logging:
end_time=end_time,
print_verbose=print_verbose,
)
if callback == "langsmith":
print_verbose("reaches langsmith for logging!")
if self.stream:
if "complete_streaming_response" not in kwargs:
continue
else:
print_verbose(
"reaches langsmith for streaming logging!"
)
result = kwargs["complete_streaming_response"]
langsmithLogger.log_event(
kwargs=self.model_call_details,
response_obj=result,
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
)
if callback == "logfire":
global logfireLogger
verbose_logger.debug("reaches logfire for success logging!")
@ -1829,7 +1810,7 @@ def set_callbacks(callback_list, function_id=None):
"""
Globally sets the callback client
"""
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, langsmithLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
try:
for callback in callback_list:
@ -1910,8 +1891,6 @@ def set_callbacks(callback_list, function_id=None):
s3Logger = S3Logger()
elif callback == "wandb":
weightsBiasesLogger = WeightsBiasesLogger()
elif callback == "langsmith":
langsmithLogger = LangsmithLogger()
elif callback == "logfire":
logfireLogger = LogfireLogger()
elif callback == "aispend":
@ -1964,6 +1943,15 @@ def _init_custom_logger_compatible_class(
_in_memory_loggers.append(_openmeter_logger)
return _openmeter_logger # type: ignore
elif logging_integration == "langsmith":
for callback in _in_memory_loggers:
if isinstance(callback, LangsmithLogger):
return callback # type: ignore
_langsmith_logger = LangsmithLogger()
_in_memory_loggers.append(_langsmith_logger)
return _langsmith_logger # type: ignore
elif logging_integration == "galileo":
for callback in _in_memory_loggers:
if isinstance(callback, GalileoObserve):
@ -2032,6 +2020,10 @@ def get_custom_logger_compatible_class(
for callback in _in_memory_loggers:
if isinstance(callback, GalileoObserve):
return callback
elif logging_integration == "langsmith":
for callback in _in_memory_loggers:
if isinstance(callback, LangsmithLogger):
return callback
elif logging_integration == "logfire":
if "LOGFIRE_TOKEN" not in os.environ:
raise ValueError("LOGFIRE_TOKEN not found in environment variables")

View file

@ -4,24 +4,33 @@ import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import logging
import uuid
import pytest
import litellm
from litellm import completion
from litellm._logging import verbose_logger
from litellm.integrations.langsmith import LangsmithLogger
verbose_logger.setLevel(logging.DEBUG)
litellm.set_verbose = True
import time
test_langsmith_logger = LangsmithLogger()
def test_langsmith_logging():
@pytest.mark.asyncio()
async def test_langsmith_logging():
try:
import uuid
run_id = str(uuid.uuid4())
litellm.set_verbose = True
litellm.success_callback = ["langsmith"]
response = completion(
litellm.callbacks = ["langsmith"]
response = await litellm.acompletion(
model="claude-instant-1.2",
messages=[{"role": "user", "content": "what llm are u"}],
max_tokens=10,
@ -40,7 +49,7 @@ def test_langsmith_logging():
},
)
print(response)
time.sleep(3)
await asyncio.sleep(3)
print("run_id", run_id)
logged_run_on_langsmith = test_langsmith_logger.get_run_by_id(run_id=run_id)
@ -50,13 +59,15 @@ def test_langsmith_logging():
print("fields in logged_run_on_langsmith", logged_run_on_langsmith.keys())
input_fields_on_langsmith = logged_run_on_langsmith.get("inputs")
extra_fields_on_langsmith = logged_run_on_langsmith.get("extra")
extra_fields_on_langsmith = logged_run_on_langsmith.get("extra").get(
"invocation_params"
)
print("\nLogged INPUT ON LANGSMITH", input_fields_on_langsmith)
print("\nextra fields on langsmith", extra_fields_on_langsmith)
assert input_fields_on_langsmith is not None
assert isinstance(input_fields_on_langsmith, dict)
assert "api_key" not in input_fields_on_langsmith
assert "api_key" not in extra_fields_on_langsmith
@ -67,6 +78,7 @@ def test_langsmith_logging():
except Exception as e:
print(e)
pytest.fail(f"Error occurred: {e}")
# test_langsmith_logging()
@ -75,6 +87,7 @@ def test_langsmith_logging():
def test_langsmith_logging_with_metadata():
try:
litellm.success_callback = ["langsmith"]
litellm.set_verbose = True
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "what llm are u"}],
@ -83,28 +96,66 @@ def test_langsmith_logging_with_metadata():
)
print(response)
time.sleep(3)
except Exception as e:
pytest.fail(f"Error occurred: {e}")
print(e)
# test_langsmith_logging_with_metadata()
def test_langsmith_logging_with_streaming_and_metadata():
@pytest.mark.parametrize("sync_mode", [False, True])
@pytest.mark.asyncio
async def test_langsmith_logging_with_streaming_and_metadata(sync_mode):
try:
litellm.success_callback = ["langsmith"]
response = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "what llm are u"}],
max_tokens=10,
temperature=0.2,
stream=True,
litellm.set_verbose = True
run_id = str(uuid.uuid4())
messages = [{"role": "user", "content": "what llm are u"}]
if sync_mode is True:
response = completion(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=10,
temperature=0.2,
stream=True,
metadata={"id": run_id},
)
for chunk in response:
continue
time.sleep(3)
else:
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=10,
temperature=0.2,
mock_response="This is a mock request",
stream=True,
metadata={"id": run_id},
)
async for chunk in response:
continue
await asyncio.sleep(3)
print("run_id", run_id)
logged_run_on_langsmith = test_langsmith_logger.get_run_by_id(run_id=run_id)
print("logged_run_on_langsmith", logged_run_on_langsmith)
print("fields in logged_run_on_langsmith", logged_run_on_langsmith.keys())
input_fields_on_langsmith = logged_run_on_langsmith.get("inputs")
extra_fields_on_langsmith = logged_run_on_langsmith.get("extra").get(
"invocation_params"
)
for chunk in response:
continue
time.sleep(3)
assert logged_run_on_langsmith.get("run_type") == "llm"
print("\nLogged INPUT ON LANGSMITH", input_fields_on_langsmith)
print("\nextra fields on langsmith", extra_fields_on_langsmith)
assert isinstance(input_fields_on_langsmith, dict)
except Exception as e:
pytest.fail(f"Error occurred: {e}")
print(e)
# test_langsmith_logging_with_streaming_and_metadata()

View file

@ -417,6 +417,21 @@ def function_setup(
# we only support async dynamo db logging for acompletion/aembedding since that's used on proxy
litellm._async_success_callback.append(callback)
removed_async_items.append(index)
elif callback == "langsmith":
callback_class = litellm.litellm_core_utils.litellm_logging._init_custom_logger_compatible_class( # type: ignore
callback, internal_usage_cache=None, llm_router=None
)
# don't double add a callback
if not any(
isinstance(cb, type(callback_class)) for cb in litellm.callbacks
):
litellm.callbacks.append(callback_class) # type: ignore
litellm.input_callback.append(callback_class) # type: ignore
litellm.success_callback.append(callback_class) # type: ignore
litellm.failure_callback.append(callback_class) # type: ignore
litellm._async_success_callback.append(callback_class) # type: ignore
litellm._async_failure_callback.append(callback_class) # type: ignore
# Pop the async items from success_callback in reverse order to avoid index issues
for index in reversed(removed_async_items):