diff --git a/docs/my-website/docs/observability/opik_integration.md b/docs/my-website/docs/observability/opik_integration.md
new file mode 100644
index 000000000..d8075c70e
--- /dev/null
+++ b/docs/my-website/docs/observability/opik_integration.md
@@ -0,0 +1,95 @@
+import Image from '@theme/IdealImage';
+
+# Comet Opik - Logging + Evals
+Opik is an open-source, end-to-end [LLM Evaluation Platform](https://www.comet.com/site/products/opik/?utm_source=litelllm&utm_medium=docs&utm_content=intro_paragraph) that helps developers track their LLM prompts and responses during both development and production. Users can define and run evaluations to test their LLM apps before deployment, checking for hallucinations, accuracy, context retrieval, and more!
+
+
+:::info
+We want to learn how we can make the callbacks better! Meet the LiteLLM [founders](https://calendly.com/d/4mp-gd3-k5k/berriai-1-1-onboarding-litellm-hosted-version) or
+join our [Discord](https://discord.gg/wuPM9dRgDw)
+:::
+
+## Pre-Requisites
+
+You can learn more about setting up Opik in the [Opik quickstart guide](https://www.comet.com/docs/opik/quickstart/). You can also learn more about self-hosting Opik in our [self-hosting guide](https://www.comet.com/docs/opik/self-host/local_deployment).
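+
+The integration reads its settings from `OPIK_*` environment variables (or from `~/.opik.config`). Below is a minimal sketch of configuring the cloud-hosted platform, assuming you already created an API key and workspace; the URL override is only needed when pointing at a self-hosted server, and the URL shown is a placeholder:
+
+```python
+import os
+
+# Credentials for the cloud-hosted Opik platform (comet.com)
+os.environ["OPIK_API_KEY"] = "your-opik-api-key"
+os.environ["OPIK_WORKSPACE"] = "your-opik-workspace"
+
+# Optional: point the integration at a self-hosted Opik server instead
+# (placeholder URL - replace with your deployment's API endpoint)
+# os.environ["OPIK_URL_OVERRIDE"] = "https://your-opik-server/api"
+```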
+
+## Quick Start
+Use just 4 lines of code to instantly log your responses **across all providers** with Opik.
+
+Get your Opik API Key by signing up [here](https://www.comet.com/signup?utm_source=litelllm&utm_medium=docs&utm_content=api_key_cell)!
+
+```python
+from litellm.integrations.opik.opik import OpikLogger
+import litellm
+
+opik_logger = OpikLogger()
+litellm.callbacks = [opik_logger]
+```
+
+Full example:
+
+```python
+from litellm.integrations.opik.opik import OpikLogger
+import litellm
+import os
+
+# Configure the Opik API key or call opik.configure()
+os.environ["OPIK_API_KEY"] = ""
+os.environ["OPIK_WORKSPACE"] = ""
+
+# LLM provider API keys:
+os.environ["OPENAI_API_KEY"] = ""
+
+# Set the Opik logger as a callback; LiteLLM will send the data to an Opik server (such as comet.com)
+opik_logger = OpikLogger()
+litellm.callbacks = [opik_logger]
+
+# OpenAI call
+response = litellm.completion(
+    model="gpt-3.5-turbo",
+    messages=[
+        {"role": "user", "content": "Why is tracking and evaluation of LLMs important?"}
+    ]
+)
+```
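+
+LiteLLM also accepts `"opik"` as a callback string (it is registered in `litellm._custom_logger_compatible_callbacks_literal` by this change), so the string form should work as well — a minimal sketch, assuming the same `OPIK_*` environment variables are set:
+
+```python
+import litellm
+
+# String form - LiteLLM should construct the Opik logger internally
+litellm.callbacks = ["opik"]
+```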
batch") + await self.flush_queue() + except Exception as e: + verbose_logger.exception( + f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}" + ) + + def _sync_send(self, url: str, headers: Dict[str, str], batch: List[Dict]): + try: + response = self.sync_httpx_client.post( + url=url, + headers=headers, + json=batch + ) + response.raise_for_status() + if response.status_code != 204: + raise Exception( + f"Response from opik API status_code: {response.status_code}, text: {response.text}" + ) + except Exception as e: + verbose_logger.exception( + f"OpikLogger failed to send batch - {str(e)}\n{traceback.format_exc()}" + ) + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + opik_payload = self._create_opik_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time + ) + + traces, spans = get_traces_and_spans_from_payload(opik_payload) + if len(traces) > 0: + self._sync_send(self.trace_url, self.headers, {"traces": traces}) + if len(spans) > 0: + self._sync_send(self.span_url, self.headers, {"spans": spans}) + except Exception as e: + verbose_logger.exception( + f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}" + ) + + async def _submit_batch(self, url: str, headers: Dict[str, str], batch: List[Dict]): + try: + response = await self.async_httpx_client.post( + url=url, + headers=headers, + json=batch + ) + response.raise_for_status() + + if response.status_code >= 300: + verbose_logger.error( + f"OpikLogger - Error: {response.status_code} - {response.text}" + ) + else: + verbose_logger.debug( + f"OpikLogger - {len(self.log_queue)} Opik events submitted" + ) + except Exception as e: + verbose_logger.exception( + f"OpikLogger failed to send batch - {str(e)}\n{traceback.format_exc()}" + ) + + def _create_opik_headers(self): + headers = {} + if self.opik_workspace: + headers["Comet-Workspace"] = self.opik_workspace + + if self.opik_api_key: + headers["authorization"] = self.opik_api_key + return headers + + + async def async_send_batch(self): + verbose_logger.exception("Calling async_send_batch") + if not self.log_queue: + return + + # Split the log_queue into traces and spans + traces, spans = get_traces_and_spans_from_payload(self.log_queue) + + # Send trace batch + if len(traces) > 0: + await self._submit_batch(self.trace_url, self.headers, {"traces": traces}) + if len(spans) > 0: + await self._submit_batch(self.span_url, self.headers, {"spans": spans}) + + + def _create_opik_payload(self, kwargs, response_obj, start_time, end_time) -> List[Dict]: + + + # Get metadata + _litellm_params = kwargs.get("litellm_params", {}) or {} + litellm_params_metadata = _litellm_params.get("metadata", {}) or {} + + # Extract opik metadata + litellm_opik_metadata = litellm_params_metadata.get("opik", {}) + verbose_logger.debug(f"litellm_opik_metadata - {json.dumps(litellm_opik_metadata, default=str)}") + project_name = litellm_opik_metadata.get("project_name", self.opik_project_name) + + # Extract trace_id and parent_span_id + current_span_data = litellm_opik_metadata.get("current_span_data", None) + if isinstance(current_span_data, dict): + trace_id = current_span_data.get("trace_id", None) + parent_span_id = current_span_data.get("id", None) + elif current_span_data: + trace_id = current_span_data.trace_id + parent_span_id = current_span_data.id + else: + trace_id = None + parent_span_id = None + # Create Opik tags + opik_tags = litellm_opik_metadata.get("tags", []) + if 
kwargs.get("custom_llm_provider"): + opik_tags.append(kwargs["custom_llm_provider"]) + + # Use standard_logging_object to create metadata and input/output data + standard_logging_object = kwargs.get("standard_logging_object", None) + if standard_logging_object is None: + verbose_logger.debug("OpikLogger skipping event; no standard_logging_object found") + return [] + + # Create input and output data + input_data = standard_logging_object.get("messages", {}) + output_data = standard_logging_object.get("response", {}) + + # Create usage object + usage = create_usage_object(response_obj["usage"]) + + # Define span and trace names + span_name = "%s_%s_%s" % ( + response_obj.get("model", "unknown-model"), + response_obj.get("object", "unknown-object"), + response_obj.get("created", 0), + ) + trace_name = response_obj.get("object", "unknown type") + + # Create metadata object, we add the opik metadata first and then + # update it with the standard_logging_object metadata + metadata = litellm_opik_metadata + if "current_span_data" in metadata: + del metadata["current_span_data"] + metadata["created_from"] = "litellm" + + metadata.update(standard_logging_object.get("metadata", {})) + if "call_type" in standard_logging_object: + metadata["type"] = standard_logging_object["call_type"] + if "status" in standard_logging_object: + metadata["status"] = standard_logging_object["status"] + if "response_cost" in kwargs: + metadata["cost"] = { + "total_tokens": kwargs["response_cost"], + "currency": "USD" + } + if "response_cost_failure_debug_info" in kwargs: + metadata["response_cost_failure_debug_info"] = kwargs["response_cost_failure_debug_info"] + if "model_map_information" in standard_logging_object: + metadata["model_map_information"] = standard_logging_object["model_map_information"] + if "model" in standard_logging_object: + metadata["model"] = standard_logging_object["model"] + if "model_id" in standard_logging_object: + metadata["model_id"] = standard_logging_object["model_id"] + if "model_group" in standard_logging_object: + metadata["model_group"] = standard_logging_object["model_group"] + if "api_base" in standard_logging_object: + metadata["api_base"] = standard_logging_object["api_base"] + if "cache_hit" in standard_logging_object: + metadata["cache_hit"] = standard_logging_object["cache_hit"] + if "saved_cache_cost" in standard_logging_object: + metadata["saved_cache_cost"] = standard_logging_object["saved_cache_cost"] + if "error_str" in standard_logging_object: + metadata["error_str"] = standard_logging_object["error_str"] + if "model_parameters" in standard_logging_object: + metadata["model_parameters"] = standard_logging_object["model_parameters"] + if "hidden_params" in standard_logging_object: + metadata["hidden_params"] = standard_logging_object["hidden_params"] + + payload = [] + if trace_id is None: + trace_id = create_uuid7() + verbose_logger.debug(f"OpikLogger creating payload for trace with id {trace_id}") + + payload.append({ + "project_name": project_name, + "id": trace_id, + "name": trace_name, + "start_time": start_time.isoformat() + "Z", + "end_time": end_time.isoformat() + "Z", + "input": input_data, + "output": output_data, + "metadata": metadata, + "tags": opik_tags, + }) + + span_id = create_uuid7() + verbose_logger.debug(f"OpikLogger creating payload for trace with id {trace_id} and span with id {span_id}") + payload.append({ + "id": span_id, + "project_name": project_name, + "trace_id": trace_id, + "parent_span_id": parent_span_id, + "name": span_name, + "type": "llm", 
+ "start_time": start_time.isoformat() + "Z", + "end_time": end_time.isoformat() + "Z", + "input": input_data, + "output": output_data, + "metadata": metadata, + "tags": opik_tags, + "usage": usage + }) + verbose_logger.debug(f"Payload: {payload}") + return payload diff --git a/litellm/integrations/opik/utils.py b/litellm/integrations/opik/utils.py new file mode 100644 index 000000000..39810de1f --- /dev/null +++ b/litellm/integrations/opik/utils.py @@ -0,0 +1,106 @@ +import os +import time +from typing import Optional, Final, Dict, List +import configparser +from litellm.types.utils import ModelResponse + +CONFIG_FILE_PATH_DEFAULT: Final[str] = "~/.opik.config" + +def create_uuid7(): + ns = time.time_ns() + last = [0, 0, 0, 0] + + # Simple uuid7 implementation + sixteen_secs = 16_000_000_000 + t1, rest1 = divmod(ns, sixteen_secs) + t2, rest2 = divmod(rest1 << 16, sixteen_secs) + t3, _ = divmod(rest2 << 12, sixteen_secs) + t3 |= 7 << 12 # Put uuid version in top 4 bits, which are 0 in t3 + + # The next two bytes are an int (t4) with two bits for + # the variant 2 and a 14 bit sequence counter which increments + # if the time is unchanged. + if t1 == last[0] and t2 == last[1] and t3 == last[2]: + # Stop the seq counter wrapping past 0x3FFF. + # This won't happen in practice, but if it does, + # uuids after the 16383rd with that same timestamp + # will not longer be correctly ordered but + # are still unique due to the 6 random bytes. + if last[3] < 0x3FFF: + last[3] += 1 + else: + last[:] = (t1, t2, t3, 0) + t4 = (2 << 14) | last[3] # Put variant 0b10 in top two bits + + # Six random bytes for the lower part of the uuid + rand = os.urandom(6) + return f"{t1:>08x}-{t2:>04x}-{t3:>04x}-{t4:>04x}-{rand.hex()}" + +def _read_opik_config_file() -> Dict[str, str]: + config_path = os.path.expanduser(CONFIG_FILE_PATH_DEFAULT) + + config = configparser.ConfigParser() + config.read(config_path) + + config_values = { + section: dict(config.items(section)) for section in config.sections() + } + + if "opik" in config_values: + return config_values["opik"] + + return {} + +def _get_env_variable(key: str) -> str: + env_prefix = "opik_" + return os.getenv((env_prefix + key).upper(), None) + +def get_opik_config_variable( + key: str, + user_value: Optional[str] = None, + default_value: Optional[str] = None + ) -> str: + """ + Get the configuration value of a variable, order priority is: + 1. user provided value + 2. environment variable + 3. Opik configuration file + 4. 
+def create_uuid7():
+    ns = time.time_ns()
+    last = [0, 0, 0, 0]
+
+    # Simple uuid7 implementation
+    sixteen_secs = 16_000_000_000
+    t1, rest1 = divmod(ns, sixteen_secs)
+    t2, rest2 = divmod(rest1 << 16, sixteen_secs)
+    t3, _ = divmod(rest2 << 12, sixteen_secs)
+    t3 |= 7 << 12  # Put uuid version in top 4 bits, which are 0 in t3
+
+    # The next two bytes are an int (t4) with two bits for
+    # the variant 2 and a 14 bit sequence counter which increments
+    # if the time is unchanged.
+    if t1 == last[0] and t2 == last[1] and t3 == last[2]:
+        # Stop the seq counter wrapping past 0x3FFF.
+        # This won't happen in practice, but if it does,
+        # uuids after the 16383rd with that same timestamp
+        # will no longer be correctly ordered but
+        # are still unique due to the 6 random bytes.
+        if last[3] < 0x3FFF:
+            last[3] += 1
+    else:
+        last[:] = (t1, t2, t3, 0)
+    t4 = (2 << 14) | last[3]  # Put variant 0b10 in top two bits
+
+    # Six random bytes for the lower part of the uuid
+    rand = os.urandom(6)
+    return f"{t1:>08x}-{t2:>04x}-{t3:>04x}-{t4:>04x}-{rand.hex()}"
+
+def _read_opik_config_file() -> Dict[str, str]:
+    config_path = os.path.expanduser(CONFIG_FILE_PATH_DEFAULT)
+
+    config = configparser.ConfigParser()
+    config.read(config_path)
+
+    config_values = {
+        section: dict(config.items(section)) for section in config.sections()
+    }
+
+    if "opik" in config_values:
+        return config_values["opik"]
+
+    return {}
+
+def _get_env_variable(key: str) -> Optional[str]:
+    env_prefix = "opik_"
+    return os.getenv((env_prefix + key).upper(), None)
+
+def get_opik_config_variable(
+    key: str,
+    user_value: Optional[str] = None,
+    default_value: Optional[str] = None
+) -> Optional[str]:
+    """
+    Get the configuration value of a variable; the order of priority is:
+    1. user provided value
+    2. environment variable
+    3. Opik configuration file
+    4. default value
+    """
+    # Return user provided value if it is not None
+    if user_value is not None:
+        return user_value
+
+    # Return environment variable if it is not None
+    env_value = _get_env_variable(key)
+    if env_value is not None:
+        return env_value
+
+    # Return value from Opik configuration file if the key is present
+    config_values = _read_opik_config_file()
+
+    if key in config_values:
+        return config_values[key]
+
+    # Otherwise, fall back to the default value
+    return default_value
+
+def create_usage_object(usage):
+    usage_dict = {}
+
+    if usage.completion_tokens is not None:
+        usage_dict["completion_tokens"] = usage.completion_tokens
+    if usage.prompt_tokens is not None:
+        usage_dict["prompt_tokens"] = usage.prompt_tokens
+    if usage.total_tokens is not None:
+        usage_dict["total_tokens"] = usage.total_tokens
+    return usage_dict
+
+def _remove_nulls(x):
+    x_ = {k: v for k, v in x.items() if v is not None}
+    return x_
+
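+# OpikLogger._create_opik_payload builds span entries with a "type" key and trace
+# entries without one; that key is what separates the two batches here.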
+def get_traces_and_spans_from_payload(payload: List):
+    traces = [_remove_nulls(x) for x in payload if "type" not in x]
+    spans = [_remove_nulls(x) for x in payload if "type" in x]
+    return traces, spans
diff --git a/tests/local_testing/test_opik.py b/tests/local_testing/test_opik.py
new file mode 100644
index 000000000..62c17fbb2
--- /dev/null
+++ b/tests/local_testing/test_opik.py
@@ -0,0 +1,175 @@
+import io
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath("../.."))
+
+import asyncio
+import logging
+
+import pytest
+
+import litellm
+from litellm._logging import verbose_logger
+from unittest.mock import AsyncMock, Mock
+
+verbose_logger.setLevel(logging.DEBUG)
+
+litellm.set_verbose = True
+import time
+
+@pytest.mark.asyncio
+async def test_opik_logging_http_request():
+    """
+    - Test that HTTP requests are made to Opik
+    - Traces and spans are batched correctly
+    """
+    try:
+        from litellm.integrations.opik.opik import OpikLogger
+
+        os.environ["OPIK_URL_OVERRIDE"] = "https://fake.comet.com/opik/api"
+        os.environ["OPIK_API_KEY"] = "anything"
+        os.environ["OPIK_WORKSPACE"] = "anything"
+
+        # Initialize OpikLogger
+        test_opik_logger = OpikLogger()
+
+        litellm.callbacks = [test_opik_logger]
+        test_opik_logger.batch_size = 12
+        litellm.set_verbose = True
+
+        # Create a mock for the async_client's post method
+        mock_post = AsyncMock()
+        mock_post.return_value.status_code = 202
+        mock_post.return_value.text = "Accepted"
+        test_opik_logger.async_httpx_client.post = mock_post
+
+        # Make multiple calls to ensure we don't hit the batch size
+        for _ in range(5):
+            response = await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Test message"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="This is a mock response",
+            )
+        await asyncio.sleep(1)
+
+        # Check batching of events and that the queue contains 5 trace events and 5 span events
+        assert mock_post.called == False, "HTTP request was made but events should have been batched"
+        assert len(test_opik_logger.log_queue) == 10
+
+        # Now make calls to exceed the batch size
+        for _ in range(3):
+            response = await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Test message"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="This is a mock response",
+            )
+
+        # Wait a short time for any asynchronous operations to complete
+        await asyncio.sleep(1)
+
+        # Check that the queue was flushed after exceeding the batch size
+        assert len(test_opik_logger.log_queue) < test_opik_logger.batch_size
+
+        # Check that the data has been sent once the flush interval has elapsed
+        await asyncio.sleep(test_opik_logger.flush_interval)
+        assert len(test_opik_logger.log_queue) == 0
+
+        # Clean up
+        for cb in litellm.callbacks:
+            if isinstance(cb, OpikLogger):
+                await cb.async_httpx_client.client.aclose()
+
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+
+def test_sync_opik_logging_http_request():
+    """
+    - Test that HTTP requests are made to Opik
+    - Traces and spans are batched correctly
+    """
+    try:
+        from litellm.integrations.opik.opik import OpikLogger
+
+        os.environ["OPIK_URL_OVERRIDE"] = "https://fake.comet.com/opik/api"
+        os.environ["OPIK_API_KEY"] = "anything"
+        os.environ["OPIK_WORKSPACE"] = "anything"
+
+        # Initialize OpikLogger
+        test_opik_logger = OpikLogger()
+
+        litellm.callbacks = [test_opik_logger]
+        litellm.set_verbose = True
+
+        # Create a mock for the client's post method
+        mock_post = Mock()
+        mock_post.return_value.status_code = 204
+        mock_post.return_value.text = "Accepted"
+        test_opik_logger.sync_httpx_client.post = mock_post
+
+        # Make multiple calls to ensure we don't hit the batch size
+        for _ in range(5):
+            response = litellm.completion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Test message"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="This is a mock response",
+            )
+
+        # Need to wait for a short amount of time as the log_success callback is called in a different thread
+        time.sleep(1)
+
+        # Check that 5 spans and 5 traces were sent
+        assert mock_post.call_count == 10, f"Expected 10 HTTP requests, but got {mock_post.call_count}"
+
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+
+@pytest.mark.asyncio
+@pytest.mark.skip(reason="local-only test, to test if everything works fine.")
+async def test_opik_logging():
+    try:
+        from litellm.integrations.opik.opik import OpikLogger
+
+        # Initialize OpikLogger
+        test_opik_logger = OpikLogger()
+        litellm.callbacks = [test_opik_logger]
+        litellm.set_verbose = True
+
+        # Log a chat completion call
+        response = await litellm.acompletion(
+            model="gpt-3.5-turbo",
+            messages=[{"role": "user", "content": "What LLM are you ?"}],
+            max_tokens=10,
+            temperature=0.2,
+            metadata={"opik": {"custom_field": "custom_value"}}
+        )
+        print("Non-streaming response:", response)
+
+        # Log a streaming completion call
+        stream_response = await litellm.acompletion(
+            model="gpt-3.5-turbo",
+            messages=[{"role": "user", "content": "Stream = True - What llm are you ?"}],
+            max_tokens=10,
+            temperature=0.2,
+            stream=True,
+            metadata={"opik": {"custom_field": "custom_value"}}
+        )
+        print("Streaming response:")
+        async for chunk in stream_response:
+            print(chunk.choices[0].delta.content, end='', flush=True)
+        print()  # New line after streaming response
+
+        await asyncio.sleep(2)
+
+        assert len(test_opik_logger.log_queue) == 4
+
+        await asyncio.sleep(test_opik_logger.flush_interval + 1)
+        assert len(test_opik_logger.log_queue) == 0
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")