Merge pull request #4567 from BerriAI/litellm_add_galileo_logging

[Feat] Add Galileo Logging Callback
This commit is contained in:
Ishaan Jaff 2024-07-05 19:55:30 -07:00 committed by GitHub
commit 982dfe64c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 237 additions and 0 deletions

View file

@ -7,10 +7,13 @@ import TabItem from '@theme/TabItem';
Log Proxy Input, Output, Exceptions using Langfuse, OpenTelemetry, Custom Callbacks, DataDog, DynamoDB, s3 Bucket Log Proxy Input, Output, Exceptions using Langfuse, OpenTelemetry, Custom Callbacks, DataDog, DynamoDB, s3 Bucket
## Table of Contents
- [Logging to Langfuse](#logging-proxy-inputoutput---langfuse) - [Logging to Langfuse](#logging-proxy-inputoutput---langfuse)
- [Logging with OpenTelemetry (OpenTelemetry)](#logging-proxy-inputoutput-in-opentelemetry-format) - [Logging with OpenTelemetry (OpenTelemetry)](#logging-proxy-inputoutput-in-opentelemetry-format)
- [Async Custom Callbacks](#custom-callback-class-async) - [Async Custom Callbacks](#custom-callback-class-async)
- [Async Custom Callback APIs](#custom-callback-apis-async) - [Async Custom Callback APIs](#custom-callback-apis-async)
- [Logging to Galileo](#logging-llm-io-to-galielo)
- [Logging to OpenMeter](#logging-proxy-inputoutput---langfuse) - [Logging to OpenMeter](#logging-proxy-inputoutput---langfuse)
- [Logging to s3 Buckets](#logging-proxy-inputoutput---s3-buckets) - [Logging to s3 Buckets](#logging-proxy-inputoutput---s3-buckets)
- [Logging to DataDog](#logging-proxy-inputoutput---datadog) - [Logging to DataDog](#logging-proxy-inputoutput---datadog)
@ -1056,6 +1059,67 @@ litellm_settings:
Start the LiteLLM Proxy and make a test request to verify the logs reached your callback API Start the LiteLLM Proxy and make a test request to verify the logs reached your callback API
## [Beta] Logging LLM I/O to Galileo
Log LLM I/O on [www.rungalileo.io](https://www.rungalileo.io/)
:::info
Beta Integration
:::
**Required Env Variables**
```bash
export GALILEO_BASE_URL="" # For most users, this is the same as their console URL except with the word 'console' replaced by 'api' (e.g. http://www.console.galileo.myenterprise.com -> http://www.api.galileo.myenterprise.com)
export GALILEO_PROJECT_ID=""
export GALILEO_USERNAME=""
export GALILEO_PASSWORD=""
```
### Quick Start
1. Add to Config.yaml
```yaml
model_list:
- litellm_params:
api_base: https://exampleopenaiendpoint-production.up.railway.app/
api_key: my-fake-key
model: openai/my-fake-model
model_name: fake-openai-endpoint
litellm_settings:
success_callback: ["galileo"] # 👈 KEY CHANGE
```
2. Start Proxy
```
litellm --config /path/to/config.yaml
```
3. Test it!
```bash
curl --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--data ' {
"model": "fake-openai-endpoint",
"messages": [
{
"role": "user",
"content": "what llm are you"
}
],
}
'
```
🎉 That's it - Expect to see your Logs on your Galileo Dashboard
## Logging Proxy Cost + Usage - OpenMeter ## Logging Proxy Cost + Usage - OpenMeter
Bill customers according to their LLM API usage with [OpenMeter](../observability/openmeter.md) Bill customers according to their LLM API usage with [OpenMeter](../observability/openmeter.md)

View file

@ -0,0 +1,134 @@
import os
from datetime import datetime
from typing import List
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.types.galileo import LLMResponse
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
class GalileoObserve(CustomLogger):
def __init__(self) -> None:
self.in_memory_records: List[dict] = []
self.batch_size = 1
self.base_url = os.getenv("GALILEO_BASE_URL", None)
self.project_id = os.getenv("GALILEO_PROJECT_ID", None)
self.headers = None
self.async_httpx_handler = AsyncHTTPHandler()
pass
def set_galileo_headers(self):
# following https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#logging-your-records
headers = {
"accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
}
galileo_login_response = self.async_httpx_handler.post(
url=f"{self.base_url}/login",
headers=headers,
data={
"username": os.getenv("GALILEO_USERNAME"),
"password": os.getenv("GALILEO_PASSWORD"),
},
)
access_token = galileo_login_response.json()["access_token"]
self.headers = {
"accept": "application/json",
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
}
def get_output_str_from_response(self, response_obj, kwargs):
output = None
if response_obj is not None and (
kwargs.get("call_type", None) == "embedding"
or isinstance(response_obj, litellm.EmbeddingResponse)
):
output = None
elif response_obj is not None and isinstance(
response_obj, litellm.ModelResponse
):
output = response_obj["choices"][0]["message"].json()
elif response_obj is not None and isinstance(
response_obj, litellm.TextCompletionResponse
):
output = response_obj.choices[0].text
elif response_obj is not None and isinstance(
response_obj, litellm.ImageResponse
):
output = response_obj["data"]
return output
async def async_log_success_event(
self,
kwargs,
start_time,
end_time,
response_obj,
):
verbose_logger.debug(f"On Async Success")
_latency_ms = int((end_time - start_time).total_seconds() * 1000)
_call_type = kwargs.get("call_type", "litellm")
input_text = litellm.utils.get_formatted_prompt(
data=kwargs, call_type=_call_type
)
_usage = response_obj.get("usage", {}) or {}
num_input_tokens = _usage.get("prompt_tokens", 0)
num_output_tokens = _usage.get("completion_tokens", 0)
output_text = self.get_output_str_from_response(
response_obj=response_obj, kwargs=kwargs
)
request_record = LLMResponse(
latency_ms=_latency_ms,
status_code=200,
input_text=input_text,
output_text=output_text,
node_type=_call_type,
model=kwargs.get("model", "-"),
num_input_tokens=num_input_tokens,
num_output_tokens=num_output_tokens,
created_at=start_time.strftime(
"%Y-%m-%dT%H:%M:%S"
), # timestamp str constructed in "%Y-%m-%dT%H:%M:%S" format
)
# dump to dict
request_dict = request_record.model_dump()
self.in_memory_records.append(request_dict)
if len(self.in_memory_records) >= self.batch_size:
await self.flush_in_memory_records()
async def flush_in_memory_records(self):
verbose_logger.debug("flushing in memory records")
response = await self.async_httpx_handler.post(
url=f"{self.base_url}/projects/{self.project_id}/observe/ingest",
headers=self.headers,
json={"records": self.in_memory_records},
)
if response.status_code == 200:
verbose_logger.debug(
"Galileo Logger:successfully flushed in memory records"
)
self.in_memory_records = []
else:
verbose_logger.debug("Galileo Logger: failed to flush in memory records")
verbose_logger.debug(
"Galileo Logger error=%s, status code=%s",
response.text,
response.status_code,
)
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
verbose_logger.debug(f"On Async Failure")

View file

@ -0,0 +1,25 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
# from here: https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#structuring-your-records
class LLMResponse(BaseModel):
latency_ms: int
status_code: int
input_text: str
output_text: str
node_type: str
model: str
num_input_tokens: int
num_output_tokens: int
output_logprobs: Optional[Dict[str, Any]] = Field(
default=None,
description="Optional. When available, logprobs are used to compute Uncertainty.",
)
created_at: str = Field(
..., description='timestamp constructed in "%Y-%m-%dT%H:%M:%S" format'
)
tags: Optional[List[str]] = None
user_metadata: Optional[Dict[str, Any]] = None

View file

@ -56,6 +56,7 @@ from ..integrations.clickhouse import ClickhouseLogger
from ..integrations.custom_logger import CustomLogger from ..integrations.custom_logger import CustomLogger
from ..integrations.datadog import DataDogLogger from ..integrations.datadog import DataDogLogger
from ..integrations.dynamodb import DyanmoDBLogger from ..integrations.dynamodb import DyanmoDBLogger
from ..integrations.galileo import GalileoObserve
from ..integrations.greenscale import GreenscaleLogger from ..integrations.greenscale import GreenscaleLogger
from ..integrations.helicone import HeliconeLogger from ..integrations.helicone import HeliconeLogger
from ..integrations.lago import LagoLogger from ..integrations.lago import LagoLogger
@ -1925,6 +1926,15 @@ def _init_custom_logger_compatible_class(
_openmeter_logger = OpenMeterLogger() _openmeter_logger = OpenMeterLogger()
_in_memory_loggers.append(_openmeter_logger) _in_memory_loggers.append(_openmeter_logger)
return _openmeter_logger # type: ignore return _openmeter_logger # type: ignore
elif logging_integration == "galileo":
for callback in _in_memory_loggers:
if isinstance(callback, GalileoObserve):
return callback # type: ignore
galileo_logger = GalileoObserve()
_in_memory_loggers.append(galileo_logger)
return galileo_logger # type: ignore
elif logging_integration == "logfire": elif logging_integration == "logfire":
if "LOGFIRE_TOKEN" not in os.environ: if "LOGFIRE_TOKEN" not in os.environ:
raise ValueError("LOGFIRE_TOKEN not found in environment variables") raise ValueError("LOGFIRE_TOKEN not found in environment variables")
@ -1981,6 +1991,10 @@ def get_custom_logger_compatible_class(
for callback in _in_memory_loggers: for callback in _in_memory_loggers:
if isinstance(callback, OpenMeterLogger): if isinstance(callback, OpenMeterLogger):
return callback return callback
elif logging_integration == "galileo":
for callback in _in_memory_loggers:
if isinstance(callback, GalileoObserve):
return callback
elif logging_integration == "logfire": elif logging_integration == "logfire":
if "LOGFIRE_TOKEN" not in os.environ: if "LOGFIRE_TOKEN" not in os.environ:
raise ValueError("LOGFIRE_TOKEN not found in environment variables") raise ValueError("LOGFIRE_TOKEN not found in environment variables")