[Feat] Observability integration - Opik by Comet (#6062)

* Added Opik logging and evaluation

* Updated doc examples

* Default tags should be [] in case appending

* WIP

* Work in progress

* Opik integration

* Opik integration

* Revert changes on litellm_logging.py

* Updated Opik integration for synchronous API calls

* Updated Opik documentation

---------

Co-authored-by: Douglas Blank <doug@comet.com>
Co-authored-by: Doug Blank <doug.blank@gmail.com>
Jacques Verré 2024-10-10 13:57:50 +01:00 committed by GitHub
parent 89506053a4
commit 4064bfc6dd
7 changed files with 686 additions and 0 deletions


@@ -0,0 +1,95 @@
import Image from '@theme/IdealImage';
# Comet Opik - Logging + Evals
Opik is an open source end-to-end [LLM Evaluation Platform](https://www.comet.com/site/products/opik/?utm_source=litelllm&utm_medium=docs&utm_content=intro_paragraph) that helps developers track their LLM prompts and responses during both development and production. Users can define and run evaluations to test their LLM apps before deployment, checking for hallucinations, accuracy, context retrieval, and more!
<Image img={require('../../img/opik.png')} />
:::info
We want to learn how we can make the callbacks better! Meet the LiteLLM [founders](https://calendly.com/d/4mp-gd3-k5k/berriai-1-1-onboarding-litellm-hosted-version) or
join our [discord](https://discord.gg/wuPM9dRgDw)
:::
## Pre-Requisites
You can learn more about setting up Opik in the [Opik quickstart guide](https://www.comet.com/docs/opik/quickstart/), and about self-hosting Opik in the [self-hosting guide](https://www.comet.com/docs/opik/self-host/local_deployment).
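As an alternative to exporting the `OPIK_API_KEY` / `OPIK_WORKSPACE` environment variables shown below, you can configure the Opik SDK once and let LiteLLM read the values from `~/.opik.config`. A minimal sketch, assuming the `opik` package is installed and that `opik.configure()` stores its settings in `~/.opik.config`:
```python
import opik

# Prompts for your API key and workspace and saves them to ~/.opik.config,
# which LiteLLM's OpikLogger reads as a fallback when the OPIK_* environment
# variables are not set.
opik.configure()
```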
## Quick Start
Use just 4 lines of code to instantly log your responses **across all providers** with Opik.
Get your Opik API Key by signing up [here](https://www.comet.com/signup?utm_source=litelllm&utm_medium=docs&utm_content=api_key_cell)!
```python
from litellm.integrations.opik.opik import OpikLogger
import litellm
opik_logger = OpikLogger()
litellm.callbacks = [opik_logger]
```
Full examples:
```python
from litellm.integrations.opik.opik import OpikLogger
import litellm
import os
# Configure the Opik API key or call opik.configure()
os.environ["OPIK_API_KEY"] = ""
os.environ["OPIK_WORKSPACE"] = ""
# LLM provider API Keys:
os.environ["OPENAI_API_KEY"] = ""
# set "opik" as a callback, litellm will send the data to an Opik server (such as comet.com)
opik_logger = OpikLogger()
litellm.callbacks = [opik_logger]
# openai call
response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "Why is tracking and evaluation of LLMs important?"}
    ]
)
```
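The callback also works with LiteLLM's async API; `OpikLogger` batches the events and flushes them in the background. A minimal sketch (API key setup as above):
```python
import asyncio
import litellm
from litellm.integrations.opik.opik import OpikLogger

opik_logger = OpikLogger()
litellm.callbacks = [opik_logger]

async def main():
    # Async calls are logged via OpikLogger.async_log_success_event and are
    # flushed once the batch size or flush interval is reached
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Why is tracking and evaluation of LLMs important?"}],
    )
    print(response.choices[0].message.content)

asyncio.run(main())
```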
If you are using LiteLLM within a function tracked with Opik's `@track` decorator,
you will need to provide the `current_span_data` field in the `metadata` attribute
so that the LLM call is assigned to the correct trace:
```python
from opik import track
from opik.opik_context import get_current_span_data
from litellm.integrations.opik.opik import OpikLogger
import litellm
opik_logger = OpikLogger()
litellm.callbacks = [opik_logger]
@track()
def streaming_function(input):
    messages = [{"role": "user", "content": input}]
    response = litellm.completion(
        model="gpt-3.5-turbo",
        messages=messages,
        metadata={
            "opik": {
                "current_span_data": get_current_span_data(),
                "tags": ["streaming-test"],
            },
        },
    )
    return response

response = streaming_function("Why is tracking and evaluation of LLMs important?")
chunks = list(response)
```
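Beyond `current_span_data`, the `opik` metadata block also accepts a per-call `project_name` and a list of `tags` that are attached to the logged trace and span (these map to the fields read by `OpikLogger._create_opik_payload`; the values below are placeholders):
```python
response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}],
    metadata={
        "opik": {
            "project_name": "litellm-demo",  # overrides the logger's default project
            "tags": ["docs-example"],        # attached to both the trace and the span
        },
    },
)
```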
## Support & Talk to Founders
- [Schedule Demo 👋](https://calendly.com/d/4mp-gd3-k5k/berriai-1-1-onboarding-litellm-hosted-version)
- [Community Discord 💭](https://discord.gg/wuPM9dRgDw)
- Our numbers 📞 +1 (770) 8783-106 / +1 (412) 618-6238
- Our emails ✉️ ishaan@berri.ai / krrish@berri.ai

Binary file not shown (image, 130 KiB).


@@ -293,6 +293,7 @@ const sidebars = {
"observability/greenscale_integration",
"observability/supabase_integration",
`observability/telemetry`,
"observability/opik_integration",
],
},
{


@@ -52,6 +52,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
"braintrust",
"arize",
"gcs_bucket",
"opik",
]
_known_custom_logger_compatible_callbacks: List = list(
get_args(_custom_logger_compatible_callbacks_literal)


@@ -0,0 +1,308 @@
"""
Opik Logger that logs LLM events to an Opik server
"""
from typing import Dict, List
import json
from litellm._logging import verbose_logger
import traceback
from .utils import (
    get_opik_config_variable,
    create_uuid7,
    create_usage_object,
    get_traces_and_spans_from_payload
)
import asyncio
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    _get_httpx_client,
    httpxSpecialProvider,
)
from litellm.integrations.custom_batch_logger import CustomBatchLogger
class OpikLogger(CustomBatchLogger):
    """
    Opik Logger for logging events to an Opik Server
    """

    def __init__(self, **kwargs):
        self.async_httpx_client = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.sync_httpx_client = _get_httpx_client()

        self.opik_project_name = get_opik_config_variable(
            "project_name",
            user_value=kwargs.get("project_name", None),
            default_value="Default Project"
        )

        opik_base_url = get_opik_config_variable(
            "url_override",
            user_value=kwargs.get("url", None),
            default_value="https://www.comet.com/opik/api"
        )
        opik_api_key = get_opik_config_variable(
            "api_key",
            user_value=kwargs.get("api_key", None),
            default_value=None
        )
        opik_workspace = get_opik_config_variable(
            "workspace",
            user_value=kwargs.get("workspace", None),
            default_value=None
        )

        # Keep the resolved credentials around so helpers such as
        # _create_opik_headers can reference them.
        self.opik_api_key = opik_api_key
        self.opik_workspace = opik_workspace

        self.trace_url = f"{opik_base_url}/v1/private/traces/batch"
        self.span_url = f"{opik_base_url}/v1/private/spans/batch"

        self.headers = {}
        if opik_workspace:
            self.headers["Comet-Workspace"] = opik_workspace
        if opik_api_key:
            self.headers["authorization"] = opik_api_key

        try:
            asyncio.create_task(self.periodic_flush())
            self.flush_lock = asyncio.Lock()
        except Exception:
            verbose_logger.debug(
                "OpikLogger - Asynchronous processing not initialized as we are not running in an async context"
            )
            self.flush_lock = None

        super().__init__(**kwargs, flush_lock=self.flush_lock)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        try:
            opik_payload = self._create_opik_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time
            )

            self.log_queue.extend(opik_payload)
            verbose_logger.debug(
                f"OpikLogger added event to log_queue - Will flush in {self.flush_interval} seconds..."
            )

            if len(self.log_queue) >= self.batch_size:
                verbose_logger.debug("OpikLogger - Flushing batch")
                await self.flush_queue()
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}"
            )

    def _sync_send(self, url: str, headers: Dict[str, str], batch: List[Dict]):
        try:
            response = self.sync_httpx_client.post(
                url=url,
                headers=headers,
                json=batch
            )
            response.raise_for_status()
            if response.status_code != 204:
                raise Exception(
                    f"Response from opik API status_code: {response.status_code}, text: {response.text}"
                )
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to send batch - {str(e)}\n{traceback.format_exc()}"
            )

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        try:
            opik_payload = self._create_opik_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time
            )

            traces, spans = get_traces_and_spans_from_payload(opik_payload)
            if len(traces) > 0:
                self._sync_send(self.trace_url, self.headers, {"traces": traces})
            if len(spans) > 0:
                self._sync_send(self.span_url, self.headers, {"spans": spans})
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}"
            )

    async def _submit_batch(self, url: str, headers: Dict[str, str], batch: List[Dict]):
        try:
            response = await self.async_httpx_client.post(
                url=url,
                headers=headers,
                json=batch
            )
            response.raise_for_status()

            if response.status_code >= 300:
                verbose_logger.error(
                    f"OpikLogger - Error: {response.status_code} - {response.text}"
                )
            else:
                verbose_logger.debug(
                    f"OpikLogger - {len(self.log_queue)} Opik events submitted"
                )
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to send batch - {str(e)}\n{traceback.format_exc()}"
            )

    def _create_opik_headers(self):
        headers = {}
        if self.opik_workspace:
            headers["Comet-Workspace"] = self.opik_workspace
        if self.opik_api_key:
            headers["authorization"] = self.opik_api_key
        return headers

    async def async_send_batch(self):
        verbose_logger.debug("Calling async_send_batch")
        if not self.log_queue:
            return

        # Split the log_queue into traces and spans
        traces, spans = get_traces_and_spans_from_payload(self.log_queue)

        # Send trace batch
        if len(traces) > 0:
            await self._submit_batch(self.trace_url, self.headers, {"traces": traces})
        if len(spans) > 0:
            await self._submit_batch(self.span_url, self.headers, {"spans": spans})

    def _create_opik_payload(self, kwargs, response_obj, start_time, end_time) -> List[Dict]:
        # Get metadata
        _litellm_params = kwargs.get("litellm_params", {}) or {}
        litellm_params_metadata = _litellm_params.get("metadata", {}) or {}

        # Extract opik metadata
        litellm_opik_metadata = litellm_params_metadata.get("opik", {})
        verbose_logger.debug(
            f"litellm_opik_metadata - {json.dumps(litellm_opik_metadata, default=str)}"
        )
        project_name = litellm_opik_metadata.get("project_name", self.opik_project_name)

        # Extract trace_id and parent_span_id
        current_span_data = litellm_opik_metadata.get("current_span_data", None)
        if isinstance(current_span_data, dict):
            trace_id = current_span_data.get("trace_id", None)
            parent_span_id = current_span_data.get("id", None)
        elif current_span_data:
            trace_id = current_span_data.trace_id
            parent_span_id = current_span_data.id
        else:
            trace_id = None
            parent_span_id = None

        # Create Opik tags
        opik_tags = litellm_opik_metadata.get("tags", [])
        if kwargs.get("custom_llm_provider"):
            opik_tags.append(kwargs["custom_llm_provider"])

        # Use standard_logging_object to create metadata and input/output data
        standard_logging_object = kwargs.get("standard_logging_object", None)
        if standard_logging_object is None:
            verbose_logger.debug("OpikLogger skipping event; no standard_logging_object found")
            return []

        # Create input and output data
        input_data = standard_logging_object.get("messages", {})
        output_data = standard_logging_object.get("response", {})

        # Create usage object
        usage = create_usage_object(response_obj["usage"])

        # Define span and trace names
        span_name = "%s_%s_%s" % (
            response_obj.get("model", "unknown-model"),
            response_obj.get("object", "unknown-object"),
            response_obj.get("created", 0),
        )
        trace_name = response_obj.get("object", "unknown type")

        # Create metadata object, we add the opik metadata first and then
        # update it with the standard_logging_object metadata
        metadata = litellm_opik_metadata
        if "current_span_data" in metadata:
            del metadata["current_span_data"]
        metadata["created_from"] = "litellm"

        metadata.update(standard_logging_object.get("metadata", {}))
        if "call_type" in standard_logging_object:
            metadata["type"] = standard_logging_object["call_type"]
        if "status" in standard_logging_object:
            metadata["status"] = standard_logging_object["status"]
        if "response_cost" in kwargs:
            metadata["cost"] = {
                "total_tokens": kwargs["response_cost"],
                "currency": "USD"
            }
        if "response_cost_failure_debug_info" in kwargs:
            metadata["response_cost_failure_debug_info"] = kwargs["response_cost_failure_debug_info"]
        if "model_map_information" in standard_logging_object:
            metadata["model_map_information"] = standard_logging_object["model_map_information"]
        if "model" in standard_logging_object:
            metadata["model"] = standard_logging_object["model"]
        if "model_id" in standard_logging_object:
            metadata["model_id"] = standard_logging_object["model_id"]
        if "model_group" in standard_logging_object:
            metadata["model_group"] = standard_logging_object["model_group"]
        if "api_base" in standard_logging_object:
            metadata["api_base"] = standard_logging_object["api_base"]
        if "cache_hit" in standard_logging_object:
            metadata["cache_hit"] = standard_logging_object["cache_hit"]
        if "saved_cache_cost" in standard_logging_object:
            metadata["saved_cache_cost"] = standard_logging_object["saved_cache_cost"]
        if "error_str" in standard_logging_object:
            metadata["error_str"] = standard_logging_object["error_str"]
        if "model_parameters" in standard_logging_object:
            metadata["model_parameters"] = standard_logging_object["model_parameters"]
        if "hidden_params" in standard_logging_object:
            metadata["hidden_params"] = standard_logging_object["hidden_params"]

        payload = []

        if trace_id is None:
            # No parent trace was provided, so create a new trace for this call
            trace_id = create_uuid7()
            verbose_logger.debug(f"OpikLogger creating payload for trace with id {trace_id}")
            payload.append({
                "project_name": project_name,
                "id": trace_id,
                "name": trace_name,
                "start_time": start_time.isoformat() + "Z",
                "end_time": end_time.isoformat() + "Z",
                "input": input_data,
                "output": output_data,
                "metadata": metadata,
                "tags": opik_tags,
            })

        span_id = create_uuid7()
        verbose_logger.debug(
            f"OpikLogger creating payload for trace with id {trace_id} and span with id {span_id}"
        )
        payload.append({
            "id": span_id,
            "project_name": project_name,
            "trace_id": trace_id,
            "parent_span_id": parent_span_id,
            "name": span_name,
            "type": "llm",
            "start_time": start_time.isoformat() + "Z",
            "end_time": end_time.isoformat() + "Z",
            "input": input_data,
            "output": output_data,
            "metadata": metadata,
            "tags": opik_tags,
            "usage": usage
        })
        verbose_logger.debug(f"Payload: {payload}")
        return payload


@@ -0,0 +1,106 @@
import os
import time
from typing import Optional, Final, Dict, List
import configparser
from litellm.types.utils import ModelResponse
CONFIG_FILE_PATH_DEFAULT: Final[str] = "~/.opik.config"
def create_uuid7():
    ns = time.time_ns()
    last = [0, 0, 0, 0]

    # Simple uuid7 implementation
    sixteen_secs = 16_000_000_000
    t1, rest1 = divmod(ns, sixteen_secs)
    t2, rest2 = divmod(rest1 << 16, sixteen_secs)
    t3, _ = divmod(rest2 << 12, sixteen_secs)
    t3 |= 7 << 12  # Put uuid version in top 4 bits, which are 0 in t3

    # The next two bytes are an int (t4) with two bits for
    # the variant 2 and a 14 bit sequence counter which increments
    # if the time is unchanged.
    if t1 == last[0] and t2 == last[1] and t3 == last[2]:
        # Stop the seq counter wrapping past 0x3FFF.
        # This won't happen in practice, but if it does,
        # uuids after the 16383rd with that same timestamp
        # will no longer be correctly ordered but
        # are still unique due to the 6 random bytes.
        if last[3] < 0x3FFF:
            last[3] += 1
    else:
        last[:] = (t1, t2, t3, 0)
    t4 = (2 << 14) | last[3]  # Put variant 0b10 in top two bits

    # Six random bytes for the lower part of the uuid
    rand = os.urandom(6)
    return f"{t1:>08x}-{t2:>04x}-{t3:>04x}-{t4:>04x}-{rand.hex()}"

def _read_opik_config_file() -> Dict[str, str]:
    config_path = os.path.expanduser(CONFIG_FILE_PATH_DEFAULT)
    config = configparser.ConfigParser()
    config.read(config_path)
    config_values = {
        section: dict(config.items(section)) for section in config.sections()
    }
    if "opik" in config_values:
        return config_values["opik"]
    return {}


def _get_env_variable(key: str) -> Optional[str]:
    env_prefix = "opik_"
    return os.getenv((env_prefix + key).upper(), None)


def get_opik_config_variable(
    key: str,
    user_value: Optional[str] = None,
    default_value: Optional[str] = None
) -> Optional[str]:
    """
    Get the configuration value of a variable, order priority is:
    1. user provided value
    2. environment variable
    3. Opik configuration file
    4. default value
    """
    # Return user provided value if it is not None
    if user_value is not None:
        return user_value

    # Return environment variable if it is not None
    env_value = _get_env_variable(key)
    if env_value is not None:
        return env_value

    # Return value from Opik configuration file if it is not None
    config_values = _read_opik_config_file()
    if key in config_values:
        return config_values[key]

    # Return default value if it is not None
    return default_value

def create_usage_object(usage):
    usage_dict = {}

    if usage.completion_tokens is not None:
        usage_dict["completion_tokens"] = usage.completion_tokens
    if usage.prompt_tokens is not None:
        usage_dict["prompt_tokens"] = usage.prompt_tokens
    if usage.total_tokens is not None:
        usage_dict["total_tokens"] = usage.total_tokens
    return usage_dict


def _remove_nulls(x):
    x_ = {k: v for k, v in x.items() if v is not None}
    return x_


def get_traces_and_spans_from_payload(payload: List):
    traces = [_remove_nulls(x) for x in payload if "type" not in x]
    spans = [_remove_nulls(x) for x in payload if "type" in x]
    return traces, spans


@@ -0,0 +1,175 @@
import io
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import logging
import pytest
import litellm
from litellm._logging import verbose_logger
from unittest.mock import AsyncMock, Mock
verbose_logger.setLevel(logging.DEBUG)
litellm.set_verbose = True
import time
@pytest.mark.asyncio
async def test_opik_logging_http_request():
    """
    - Test that HTTP requests are made to Opik
    - Traces and spans are batched correctly
    """
    try:
        from litellm.integrations.opik.opik import OpikLogger

        os.environ["OPIK_URL_OVERRIDE"] = "https://fake.comet.com/opik/api"
        os.environ["OPIK_API_KEY"] = "anything"
        os.environ["OPIK_WORKSPACE"] = "anything"

        # Initialize OpikLogger
        test_opik_logger = OpikLogger()
        litellm.callbacks = [test_opik_logger]
        test_opik_logger.batch_size = 12
        litellm.set_verbose = True

        # Create a mock for the async_client's post method
        mock_post = AsyncMock()
        mock_post.return_value.status_code = 202
        mock_post.return_value.text = "Accepted"
        test_opik_logger.async_httpx_client.post = mock_post

        # Make multiple calls to ensure we don't hit the batch size
        for _ in range(5):
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Test message"}],
                max_tokens=10,
                temperature=0.2,
                mock_response="This is a mock response",
            )

        await asyncio.sleep(1)

        # Check batching of events and that the queue contains 5 trace events and 5 span events
        assert mock_post.called == False, "HTTP request was made but events should have been batched"
        assert len(test_opik_logger.log_queue) == 10

        # Now make calls to exceed the batch size
        for _ in range(3):
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Test message"}],
                max_tokens=10,
                temperature=0.2,
                mock_response="This is a mock response",
            )

        # Wait a short time for any asynchronous operations to complete
        await asyncio.sleep(1)

        # Check that the queue was flushed after exceeding batch size
        assert len(test_opik_logger.log_queue) < test_opik_logger.batch_size

        # Check that the data has been sent when it goes above the flush interval
        await asyncio.sleep(test_opik_logger.flush_interval)
        assert len(test_opik_logger.log_queue) == 0

        # Clean up
        for cb in litellm.callbacks:
            if isinstance(cb, OpikLogger):
                await cb.async_httpx_client.client.aclose()
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")

def test_sync_opik_logging_http_request():
    """
    - Test that HTTP requests are made to Opik
    - Traces and spans are batched correctly
    """
    try:
        from litellm.integrations.opik.opik import OpikLogger

        os.environ["OPIK_URL_OVERRIDE"] = "https://fake.comet.com/opik/api"
        os.environ["OPIK_API_KEY"] = "anything"
        os.environ["OPIK_WORKSPACE"] = "anything"

        # Initialize OpikLogger
        test_opik_logger = OpikLogger()
        litellm.callbacks = [test_opik_logger]
        litellm.set_verbose = True

        # Create a mock for the client's post method
        mock_post = Mock()
        mock_post.return_value.status_code = 204
        mock_post.return_value.text = "Accepted"
        test_opik_logger.sync_httpx_client.post = mock_post

        # Make multiple calls to ensure we don't hit the batch size
        for _ in range(5):
            response = litellm.completion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Test message"}],
                max_tokens=10,
                temperature=0.2,
                mock_response="This is a mock response",
            )

        # Need to wait for a short amount of time as the log_success callback is called in a different thread
        time.sleep(1)

        # Check that 5 spans and 5 traces were sent
        assert mock_post.call_count == 10, f"Expected 10 HTTP requests, but got {mock_post.call_count}"
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")

@pytest.mark.asyncio
@pytest.mark.skip(reason="local-only test, to test if everything works fine.")
async def test_opik_logging():
    try:
        from litellm.integrations.opik.opik import OpikLogger

        # Initialize OpikLogger
        test_opik_logger = OpikLogger()
        litellm.callbacks = [test_opik_logger]
        litellm.set_verbose = True

        # Log a chat completion call
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "What LLM are you ?"}],
            max_tokens=10,
            temperature=0.2,
            metadata={"opik": {"custom_field": "custom_value"}}
        )
        print("Non-streaming response:", response)

        # Log a streaming completion call
        stream_response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Stream = True - What llm are you ?"}],
            max_tokens=10,
            temperature=0.2,
            stream=True,
            metadata={"opik": {"custom_field": "custom_value"}}
        )
        print("Streaming response:")
        async for chunk in stream_response:
            print(chunk.choices[0].delta.content, end='', flush=True)
        print()  # New line after streaming response

        await asyncio.sleep(2)
        assert len(test_opik_logger.log_queue) == 4

        await asyncio.sleep(test_opik_logger.flush_interval + 1)
        assert len(test_opik_logger.log_queue) == 0
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")