mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 18:54:30 +00:00
(Feat) Add langsmith key based logging (#6682)
* add langsmith_api_key to StandardCallbackDynamicParams * create a file for langsmith types * langsmith add key / team based logging * add key based logging for langsmith * fix langsmith key based logging * fix linting langsmith * remove NOQA violation * add unit test coverage for all helpers in test langsmith * test_langsmith_key_based_logging * docs langsmith key based logging * run langsmith tests in logging callback tests * fix logging testing * test_langsmith_key_based_logging * test_add_callback_via_key_litellm_pre_call_utils_langsmith * add debug statement langsmith key based logging * test_langsmith_key_based_logging
This commit is contained in:
parent
1e2ba3e045
commit
c3bc9e6b12
9 changed files with 810 additions and 179 deletions
|
@ -23,34 +23,8 @@ from litellm.llms.custom_httpx.http_handler import (
|
|||
get_async_httpx_client,
|
||||
httpxSpecialProvider,
|
||||
)
|
||||
from litellm.types.utils import StandardLoggingPayload
|
||||
|
||||
|
||||
class LangsmithInputs(BaseModel):
|
||||
model: Optional[str] = None
|
||||
messages: Optional[List[Any]] = None
|
||||
stream: Optional[bool] = None
|
||||
call_type: Optional[str] = None
|
||||
litellm_call_id: Optional[str] = None
|
||||
completion_start_time: Optional[datetime] = None
|
||||
temperature: Optional[float] = None
|
||||
max_tokens: Optional[int] = None
|
||||
custom_llm_provider: Optional[str] = None
|
||||
input: Optional[List[Any]] = None
|
||||
log_event_type: Optional[str] = None
|
||||
original_response: Optional[Any] = None
|
||||
response_cost: Optional[float] = None
|
||||
|
||||
# LiteLLM Virtual Key specific fields
|
||||
user_api_key: Optional[str] = None
|
||||
user_api_key_user_id: Optional[str] = None
|
||||
user_api_key_team_alias: Optional[str] = None
|
||||
|
||||
|
||||
class LangsmithCredentialsObject(TypedDict):
|
||||
LANGSMITH_API_KEY: str
|
||||
LANGSMITH_PROJECT: str
|
||||
LANGSMITH_BASE_URL: str
|
||||
from litellm.types.integrations.langsmith import *
|
||||
from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload
|
||||
|
||||
|
||||
def is_serializable(value):
|
||||
|
@ -93,15 +67,16 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
)
|
||||
if _batch_size:
|
||||
self.batch_size = int(_batch_size)
|
||||
self.log_queue: List[LangsmithQueueObject] = []
|
||||
asyncio.create_task(self.periodic_flush())
|
||||
self.flush_lock = asyncio.Lock()
|
||||
super().__init__(**kwargs, flush_lock=self.flush_lock)
|
||||
|
||||
def get_credentials_from_env(
|
||||
self,
|
||||
langsmith_api_key: Optional[str],
|
||||
langsmith_project: Optional[str],
|
||||
langsmith_base_url: Optional[str],
|
||||
langsmith_api_key: Optional[str] = None,
|
||||
langsmith_project: Optional[str] = None,
|
||||
langsmith_base_url: Optional[str] = None,
|
||||
) -> LangsmithCredentialsObject:
|
||||
|
||||
_credentials_api_key = langsmith_api_key or os.getenv("LANGSMITH_API_KEY")
|
||||
|
@ -132,42 +107,19 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
LANGSMITH_PROJECT=_credentials_project,
|
||||
)
|
||||
|
||||
def _prepare_log_data( # noqa: PLR0915
|
||||
self, kwargs, response_obj, start_time, end_time
|
||||
def _prepare_log_data(
|
||||
self,
|
||||
kwargs,
|
||||
response_obj,
|
||||
start_time,
|
||||
end_time,
|
||||
credentials: LangsmithCredentialsObject,
|
||||
):
|
||||
import json
|
||||
from datetime import datetime as dt
|
||||
|
||||
try:
|
||||
_litellm_params = kwargs.get("litellm_params", {}) or {}
|
||||
metadata = _litellm_params.get("metadata", {}) or {}
|
||||
new_metadata = {}
|
||||
for key, value in metadata.items():
|
||||
if (
|
||||
isinstance(value, list)
|
||||
or isinstance(value, str)
|
||||
or isinstance(value, int)
|
||||
or isinstance(value, float)
|
||||
):
|
||||
new_metadata[key] = value
|
||||
elif isinstance(value, BaseModel):
|
||||
new_metadata[key] = value.model_dump_json()
|
||||
elif isinstance(value, dict):
|
||||
for k, v in value.items():
|
||||
if isinstance(v, dt):
|
||||
value[k] = v.isoformat()
|
||||
new_metadata[key] = value
|
||||
|
||||
metadata = new_metadata
|
||||
|
||||
kwargs["user_api_key"] = metadata.get("user_api_key", None)
|
||||
kwargs["user_api_key_user_id"] = metadata.get("user_api_key_user_id", None)
|
||||
kwargs["user_api_key_team_alias"] = metadata.get(
|
||||
"user_api_key_team_alias", None
|
||||
)
|
||||
|
||||
project_name = metadata.get(
|
||||
"project_name", self.default_credentials["LANGSMITH_PROJECT"]
|
||||
"project_name", credentials["LANGSMITH_PROJECT"]
|
||||
)
|
||||
run_name = metadata.get("run_name", self.langsmith_default_run_name)
|
||||
run_id = metadata.get("id", None)
|
||||
|
@ -175,16 +127,10 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
trace_id = metadata.get("trace_id", None)
|
||||
session_id = metadata.get("session_id", None)
|
||||
dotted_order = metadata.get("dotted_order", None)
|
||||
tags = metadata.get("tags", []) or []
|
||||
verbose_logger.debug(
|
||||
f"Langsmith Logging - project_name: {project_name}, run_name {run_name}"
|
||||
)
|
||||
|
||||
# filter out kwargs so they do not include any dicts; Langsmith throws an error when trying to log kwargs
|
||||
# logged_kwargs = LangsmithInputs(**kwargs)
|
||||
# kwargs = logged_kwargs.model_dump()
|
||||
|
||||
# new_kwargs = {}
|
||||
# Ensure everything in the payload is converted to str
|
||||
payload: Optional[StandardLoggingPayload] = kwargs.get(
|
||||
"standard_logging_object", None
|
||||
|
@ -193,7 +139,6 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
if payload is None:
|
||||
raise Exception("Error logging request payload. Payload=none.")
|
||||
|
||||
new_kwargs = payload
|
||||
metadata = payload[
|
||||
"metadata"
|
||||
] # ensure logged metadata is json serializable
|
||||
|
@ -201,12 +146,12 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
data = {
|
||||
"name": run_name,
|
||||
"run_type": "llm", # this should always be llm, since litellm always logs llm calls. Langsmith allow us to log "chain"
|
||||
"inputs": new_kwargs,
|
||||
"outputs": new_kwargs["response"],
|
||||
"inputs": payload,
|
||||
"outputs": payload["response"],
|
||||
"session_name": project_name,
|
||||
"start_time": new_kwargs["startTime"],
|
||||
"end_time": new_kwargs["endTime"],
|
||||
"tags": tags,
|
||||
"start_time": payload["startTime"],
|
||||
"end_time": payload["endTime"],
|
||||
"tags": payload["request_tags"],
|
||||
"extra": metadata,
|
||||
}
|
||||
|
||||
|
@ -243,37 +188,6 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
except Exception:
|
||||
raise
|
||||
|
||||
def _send_batch(self):
|
||||
if not self.log_queue:
|
||||
return
|
||||
|
||||
langsmith_api_key = self.default_credentials["LANGSMITH_API_KEY"]
|
||||
langsmith_api_base = self.default_credentials["LANGSMITH_BASE_URL"]
|
||||
|
||||
url = f"{langsmith_api_base}/runs/batch"
|
||||
|
||||
headers = {"x-api-key": langsmith_api_key}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
url=url,
|
||||
json=self.log_queue,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code >= 300:
|
||||
verbose_logger.error(
|
||||
f"Langsmith Error: {response.status_code} - {response.text}"
|
||||
)
|
||||
else:
|
||||
verbose_logger.debug(
|
||||
f"Batch of {len(self.log_queue)} runs successfully created"
|
||||
)
|
||||
|
||||
self.log_queue.clear()
|
||||
except Exception:
|
||||
verbose_logger.exception("Langsmith Layer Error - Error sending batch.")
|
||||
|
||||
def log_success_event(self, kwargs, response_obj, start_time, end_time):
|
||||
try:
|
||||
sampling_rate = (
|
||||
|
@ -295,8 +209,20 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
kwargs,
|
||||
response_obj,
|
||||
)
|
||||
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
|
||||
self.log_queue.append(data)
|
||||
credentials = self._get_credentials_to_use_for_request(kwargs=kwargs)
|
||||
data = self._prepare_log_data(
|
||||
kwargs=kwargs,
|
||||
response_obj=response_obj,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
credentials=credentials,
|
||||
)
|
||||
self.log_queue.append(
|
||||
LangsmithQueueObject(
|
||||
data=data,
|
||||
credentials=credentials,
|
||||
)
|
||||
)
|
||||
verbose_logger.debug(
|
||||
f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..."
|
||||
)
|
||||
|
@ -323,8 +249,20 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
kwargs,
|
||||
response_obj,
|
||||
)
|
||||
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
|
||||
self.log_queue.append(data)
|
||||
credentials = self._get_credentials_to_use_for_request(kwargs=kwargs)
|
||||
data = self._prepare_log_data(
|
||||
kwargs=kwargs,
|
||||
response_obj=response_obj,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
credentials=credentials,
|
||||
)
|
||||
self.log_queue.append(
|
||||
LangsmithQueueObject(
|
||||
data=data,
|
||||
credentials=credentials,
|
||||
)
|
||||
)
|
||||
verbose_logger.debug(
|
||||
"Langsmith logging: queue length %s, batch size %s",
|
||||
len(self.log_queue),
|
||||
|
@ -349,8 +287,20 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
return # Skip logging
|
||||
verbose_logger.info("Langsmith Failure Event Logging!")
|
||||
try:
|
||||
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
|
||||
self.log_queue.append(data)
|
||||
credentials = self._get_credentials_to_use_for_request(kwargs=kwargs)
|
||||
data = self._prepare_log_data(
|
||||
kwargs=kwargs,
|
||||
response_obj=response_obj,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
credentials=credentials,
|
||||
)
|
||||
self.log_queue.append(
|
||||
LangsmithQueueObject(
|
||||
data=data,
|
||||
credentials=credentials,
|
||||
)
|
||||
)
|
||||
verbose_logger.debug(
|
||||
"Langsmith logging: queue length %s, batch size %s",
|
||||
len(self.log_queue),
|
||||
|
@ -365,31 +315,58 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
|
||||
async def async_send_batch(self):
    """
    Send everything currently in self.log_queue to Langsmith.

    self.log_queue holds LangsmithQueueObjects, each carrying:
        - "credentials": the credentials to use for that request
          (langsmith_api_key, langsmith_project, langsmith_base_url)
        - "data": the run payload to log on Langsmith

    The queue objects are grouped by credentials, and each group is then
    sent as its own batch, one group after another. Grouping exists to
    support key/team based logging, where different requests may log to
    different Langsmith accounts or projects.

    Does nothing when the queue is empty.
    """
    if self.log_queue:
        groups_by_credentials = self._group_batches_by_credentials()
        # One awaited call per distinct credential set, in grouping order.
        for group in groups_by_credentials.values():
            await self._log_batch_on_langsmith(
                credentials=group.credentials,
                queue_objects=group.queue_objects,
            )
|
||||
|
||||
async def _log_batch_on_langsmith(
|
||||
self,
|
||||
credentials: LangsmithCredentialsObject,
|
||||
queue_objects: List[LangsmithQueueObject],
|
||||
):
|
||||
"""
|
||||
Logs a batch of runs to Langsmith
|
||||
sends runs to /batch endpoint for the given credentials
|
||||
|
||||
Args:
|
||||
credentials: LangsmithCredentialsObject
|
||||
queue_objects: List[LangsmithQueueObject]
|
||||
|
||||
Returns: None
|
||||
|
||||
Raises: Does not raise an exception, will only verbose_logger.exception()
|
||||
"""
|
||||
if not self.log_queue:
|
||||
return
|
||||
|
||||
langsmith_api_base = self.default_credentials["LANGSMITH_BASE_URL"]
|
||||
|
||||
langsmith_api_base = credentials["LANGSMITH_BASE_URL"]
|
||||
langsmith_api_key = credentials["LANGSMITH_API_KEY"]
|
||||
url = f"{langsmith_api_base}/runs/batch"
|
||||
|
||||
langsmith_api_key = self.default_credentials["LANGSMITH_API_KEY"]
|
||||
|
||||
headers = {"x-api-key": langsmith_api_key}
|
||||
elements_to_log = [queue_object["data"] for queue_object in queue_objects]
|
||||
|
||||
try:
|
||||
response = await self.async_httpx_client.post(
|
||||
url=url,
|
||||
json={
|
||||
"post": self.log_queue,
|
||||
},
|
||||
json={"post": elements_to_log},
|
||||
headers=headers,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
@ -411,6 +388,74 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
f"Langsmith Layer Error - {traceback.format_exc()}"
|
||||
)
|
||||
|
||||
def _group_batches_by_credentials(self) -> Dict[CredentialsKey, BatchGroup]:
    """
    Bucket the objects in self.log_queue by the credentials they carry.

    Returns:
        A dict keyed by CredentialsKey (api key, project, base url). Each
        value is a BatchGroup holding the credentials of the first queue
        object seen for that key and every queue object that shares the
        key, in queue order.
    """
    grouped: Dict[CredentialsKey, BatchGroup] = {}

    for item in self.log_queue:
        creds = item["credentials"]
        group_key = CredentialsKey(
            api_key=creds["LANGSMITH_API_KEY"],
            project=creds["LANGSMITH_PROJECT"],
            base_url=creds["LANGSMITH_BASE_URL"],
        )

        try:
            group = grouped[group_key]
        except KeyError:
            # First object for this credential set: open a new group.
            group = grouped[group_key] = BatchGroup(
                credentials=creds, queue_objects=[]
            )

        group.queue_objects.append(item)

    return grouped
|
||||
|
||||
def _get_credentials_to_use_for_request(
    self, kwargs: Dict[str, Any]
) -> LangsmithCredentialsObject:
    """
    Pick the Langsmith credentials for a single request (key/team based logging).

    Args:
        kwargs: the logging kwargs for the request; only the
            "standard_callback_dynamic_params" entry is read.

    Returns:
        self.default_credentials when no dynamic params are present.
        Otherwise the result of get_credentials_from_env() called with the
        "langsmith_api_key", "langsmith_project" and "langsmith_base_url"
        values from the dynamic params (None for any that are missing).
    """
    dynamic_params: Optional[StandardCallbackDynamicParams] = kwargs.get(
        "standard_callback_dynamic_params", None
    )
    if dynamic_params is None:
        return self.default_credentials

    # Forward each per-request override under the parameter name of the same spelling.
    overrides = {
        name: dynamic_params.get(name, None)
        for name in (
            "langsmith_api_key",
            "langsmith_project",
            "langsmith_base_url",
        )
    }
    return self.get_credentials_from_env(**overrides)
|
||||
|
||||
def _send_batch(self):
|
||||
"""Calls async_send_batch in an event loop"""
|
||||
if not self.log_queue:
|
||||
return
|
||||
|
||||
try:
|
||||
# Try to get the existing event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
# If we're already in an event loop, create a task
|
||||
asyncio.create_task(self.async_send_batch())
|
||||
else:
|
||||
# If no event loop is running, run the coroutine directly
|
||||
loop.run_until_complete(self.async_send_batch())
|
||||
except RuntimeError:
|
||||
# If we can't get an event loop, create a new one
|
||||
asyncio.run(self.async_send_batch())
|
||||
|
||||
def get_run_by_id(self, run_id):
|
||||
|
||||
langsmith_api_key = self.default_credentials["LANGSMITH_API_KEY"]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue