Litellm Minor Fixes & Improvements (10/03/2024) (#6049)

* fix(proxy_server.py): remove spendlog fixes from proxy startup logic

Moves https://github.com/BerriAI/litellm/pull/4794 to `/db_scripts` and cleans up some caching-related debug logging, making debug logs easier to trace

* fix(langfuse_endpoints.py): Fixes https://github.com/BerriAI/litellm/issues/6041

* fix(azure.py): fix health checks for azure audio transcription models

Fixes https://github.com/BerriAI/litellm/issues/5999

* Feat: Add Literal AI Integration (#5653)

* feat: add Literal AI integration

* update readme

* Update README.md

* fix: address comments

* fix: remove literalai sdk

* fix: use HTTPHandler

* chore: add test

* fix: add asyncio lock

* fix(literal_ai.py): fix linting errors

* fix(literal_ai.py): fix linting errors

* refactor: cleanup

---------

Co-authored-by: Willy Douhard <willy.douhard@gmail.com>
Krish Dholakia 2024-10-03 18:02:28 -04:00 committed by GitHub
parent f9d0bcc5a1
commit 5c33d1c9af
14 changed files with 557 additions and 44 deletions


@@ -4,6 +4,7 @@ python script to pre-create all views required by LiteLLM Proxy Server
import asyncio
import os
from update_unassigned_teams import apply_db_fixes
# Enter your DATABASE_URL here
@@ -204,6 +205,7 @@ async def check_view_exists():
print("Last30dTopEndUsersSpend Created!") # noqa
await apply_db_fixes(db=db)
return


@@ -0,0 +1,27 @@
from prisma import Prisma
async def apply_db_fixes(db: Prisma):
try:
sql_query = """
UPDATE "LiteLLM_SpendLogs"
SET team_id = (
SELECT vt.team_id
FROM "LiteLLM_VerificationToken" vt
WHERE vt.token = "LiteLLM_SpendLogs".api_key
)
WHERE team_id IS NULL
AND EXISTS (
SELECT 1
FROM "LiteLLM_VerificationToken" vt
WHERE vt.token = "LiteLLM_SpendLogs".api_key
);
"""
response = await db.query_raw(sql_query)
print(
"Updated unassigned teams, Response=%s",
response,
)
except Exception as e:
raise Exception(f"Error apply_db_fixes: {str(e)}")
return
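
Since this backfill now lives in `/db_scripts` instead of proxy startup, it has to be run explicitly. A minimal standalone invocation sketch, assuming `DATABASE_URL` is exported and the Prisma client for the LiteLLM schema has been generated:

```python
# Standalone invocation sketch: connect with Prisma, run the SpendLogs team_id
# backfill defined above, then disconnect. Assumes DATABASE_URL is exported and
# `prisma generate` has been run against the LiteLLM schema.
import asyncio

from prisma import Prisma
from update_unassigned_teams import apply_db_fixes


async def main():
    db = Prisma()
    await db.connect()
    await apply_db_fixes(db=db)
    await db.disconnect()


asyncio.run(main())
```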


@@ -0,0 +1,119 @@
import Image from '@theme/IdealImage';
# Literal AI - Log, Evaluate, Monitor
[Literal AI](https://literalai.com) is a collaborative observability, evaluation and analytics platform for building production-grade LLM apps.
<Image img={require('../../img/literalai.png')} />
## Pre-Requisites
Ensure you have the `literalai` package installed:
```shell
pip install literalai litellm
```
## Quick Start
```python
import litellm
import os
os.environ["LITERAL_API_KEY"] = ""
os.environ['OPENAI_API_KEY']= ""
litellm.success_callback = ["literalai"] # Log Input/Output to LiteralAI
litellm.failure_callback = ["literalai"] # Log Errors to LiteralAI
# openai call
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "Hi 👋 - i'm openai"}
]
)
```
## Multi Step Traces
This integration is compatible with the Literal AI SDK decorators, enabling conversation and agent tracing.
```py
import litellm
from literalai import LiteralClient
import os
os.environ["LITERAL_API_KEY"] = ""
os.environ['OPENAI_API_KEY']= ""
litellm.input_callback = ["literalai"] # Support other Literal AI decorators and prompt templates
litellm.success_callback = ["literalai"] # Log Input/Output to LiteralAI
litellm.failure_callback = ["literalai"] # Log Errors to LiteralAI
literalai_client = LiteralClient()
@literalai_client.run
def my_agent(question: str):
# agent logic here
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": question}
]
)
return response
my_agent("Hello world")
# Waiting to send all logs before exiting, not needed in a production server
literalai_client.flush()
```
Learn more about [Literal AI logging capabilities](https://docs.literalai.com/guides/logs).
## Bind a Generation to its Prompt Template
This integration works out of the box with prompts managed on Literal AI. This means that a specific LLM generation will be bound to its template.
Learn more about [Prompt Management](https://docs.literalai.com/guides/prompt-management#pull-a-prompt-template-from-literal-ai) on Literal AI.
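
For illustration, a minimal sketch of pulling a managed prompt and passing its formatted messages to LiteLLM. The prompt name and variables are placeholders, and the SDK helpers used here (`api.get_prompt` / `format_messages`) should be confirmed against the Literal AI prompt-management docs linked above:

```python
import litellm
from literalai import LiteralClient

litellm.input_callback = ["literalai"]    # needed for prompt template binding
litellm.success_callback = ["literalai"]

literalai_client = LiteralClient()

# Pull a prompt template managed on Literal AI (placeholder name), format it with
# placeholder variables, and send the resulting messages through LiteLLM. The
# generation is then bound to its template on Literal AI.
prompt = literalai_client.api.get_prompt(name="my-prompt")
messages = prompt.format_messages(question="What is LiteLLM?")

response = litellm.completion(model="gpt-3.5-turbo", messages=messages)
```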
## OpenAI Proxy Usage
If you are using the LiteLLM proxy, you can use the Literal AI OpenAI instrumentation to log your calls.
```py
from literalai import LiteralClient
from openai import OpenAI
client = OpenAI(
api_key="anything", # litellm proxy virtual key
base_url="http://0.0.0.0:4000" # litellm proxy base_url
)
literalai_client = LiteralClient(api_key="")
# Instrument the OpenAI client
literalai_client.instrument_openai()
settings = {
"model": "gpt-3.5-turbo", # model you want to send litellm proxy
"temperature": 0,
# ... more settings
}
response = client.chat.completions.create(
messages=[
{
"content": "You are a helpful bot, you always reply in Spanish",
"role": "system"
},
{
"content": message.content,
"role": "user"
}
],
**settings
)
```

(New binary image file added, 298 KiB; contents not shown.)


@@ -43,6 +43,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
"lago",
"openmeter",
"logfire",
"literalai",
"dynamic_rate_limiter",
"langsmith",
"prometheus",


@@ -105,9 +105,6 @@ class InMemoryCache(BaseCache):
# This can occur when an object is referenced by another object, but the reference is never removed.
def set_cache(self, key, value, **kwargs):
print_verbose(
"InMemoryCache: set_cache. current size= {}".format(len(self.cache_dict))
)
if len(self.cache_dict) >= self.max_size_in_memory:
# only evict when cache is full
self.evict_cache()
@@ -1835,7 +1832,6 @@ class DualCache(BaseCache):
def set_cache(self, key, value, local_only: bool = False, **kwargs):
# Update both Redis and in-memory cache
try:
print_verbose(f"set cache: key: {key}; value: {value}")
if self.in_memory_cache is not None:
if "ttl" not in kwargs and self.default_in_memory_ttl is not None:
kwargs["ttl"] = self.default_in_memory_ttl
@@ -1873,7 +1869,6 @@ class DualCache(BaseCache):
def get_cache(self, key, local_only: bool = False, **kwargs):
# Try to fetch from in-memory cache first
try:
print_verbose(f"get cache: cache key: {key}; local_only: {local_only}")
result = None
if self.in_memory_cache is not None:
in_memory_result = self.in_memory_cache.get_cache(key, **kwargs)
@@ -1906,7 +1901,6 @@ class DualCache(BaseCache):
if self.in_memory_cache is not None:
in_memory_result = self.in_memory_cache.batch_get_cache(keys, **kwargs)
print_verbose(f"in_memory_result: {in_memory_result}")
if in_memory_result is not None:
result = in_memory_result


@@ -0,0 +1,318 @@
#### What this does ####
# This file contains the LiteralAILogger class which is used to log steps to the LiteralAI observability platform.
import asyncio
import os
import traceback
import uuid
from typing import Optional
import httpx
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.litellm_core_utils.redact_messages import redact_user_api_key_info
from litellm.llms.custom_httpx.http_handler import (
HTTPHandler,
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.types.utils import StandardLoggingPayload
class LiteralAILogger(CustomBatchLogger):
def __init__(
self,
literalai_api_key=None,
literalai_api_url="https://cloud.getliteral.ai",
env=None,
**kwargs,
):
self.literalai_api_url = os.getenv("LITERAL_API_URL") or literalai_api_url
self.headers = {
"Content-Type": "application/json",
"x-api-key": literalai_api_key or os.getenv("LITERAL_API_KEY"),
"x-client-name": "litellm",
}
if env:
self.headers["x-env"] = env
self.async_httpx_client = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback
)
self.sync_http_handler = HTTPHandler()
batch_size = os.getenv("LITERAL_BATCH_SIZE", None)
self.flush_lock = asyncio.Lock()
super().__init__(
**kwargs,
flush_lock=self.flush_lock,
batch_size=int(batch_size) if batch_size else None,
)
def log_success_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
"Literal AI Layer Logging - kwargs: %s, response_obj: %s",
kwargs,
response_obj,
)
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
self.log_queue.append(data)
verbose_logger.debug(
"Literal AI logging: queue length %s, batch size %s",
len(self.log_queue),
self.batch_size,
)
if len(self.log_queue) >= self.batch_size:
self._send_batch()
except Exception:
verbose_logger.exception(
"Literal AI Layer Error - error logging success event."
)
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
verbose_logger.info("Literal AI Failure Event Logging!")
try:
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
self.log_queue.append(data)
verbose_logger.debug(
"Literal AI logging: queue length %s, batch size %s",
len(self.log_queue),
self.batch_size,
)
if len(self.log_queue) >= self.batch_size:
self._send_batch()
except Exception:
verbose_logger.exception(
"Literal AI Layer Error - error logging failure event."
)
def _send_batch(self):
if not self.log_queue:
return
url = f"{self.literalai_api_url}/api/graphql"
query = self._steps_query_builder(self.log_queue)
variables = self._steps_variables_builder(self.log_queue)
try:
response = self.sync_http_handler.post(
url=url,
json={
"query": query,
"variables": variables,
},
headers=self.headers,
)
response.raise_for_status()
if response.status_code >= 300:
verbose_logger.error(
f"Literal AI Error: {response.status_code} - {response.text}"
)
else:
verbose_logger.debug(
f"Batch of {len(self.log_queue)} runs successfully created"
)
except Exception:
verbose_logger.exception("Literal AI Layer Error")
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
"Literal AI Async Layer Logging - kwargs: %s, response_obj: %s",
kwargs,
response_obj,
)
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
self.log_queue.append(data)
verbose_logger.debug(
"Literal AI logging: queue length %s, batch size %s",
len(self.log_queue),
self.batch_size,
)
if len(self.log_queue) >= self.batch_size:
await self.flush_queue()
except Exception:
verbose_logger.exception(
"Literal AI Layer Error - error logging async success event."
)
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
verbose_logger.info("Literal AI Failure Event Logging!")
try:
data = self._prepare_log_data(kwargs, response_obj, start_time, end_time)
self.log_queue.append(data)
verbose_logger.debug(
"Literal AI logging: queue length %s, batch size %s",
len(self.log_queue),
self.batch_size,
)
if len(self.log_queue) >= self.batch_size:
await self.flush_queue()
except Exception:
verbose_logger.exception(
"Literal AI Layer Error - error logging async failure event."
)
async def async_send_batch(self):
if not self.log_queue:
return
url = f"{self.literalai_api_url}/api/graphql"
query = self._steps_query_builder(self.log_queue)
variables = self._steps_variables_builder(self.log_queue)
try:
response = await self.async_httpx_client.post(
url=url,
json={
"query": query,
"variables": variables,
},
headers=self.headers,
)
response.raise_for_status()
if response.status_code >= 300:
verbose_logger.error(
f"Literal AI Error: {response.status_code} - {response.text}"
)
else:
verbose_logger.debug(
f"Batch of {len(self.log_queue)} runs successfully created"
)
except httpx.HTTPStatusError as e:
verbose_logger.exception(
f"Literal AI HTTP Error: {e.response.status_code} - {e.response.text}"
)
except Exception:
verbose_logger.exception("Literal AI Layer Error")
def _prepare_log_data(self, kwargs, response_obj, start_time, end_time) -> dict:
logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
"standard_logging_object", None
)
if logging_payload is None:
raise ValueError("standard_logging_object not found in kwargs")
clean_metadata = logging_payload["metadata"]
metadata = kwargs.get("litellm_params", {}).get("metadata", {})
settings = logging_payload["model_parameters"]
messages = logging_payload["messages"]
prompt_id = None
variables = None
if messages and isinstance(messages, list) and isinstance(messages[0], dict):
for message in messages:
if literal_prompt := getattr(message, "__literal_prompt__", None):
prompt_id = literal_prompt.get("prompt_id")
variables = literal_prompt.get("variables")
message["uuid"] = literal_prompt.get("uuid")
message["templated"] = True
tools = settings.pop("tools", None)
step = {
"id": metadata.get("step_id", str(uuid.uuid4())),
"error": logging_payload["error_str"],
"name": kwargs.get("model", ""),
"threadId": metadata.get("literalai_thread_id", None),
"parentId": metadata.get("literalai_parent_id", None),
"rootRunId": metadata.get("literalai_root_run_id", None),
"input": None,
"output": None,
"type": "llm",
"tags": metadata.get("tags", metadata.get("literalai_tags", None)),
"startTime": str(start_time),
"endTime": str(end_time),
"metadata": clean_metadata,
"generation": {
"inputTokenCount": logging_payload["prompt_tokens"],
"outputTokenCount": logging_payload["completion_tokens"],
"tokenCount": logging_payload["total_tokens"],
"promptId": prompt_id,
"variables": variables,
"provider": kwargs.get("custom_llm_provider", "litellm"),
"model": kwargs.get("model", ""),
"duration": (end_time - start_time).total_seconds(),
"settings": settings,
"messages": messages,
"tools": tools,
},
}
return step
def _steps_query_variables_builder(self, steps):
generated = ""
for id in range(len(steps)):
generated += f"""$id_{id}: String!
$threadId_{id}: String
$rootRunId_{id}: String
$type_{id}: StepType
$startTime_{id}: DateTime
$endTime_{id}: DateTime
$error_{id}: String
$input_{id}: Json
$output_{id}: Json
$metadata_{id}: Json
$parentId_{id}: String
$name_{id}: String
$tags_{id}: [String!]
$generation_{id}: GenerationPayloadInput
$scores_{id}: [ScorePayloadInput!]
$attachments_{id}: [AttachmentPayloadInput!]
"""
return generated
def _steps_ingest_steps_builder(self, steps):
generated = ""
for id in range(len(steps)):
generated += f"""
step{id}: ingestStep(
id: $id_{id}
threadId: $threadId_{id}
rootRunId: $rootRunId_{id}
startTime: $startTime_{id}
endTime: $endTime_{id}
type: $type_{id}
error: $error_{id}
input: $input_{id}
output: $output_{id}
metadata: $metadata_{id}
parentId: $parentId_{id}
name: $name_{id}
tags: $tags_{id}
generation: $generation_{id}
scores: $scores_{id}
attachments: $attachments_{id}
) {{
ok
message
}}
"""
return generated
def _steps_query_builder(self, steps):
return f"""
mutation AddStep({self._steps_query_variables_builder(steps)}) {{
{self._steps_ingest_steps_builder(steps)}
}}
"""
def _steps_variables_builder(self, steps):
def serialize_step(event, id):
result = {}
for key, value in event.items():
# Only keep the keys that are not None to avoid overriding existing values
if value is not None:
result[f"{key}_{id}"] = value
return result
variables = {}
for i in range(len(steps)):
step = steps[i]
variables.update(serialize_step(step, i))
return variables
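
For context, a minimal usage sketch of the logger above: enable the `"literalai"` callbacks, optionally tune the batch size through `LITERAL_BATCH_SIZE`, and pass thread/parent identifiers via `metadata` so they land in the `threadId`/`parentId` fields built in `_prepare_log_data`. The IDs below are placeholders.

```python
# Usage sketch. LITERAL_API_KEY must be set; LITERAL_BATCH_SIZE (optional) controls
# how many queued steps trigger a flush. The metadata keys mirror what
# _prepare_log_data reads: literalai_thread_id, literalai_parent_id, tags.
import os

import litellm

os.environ["LITERAL_API_KEY"] = ""       # your Literal AI API key
os.environ["LITERAL_BATCH_SIZE"] = "10"  # optional: flush after 10 queued steps

litellm.success_callback = ["literalai"]
litellm.failure_callback = ["literalai"]

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}],
    metadata={
        "literalai_thread_id": "thread-123",  # placeholder IDs for illustration
        "literalai_parent_id": "step-456",
        "tags": ["litellm-example"],
    },
)
```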


@@ -72,6 +72,7 @@ from ..integrations.lago import LagoLogger
from ..integrations.langfuse import LangFuseLogger
from ..integrations.langsmith import LangsmithLogger
from ..integrations.litedebugger import LiteDebugger
from ..integrations.literal_ai import LiteralAILogger
from ..integrations.logfire_logger import LogfireLevel, LogfireLogger
from ..integrations.lunary import LunaryLogger
from ..integrations.openmeter import OpenMeterLogger
@@ -2245,6 +2246,14 @@ def _init_custom_logger_compatible_class(
_langsmith_logger = LangsmithLogger()
_in_memory_loggers.append(_langsmith_logger)
return _langsmith_logger # type: ignore
elif logging_integration == "literalai":
for callback in _in_memory_loggers:
if isinstance(callback, LiteralAILogger):
return callback # type: ignore
_literalai_logger = LiteralAILogger()
_in_memory_loggers.append(_literalai_logger)
return _literalai_logger # type: ignore
elif logging_integration == "prometheus":
for callback in _in_memory_loggers:
if isinstance(callback, PrometheusLogger):
@@ -2394,6 +2403,10 @@ def get_custom_logger_compatible_class(
for callback in _in_memory_loggers:
if isinstance(callback, LangsmithLogger):
return callback
elif logging_integration == "literalai":
for callback in _in_memory_loggers:
if isinstance(callback, LiteralAILogger):
return callback
elif logging_integration == "prometheus":
for callback in _in_memory_loggers:
if isinstance(callback, PrometheusLogger):


@@ -1813,7 +1813,9 @@ class AzureChatCompletion(BaseLLM):
elif mode == "audio_transcription":
# Get the current directory of the file being run
pwd = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(pwd, "../tests/gettysburg.wav")
file_path = os.path.join(
pwd, "../../../tests/gettysburg.wav"
) # proxy address
audio_file = open(file_path, "rb")
completion = await client.audio.transcriptions.with_raw_response.create(
file=audio_file,
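
A quick way to sanity-check the health-check fix above (a sketch, not part of this commit): configure an Azure model with `mode: audio_transcription` on the proxy and hit the proxy's `/health` route. The port and master key below are placeholders.

```python
# Health-check sketch: assumes a LiteLLM proxy running locally on port 4000 with a
# placeholder master key. /health reports healthy/unhealthy endpoints, including
# models configured with mode "audio_transcription".
import httpx

resp = httpx.get(
    "http://0.0.0.0:4000/health",
    headers={"Authorization": "Bearer sk-1234"},  # placeholder master key
)
print(resp.json())
```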


@@ -9,5 +9,4 @@ model_list:
litellm_params:
model: openai/gpt-4o-realtime-preview-2024-10-01
api_key: os.environ/OPENAI_API_KEY
api_base: http://localhost:8080
api_base: http://localhost:8080


@@ -2902,13 +2902,6 @@ async def startup_event():
)
)
### CHECK IF VIEW EXISTS ###
if prisma_client is not None:
await prisma_client.check_view_exists()
# Apply misc fixes on DB
# [non-blocking] helper to apply fixes from older litellm versions
asyncio.create_task(prisma_client.apply_db_fixes())
### START BATCH WRITING DB + CHECKING NEW MODELS###
if prisma_client is not None:
scheduler = AsyncIOScheduler()


@@ -2360,34 +2360,6 @@ class PrismaClient:
)
raise e
async def apply_db_fixes(self):
try:
verbose_proxy_logger.debug(
"Applying LiteLLM - DB Fixes fixing logs in SpendLogs"
)
sql_query = """
UPDATE "LiteLLM_SpendLogs"
SET team_id = (
SELECT vt.team_id
FROM "LiteLLM_VerificationToken" vt
WHERE vt.token = "LiteLLM_SpendLogs".api_key
)
WHERE team_id IS NULL
AND EXISTS (
SELECT 1
FROM "LiteLLM_VerificationToken" vt
WHERE vt.token = "LiteLLM_SpendLogs".api_key
);
"""
response = await self.db.query_raw(sql_query)
verbose_proxy_logger.debug(
"Applied LiteLLM - DB Fixes fixing logs in SpendLogs, Response=%s",
response,
)
except Exception as e:
verbose_proxy_logger.debug(f"Error apply_db_fixes: {str(e)}")
return
### CUSTOM FILE ###
def get_instance_fn(value: str, config_file_path: Optional[str] = None) -> Any:


@@ -140,6 +140,7 @@ async def langfuse_proxy_route(
request,
fastapi_response,
user_api_key_dict,
query_params=dict(request.query_params), # type: ignore
)
return received_value


@@ -0,0 +1,72 @@
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import logging
import pytest
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.literal_ai import LiteralAILogger
verbose_logger.setLevel(logging.DEBUG)
litellm.set_verbose = True
@pytest.mark.asyncio
async def test_literalai_queue_logging():
try:
# Initialize LiteralAILogger
test_literalai_logger = LiteralAILogger()
litellm.callbacks = [test_literalai_logger]
test_literalai_logger.batch_size = 6
litellm.set_verbose = True
# Make multiple calls to ensure we don't hit the batch size
for _ in range(5):
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Test message"}],
max_tokens=10,
temperature=0.2,
mock_response="This is a mock response",
)
await asyncio.sleep(3)
# Check that logs are in the queue
assert len(test_literalai_logger.log_queue) == 5
# Now make calls to exceed the batch size
for _ in range(3):
await litellm.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Test message"}],
max_tokens=10,
temperature=0.2,
mock_response="This is a mock response",
)
# Wait a short time for any asynchronous operations to complete
await asyncio.sleep(1)
print(
"Length of literalai log queue: {}".format(
len(test_literalai_logger.log_queue)
)
)
# Check that the queue was flushed after exceeding batch size
assert len(test_literalai_logger.log_queue) < 5
# Clean up
for cb in litellm.callbacks:
if isinstance(cb, LiteralAILogger):
await cb.async_httpx_client.client.aclose()
except Exception as e:
pytest.fail(f"Error occurred: {e}")