forked from phoenix/litellm-mirror
* Added Opik logging and evaluation * Updated doc examples * Default tags should be [] in case appending * WIP * Work in progress * Opik integration * Opik integration * Revert changes on litellm_logging.py * Updated Opik integration for synchronous API calls * Updated Opik documentation --------- Co-authored-by: Douglas Blank <doug@comet.com> Co-authored-by: Doug Blank <doug.blank@gmail.com>
175 lines
5.9 KiB
Python
175 lines
5.9 KiB
Python
import io
|
|
import os
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.abspath("../.."))
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
import pytest
|
|
|
|
import litellm
|
|
from litellm._logging import verbose_logger
|
|
from unittest.mock import AsyncMock, Mock
|
|
|
|
verbose_logger.setLevel(logging.DEBUG)
|
|
|
|
litellm.set_verbose = True
|
|
import time
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_opik_logging_http_request():
|
|
"""
|
|
- Test that HTTP requests are made to Opik
|
|
- Traces and spans are batched correctly
|
|
"""
|
|
try:
|
|
from litellm.integrations.opik.opik import OpikLogger
|
|
|
|
os.environ["OPIK_URL_OVERRIDE"] = "https://fake.comet.com/opik/api"
|
|
os.environ["OPIK_API_KEY"] = "anything"
|
|
os.environ["OPIK_WORKSPACE"] = "anything"
|
|
|
|
# Initialize OpikLogger
|
|
test_opik_logger = OpikLogger()
|
|
|
|
litellm.callbacks = [test_opik_logger]
|
|
test_opik_logger.batch_size = 12
|
|
litellm.set_verbose = True
|
|
|
|
# Create a mock for the async_client's post method
|
|
mock_post = AsyncMock()
|
|
mock_post.return_value.status_code = 202
|
|
mock_post.return_value.text = "Accepted"
|
|
test_opik_logger.async_httpx_client.post = mock_post
|
|
|
|
# Make multiple calls to ensure we don't hit the batch size
|
|
for _ in range(5):
|
|
response = await litellm.acompletion(
|
|
model="gpt-3.5-turbo",
|
|
messages=[{"role": "user", "content": "Test message"}],
|
|
max_tokens=10,
|
|
temperature=0.2,
|
|
mock_response="This is a mock response",
|
|
)
|
|
await asyncio.sleep(1)
|
|
|
|
# Check batching of events and that the queue contains 5 trace events and 5 span events
|
|
assert mock_post.called == False, "HTTP request was made but events should have been batched"
|
|
assert len(test_opik_logger.log_queue) == 10
|
|
|
|
# Now make calls to exceed the batch size
|
|
for _ in range(3):
|
|
response = await litellm.acompletion(
|
|
model="gpt-3.5-turbo",
|
|
messages=[{"role": "user", "content": "Test message"}],
|
|
max_tokens=10,
|
|
temperature=0.2,
|
|
mock_response="This is a mock response",
|
|
)
|
|
|
|
# Wait a short time for any asynchronous operations to complete
|
|
await asyncio.sleep(1)
|
|
|
|
# Check that the queue was flushed after exceeding batch size
|
|
assert len(test_opik_logger.log_queue) < test_opik_logger.batch_size
|
|
|
|
# Check that the data has been sent when it goes above the flush interval
|
|
await asyncio.sleep(test_opik_logger.flush_interval)
|
|
assert len(test_opik_logger.log_queue) == 0
|
|
|
|
# Clean up
|
|
for cb in litellm.callbacks:
|
|
if isinstance(cb, OpikLogger):
|
|
await cb.async_httpx_client.client.aclose()
|
|
|
|
except Exception as e:
|
|
pytest.fail(f"Error occurred: {e}")
|
|
|
|
def test_sync_opik_logging_http_request():
|
|
"""
|
|
- Test that HTTP requests are made to Opik
|
|
- Traces and spans are batched correctly
|
|
"""
|
|
try:
|
|
from litellm.integrations.opik.opik import OpikLogger
|
|
|
|
os.environ["OPIK_URL_OVERRIDE"] = "https://fake.comet.com/opik/api"
|
|
os.environ["OPIK_API_KEY"] = "anything"
|
|
os.environ["OPIK_WORKSPACE"] = "anything"
|
|
|
|
# Initialize OpikLogger
|
|
test_opik_logger = OpikLogger()
|
|
|
|
litellm.callbacks = [test_opik_logger]
|
|
litellm.set_verbose = True
|
|
|
|
# Create a mock for the clients's post method
|
|
mock_post = Mock()
|
|
mock_post.return_value.status_code = 204
|
|
mock_post.return_value.text = "Accepted"
|
|
test_opik_logger.sync_httpx_client.post = mock_post
|
|
|
|
# Make multiple calls to ensure we don't hit the batch size
|
|
for _ in range(5):
|
|
response = litellm.completion(
|
|
model="gpt-3.5-turbo",
|
|
messages=[{"role": "user", "content": "Test message"}],
|
|
max_tokens=10,
|
|
temperature=0.2,
|
|
mock_response="This is a mock response",
|
|
)
|
|
|
|
# Need to wait for a short amount of time as the log_success callback is called in a different thread
|
|
time.sleep(1)
|
|
|
|
# Check that 5 spans and 5 traces were sent
|
|
assert mock_post.call_count == 10, f"Expected 10 HTTP requests, but got {mock_post.call_count}"
|
|
|
|
except Exception as e:
|
|
pytest.fail(f"Error occurred: {e}")
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip(reason="local-only test, to test if everything works fine.")
|
|
async def test_opik_logging():
|
|
try:
|
|
from litellm.integrations.opik.opik import OpikLogger
|
|
|
|
# Initialize OpikLogger
|
|
test_opik_logger = OpikLogger()
|
|
litellm.callbacks = [test_opik_logger]
|
|
litellm.set_verbose = True
|
|
|
|
# Log a chat completion call
|
|
response = await litellm.acompletion(
|
|
model="gpt-3.5-turbo",
|
|
messages=[{"role": "user", "content": "What LLM are you ?"}],
|
|
max_tokens=10,
|
|
temperature=0.2,
|
|
metadata={"opik": {"custom_field": "custom_value"}}
|
|
)
|
|
print("Non-streaming response:", response)
|
|
|
|
# Log a streaming completion call
|
|
stream_response = await litellm.acompletion(
|
|
model="gpt-3.5-turbo",
|
|
messages=[{"role": "user", "content": "Stream = True - What llm are you ?"}],
|
|
max_tokens=10,
|
|
temperature=0.2,
|
|
stream=True,
|
|
metadata={"opik": {"custom_field": "custom_value"}}
|
|
)
|
|
print("Streaming response:")
|
|
async for chunk in stream_response:
|
|
print(chunk.choices[0].delta.content, end='', flush=True)
|
|
print() # New line after streaming response
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
assert len(test_opik_logger.log_queue) == 4
|
|
|
|
await asyncio.sleep(test_opik_logger.flush_interval + 1)
|
|
assert len(test_opik_logger.log_queue) == 0
|
|
except Exception as e:
|
|
pytest.fail(f"Error occurred: {e}")
|