litellm/tests/logging_callback_tests/test_langsmith_unit_test.py
Ishaan Jaff c3bc9e6b12
(Feat) Add langsmith key based logging (#6682)
* add langsmith_api_key to StandardCallbackDynamicParams

* create a file for langsmith types

* langsmith add key / team based logging

* add key based logging for langsmith

* fix langsmith key based logging

* fix linting langsmith

* remove NOQA violation

* add unit test coverage for all helpers in test langsmith

* test_langsmith_key_based_logging

* docs langsmith key based logging

* run langsmith tests in logging callback tests

* fix logging testing

* test_langsmith_key_based_logging

* test_add_callback_via_key_litellm_pre_call_utils_langsmith

* add debug statement langsmith key based logging

* test_langsmith_key_based_logging
2024-11-11 13:58:06 -08:00

394 lines
12 KiB
Python

import io
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import gzip
import json
import logging
import time
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from datetime import datetime, timezone
from litellm.integrations.langsmith import (
LangsmithLogger,
LangsmithQueueObject,
CredentialsKey,
BatchGroup,
)
import litellm
# Test get_credentials_from_env
@pytest.mark.asyncio
async def test_get_credentials_from_env():
# Test with direct parameters
logger = LangsmithLogger(
langsmith_api_key="test-key",
langsmith_project="test-project",
langsmith_base_url="http://test-url",
)
credentials = logger.get_credentials_from_env(
langsmith_api_key="custom-key",
langsmith_project="custom-project",
langsmith_base_url="http://custom-url",
)
assert credentials["LANGSMITH_API_KEY"] == "custom-key"
assert credentials["LANGSMITH_PROJECT"] == "custom-project"
assert credentials["LANGSMITH_BASE_URL"] == "http://custom-url"
# assert that the default api base is used if not provided
credentials = logger.get_credentials_from_env()
assert credentials["LANGSMITH_BASE_URL"] == "https://api.smith.langchain.com"
@pytest.mark.asyncio
async def test_group_batches_by_credentials():
logger = LangsmithLogger(langsmith_api_key="test-key")
# Create test queue objects
queue_obj1 = LangsmithQueueObject(
data={"test": "data1"},
credentials={
"LANGSMITH_API_KEY": "key1",
"LANGSMITH_PROJECT": "proj1",
"LANGSMITH_BASE_URL": "url1",
},
)
queue_obj2 = LangsmithQueueObject(
data={"test": "data2"},
credentials={
"LANGSMITH_API_KEY": "key1",
"LANGSMITH_PROJECT": "proj1",
"LANGSMITH_BASE_URL": "url1",
},
)
logger.log_queue = [queue_obj1, queue_obj2]
grouped = logger._group_batches_by_credentials()
# Check grouping
assert len(grouped) == 1 # Should have one group since credentials are same
key = list(grouped.keys())[0]
assert isinstance(key, CredentialsKey)
assert len(grouped[key].queue_objects) == 2
@pytest.mark.asyncio
async def test_group_batches_by_credentials_multiple_credentials():
# Test with multiple different credentials
logger = LangsmithLogger(langsmith_api_key="test-key")
queue_obj1 = LangsmithQueueObject(
data={"test": "data1"},
credentials={
"LANGSMITH_API_KEY": "key1",
"LANGSMITH_PROJECT": "proj1",
"LANGSMITH_BASE_URL": "url1",
},
)
queue_obj2 = LangsmithQueueObject(
data={"test": "data2"},
credentials={
"LANGSMITH_API_KEY": "key2", # Different API key
"LANGSMITH_PROJECT": "proj1",
"LANGSMITH_BASE_URL": "url1",
},
)
queue_obj3 = LangsmithQueueObject(
data={"test": "data3"},
credentials={
"LANGSMITH_API_KEY": "key1",
"LANGSMITH_PROJECT": "proj2", # Different project
"LANGSMITH_BASE_URL": "url1",
},
)
logger.log_queue = [queue_obj1, queue_obj2, queue_obj3]
grouped = logger._group_batches_by_credentials()
# Check grouping
assert len(grouped) == 3 # Should have three groups since credentials differ
for key, batch_group in grouped.items():
assert isinstance(key, CredentialsKey)
assert len(batch_group.queue_objects) == 1 # Each group should have one object
# Test make_dot_order
@pytest.mark.asyncio
async def test_make_dot_order():
logger = LangsmithLogger(langsmith_api_key="test-key")
run_id = "729cff0e-f30c-4336-8b79-45d6b61c64b4"
dot_order = logger.make_dot_order(run_id)
print("dot_order=", dot_order)
# Check format: YYYYMMDDTHHMMSSfffZ + run_id
# Check the timestamp portion (first 23 characters)
timestamp_part = dot_order[:-36] # 36 is length of run_id
assert len(timestamp_part) == 22
assert timestamp_part[8] == "T" # Check T separator
assert timestamp_part[-1] == "Z" # Check Z suffix
# Verify timestamp format
try:
# Parse the timestamp portion (removing the Z)
datetime.strptime(timestamp_part[:-1], "%Y%m%dT%H%M%S%f")
except ValueError:
pytest.fail("Timestamp portion is not in correct format")
# Verify run_id portion
assert dot_order[-36:] == run_id
# Test is_serializable
@pytest.mark.asyncio
async def test_is_serializable():
from litellm.integrations.langsmith import is_serializable
from pydantic import BaseModel
# Test basic types
assert is_serializable("string") is True
assert is_serializable(123) is True
assert is_serializable({"key": "value"}) is True
# Test non-serializable types
async def async_func():
pass
assert is_serializable(async_func) is False
class TestModel(BaseModel):
field: str
assert is_serializable(TestModel(field="test")) is False
@pytest.mark.asyncio
async def test_async_send_batch():
logger = LangsmithLogger(langsmith_api_key="test-key")
# Mock the httpx client
mock_response = AsyncMock()
mock_response.status_code = 200
logger.async_httpx_client = AsyncMock()
logger.async_httpx_client.post.return_value = mock_response
# Add test data to queue
logger.log_queue = [
LangsmithQueueObject(
data={"test": "data"}, credentials=logger.default_credentials
)
]
await logger.async_send_batch()
# Verify the API call
logger.async_httpx_client.post.assert_called_once()
call_args = logger.async_httpx_client.post.call_args
assert "runs/batch" in call_args[1]["url"]
assert "x-api-key" in call_args[1]["headers"]
@pytest.mark.asyncio
async def test_langsmith_key_based_logging(mocker):
"""
In key based logging langsmith_api_key and langsmith_project are passed directly to litellm.acompletion
"""
try:
# Mock the httpx post request
mock_post = mocker.patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post"
)
mock_post.return_value.status_code = 200
mock_post.return_value.raise_for_status = lambda: None
litellm.set_verbose = True
litellm.callbacks = [LangsmithLogger()]
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Test message"}],
max_tokens=10,
temperature=0.2,
mock_response="This is a mock response",
langsmith_api_key="fake_key_project2",
langsmith_project="fake_project2",
)
print("Waiting for logs to be flushed to Langsmith.....")
await asyncio.sleep(15)
print("done sleeping 15 seconds...")
# Verify the post request was made with correct parameters
mock_post.assert_called_once()
call_args = mock_post.call_args
print("call_args", call_args)
# Check URL contains /runs/batch
assert "/runs/batch" in call_args[1]["url"]
# Check headers contain the correct API key
assert call_args[1]["headers"]["x-api-key"] == "fake_key_project2"
# Verify the request body contains the expected data
request_body = call_args[1]["json"]
assert "post" in request_body
assert len(request_body["post"]) == 1 # Should contain one run
# EXPECTED BODY
expected_body = {
"post": [
{
"name": "LLMRun",
"run_type": "llm",
"inputs": {
"id": "chatcmpl-82699ee4-7932-4fc0-9585-76abc8caeafa",
"call_type": "acompletion",
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Test message"}],
"model_parameters": {
"temperature": 0.2,
"max_tokens": 10,
"extra_body": {},
},
},
"outputs": {
"id": "chatcmpl-82699ee4-7932-4fc0-9585-76abc8caeafa",
"model": "gpt-3.5-turbo",
"choices": [
{
"finish_reason": "stop",
"index": 0,
"message": {
"content": "This is a mock response",
"role": "assistant",
"tool_calls": None,
"function_call": None,
},
}
],
"usage": {
"completion_tokens": 20,
"prompt_tokens": 10,
"total_tokens": 30,
},
},
"session_name": "fake_project2",
}
]
}
# Print both bodies for debugging
actual_body = call_args[1]["json"]
print("\nExpected body:")
print(json.dumps(expected_body, indent=2))
print("\nActual body:")
print(json.dumps(actual_body, indent=2))
assert len(actual_body["post"]) == 1
# Assert only the critical parts we care about
assert actual_body["post"][0]["name"] == expected_body["post"][0]["name"]
assert (
actual_body["post"][0]["run_type"] == expected_body["post"][0]["run_type"]
)
assert (
actual_body["post"][0]["inputs"]["messages"]
== expected_body["post"][0]["inputs"]["messages"]
)
assert (
actual_body["post"][0]["inputs"]["model_parameters"]
== expected_body["post"][0]["inputs"]["model_parameters"]
)
assert (
actual_body["post"][0]["outputs"]["choices"]
== expected_body["post"][0]["outputs"]["choices"]
)
assert (
actual_body["post"][0]["outputs"]["usage"]["completion_tokens"]
== expected_body["post"][0]["outputs"]["usage"]["completion_tokens"]
)
assert (
actual_body["post"][0]["outputs"]["usage"]["prompt_tokens"]
== expected_body["post"][0]["outputs"]["usage"]["prompt_tokens"]
)
assert (
actual_body["post"][0]["outputs"]["usage"]["total_tokens"]
== expected_body["post"][0]["outputs"]["usage"]["total_tokens"]
)
assert (
actual_body["post"][0]["session_name"]
== expected_body["post"][0]["session_name"]
)
except Exception as e:
pytest.fail(f"Error occurred: {e}")
@pytest.mark.asyncio
async def test_langsmith_queue_logging():
try:
# Initialize LangsmithLogger
test_langsmith_logger = LangsmithLogger()
litellm.callbacks = [test_langsmith_logger]
test_langsmith_logger.batch_size = 6
litellm.set_verbose = True
# Make multiple calls to ensure we don't hit the batch size
for _ in range(5):
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Test message"}],
max_tokens=10,
temperature=0.2,
mock_response="This is a mock response",
)
await asyncio.sleep(3)
# Check that logs are in the queue
assert len(test_langsmith_logger.log_queue) == 5
# Now make calls to exceed the batch size
for _ in range(3):
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Test message"}],
max_tokens=10,
temperature=0.2,
mock_response="This is a mock response",
)
# Wait a short time for any asynchronous operations to complete
await asyncio.sleep(1)
print(
"Length of langsmith log queue: {}".format(
len(test_langsmith_logger.log_queue)
)
)
# Check that the queue was flushed after exceeding batch size
assert len(test_langsmith_logger.log_queue) < 5
# Clean up
for cb in litellm.callbacks:
if isinstance(cb, LangsmithLogger):
await cb.async_httpx_client.client.aclose()
except Exception as e:
pytest.fail(f"Error occurred: {e}")