diff --git a/db_scripts/create_views.py b/db_scripts/create_views.py index 1444ab96d..c42f67124 100644 --- a/db_scripts/create_views.py +++ b/db_scripts/create_views.py @@ -4,6 +4,7 @@ python script to pre-create all views required by LiteLLM Proxy Server import asyncio import os +from update_unassigned_teams import apply_db_fixes # Enter your DATABASE_URL here @@ -204,6 +205,7 @@ async def check_view_exists(): print("Last30dTopEndUsersSpend Created!") # noqa + await apply_db_fixes(db=db) return diff --git a/db_scripts/update_unassigned_teams.py b/db_scripts/update_unassigned_teams.py new file mode 100644 index 000000000..dc65e4c20 --- /dev/null +++ b/db_scripts/update_unassigned_teams.py @@ -0,0 +1,27 @@ +from prisma import Prisma + + +async def apply_db_fixes(db: Prisma): + try: + sql_query = """ + UPDATE "LiteLLM_SpendLogs" + SET team_id = ( + SELECT vt.team_id + FROM "LiteLLM_VerificationToken" vt + WHERE vt.token = "LiteLLM_SpendLogs".api_key + ) + WHERE team_id IS NULL + AND EXISTS ( + SELECT 1 + FROM "LiteLLM_VerificationToken" vt + WHERE vt.token = "LiteLLM_SpendLogs".api_key + ); + """ + response = await db.query_raw(sql_query) + print( + "Updated unassigned teams, Response=%s", + response, + ) + except Exception as e: + raise Exception(f"Error apply_db_fixes: {str(e)}") + return diff --git a/docs/my-website/docs/observability/literalai_integration.md b/docs/my-website/docs/observability/literalai_integration.md new file mode 100644 index 000000000..5a643ad47 --- /dev/null +++ b/docs/my-website/docs/observability/literalai_integration.md @@ -0,0 +1,119 @@ +import Image from '@theme/IdealImage'; + +# Literal AI - Log, Evaluate, Monitor + +[Literal AI](https://literalai.com) is a collaborative observability, evaluation and analytics platform for building production-grade LLM apps. + + + +## Pre-Requisites + +Ensure you have the `literalai` package installed: + +```shell +pip install literalai litellm +``` + +## Quick Start + +```python +import litellm +import os + +os.environ["LITERAL_API_KEY"] = "" +os.environ['OPENAI_API_KEY']= "" + +litellm.success_callback = ["literalai"] # Log Input/Output to LiteralAI +litellm.failure_callback = ["literalai"] # Log Errors to LiteralAI + +# openai call +response = litellm.completion( + model="gpt-3.5-turbo", + messages=[ + {"role": "user", "content": "Hi 👋 - i'm openai"} + ] +) +``` + +## Multi Step Traces + +This integration is compatible with the Literal AI SDK decorators, enabling conversation and agent tracing + +```py +import litellm +from literalai import LiteralClient +import os + +os.environ["LITERAL_API_KEY"] = "" +os.environ['OPENAI_API_KEY']= "" + +litellm.input_callback = ["literalai"] # Support other Literal AI decorators and prompt templates +litellm.success_callback = ["literalai"] # Log Input/Output to LiteralAI +litellm.failure_callback = ["literalai"] # Log Errors to LiteralAI + +literalai_client = LiteralClient() + +@literalai_client.run +def my_agent(question: str): + # agent logic here + response = litellm.completion( + model="gpt-3.5-turbo", + messages=[ + {"role": "user", "content": question} + ] + ) + return response + +my_agent("Hello world") + +# Waiting to send all logs before exiting, not needed in a production server +literalai_client.flush() +``` + +Learn more about [Literal AI logging capabilities](https://docs.literalai.com/guides/logs). + +## Bind a Generation to its Prompt Template + +This integration works out of the box with prompts managed on Literal AI. 
This means that a specific LLM generation will be bound to its template. + +Learn more about [Prompt Management](https://docs.literalai.com/guides/prompt-management#pull-a-prompt-template-from-literal-ai) on Literal AI. + +## OpenAI Proxy Usage + +If you are using the Lite LLM proxy, you can use the Literal AI OpenAI instrumentation to log your calls. + +```py +from literalai import LiteralClient +from openai import OpenAI + +client = OpenAI( + api_key="anything", # litellm proxy virtual key + base_url="http://0.0.0.0:4000" # litellm proxy base_url +) + +literalai_client = LiteralClient(api_key="") + +# Instrument the OpenAI client +literalai_client.instrument_openai() + +settings = { + "model": "gpt-3.5-turbo", # model you want to send litellm proxy + "temperature": 0, + # ... more settings +} + +response = client.chat.completions.create( + messages=[ + { + "content": "You are a helpful bot, you always reply in Spanish", + "role": "system" + }, + { + "content": message.content, + "role": "user" + } + ], + **settings + ) + +``` diff --git a/docs/my-website/img/literalai.png b/docs/my-website/img/literalai.png new file mode 100644 index 000000000..eb7b82b96 Binary files /dev/null and b/docs/my-website/img/literalai.png differ diff --git a/litellm/__init__.py b/litellm/__init__.py index 17154e2f7..02cec3c12 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -43,6 +43,7 @@ _custom_logger_compatible_callbacks_literal = Literal[ "lago", "openmeter", "logfire", + "literalai", "dynamic_rate_limiter", "langsmith", "prometheus", diff --git a/litellm/caching.py b/litellm/caching.py index 6f77d2500..f067bd03b 100644 --- a/litellm/caching.py +++ b/litellm/caching.py @@ -105,9 +105,6 @@ class InMemoryCache(BaseCache): # This can occur when an object is referenced by another object, but the reference is never removed. def set_cache(self, key, value, **kwargs): - print_verbose( - "InMemoryCache: set_cache. current size= {}".format(len(self.cache_dict)) - ) if len(self.cache_dict) >= self.max_size_in_memory: # only evict when cache is full self.evict_cache() @@ -1835,7 +1832,6 @@ class DualCache(BaseCache): def set_cache(self, key, value, local_only: bool = False, **kwargs): # Update both Redis and in-memory cache try: - print_verbose(f"set cache: key: {key}; value: {value}") if self.in_memory_cache is not None: if "ttl" not in kwargs and self.default_in_memory_ttl is not None: kwargs["ttl"] = self.default_in_memory_ttl @@ -1873,7 +1869,6 @@ class DualCache(BaseCache): def get_cache(self, key, local_only: bool = False, **kwargs): # Try to fetch from in-memory cache first try: - print_verbose(f"get cache: cache key: {key}; local_only: {local_only}") result = None if self.in_memory_cache is not None: in_memory_result = self.in_memory_cache.get_cache(key, **kwargs) @@ -1906,7 +1901,6 @@ class DualCache(BaseCache): if self.in_memory_cache is not None: in_memory_result = self.in_memory_cache.batch_get_cache(keys, **kwargs) - print_verbose(f"in_memory_result: {in_memory_result}") if in_memory_result is not None: result = in_memory_result diff --git a/litellm/integrations/literal_ai.py b/litellm/integrations/literal_ai.py new file mode 100644 index 000000000..0a7f2f675 --- /dev/null +++ b/litellm/integrations/literal_ai.py @@ -0,0 +1,318 @@ +#### What this does #### +# This file contains the LiteralAILogger class which is used to log steps to the LiteralAI observability platform. 
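+# Steps are buffered in an in-memory queue and flushed to the Literal AI GraphQL
+# endpoint ({LITERAL_API_URL}/api/graphql) in batches, reusing the batch-size and
+# flush handling inherited from CustomBatchLogger.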
+import asyncio +import os +import traceback +import uuid +from typing import Optional + +import httpx + +from litellm._logging import verbose_logger +from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.litellm_core_utils.redact_messages import redact_user_api_key_info +from litellm.llms.custom_httpx.http_handler import ( + HTTPHandler, + get_async_httpx_client, + httpxSpecialProvider, +) +from litellm.types.utils import StandardLoggingPayload + + +class LiteralAILogger(CustomBatchLogger): + def __init__( + self, + literalai_api_key=None, + literalai_api_url="https://cloud.getliteral.ai", + env=None, + **kwargs, + ): + self.literalai_api_url = os.getenv("LITERAL_API_URL") or literalai_api_url + self.headers = { + "Content-Type": "application/json", + "x-api-key": literalai_api_key or os.getenv("LITERAL_API_KEY"), + "x-client-name": "litellm", + } + if env: + self.headers["x-env"] = env + self.async_httpx_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback + ) + self.sync_http_handler = HTTPHandler() + batch_size = os.getenv("LITERAL_BATCH_SIZE", None) + self.flush_lock = asyncio.Lock() + super().__init__( + **kwargs, + flush_lock=self.flush_lock, + batch_size=int(batch_size) if batch_size else None, + ) + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + verbose_logger.debug( + "Literal AI Layer Logging - kwargs: %s, response_obj: %s", + kwargs, + response_obj, + ) + data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) + self.log_queue.append(data) + verbose_logger.debug( + "Literal AI logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, + ) + if len(self.log_queue) >= self.batch_size: + self._send_batch() + except Exception: + verbose_logger.exception( + "Literal AI Layer Error - error logging success event." + ) + + def log_failure_event(self, kwargs, response_obj, start_time, end_time): + verbose_logger.info("Literal AI Failure Event Logging!") + try: + data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) + self.log_queue.append(data) + verbose_logger.debug( + "Literal AI logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, + ) + if len(self.log_queue) >= self.batch_size: + self._send_batch() + except Exception: + verbose_logger.exception( + "Literal AI Layer Error - error logging failure event." 
+ ) + + def _send_batch(self): + if not self.log_queue: + return + + url = f"{self.literalai_api_url}/api/graphql" + query = self._steps_query_builder(self.log_queue) + variables = self._steps_variables_builder(self.log_queue) + + try: + response = self.sync_http_handler.post( + url=url, + json={ + "query": query, + "variables": variables, + }, + headers=self.headers, + ) + response.raise_for_status() + + if response.status_code >= 300: + verbose_logger.error( + f"Literal AI Error: {response.status_code} - {response.text}" + ) + else: + verbose_logger.debug( + f"Batch of {len(self.log_queue)} runs successfully created" + ) + except Exception: + verbose_logger.exception("Literal AI Layer Error") + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + verbose_logger.debug( + "Literal AI Async Layer Logging - kwargs: %s, response_obj: %s", + kwargs, + response_obj, + ) + data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) + self.log_queue.append(data) + verbose_logger.debug( + "Literal AI logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, + ) + if len(self.log_queue) >= self.batch_size: + await self.flush_queue() + except Exception: + verbose_logger.exception( + "Literal AI Layer Error - error logging async success event." + ) + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + verbose_logger.info("Literal AI Failure Event Logging!") + try: + data = self._prepare_log_data(kwargs, response_obj, start_time, end_time) + self.log_queue.append(data) + verbose_logger.debug( + "Literal AI logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, + ) + if len(self.log_queue) >= self.batch_size: + await self.flush_queue() + except Exception: + verbose_logger.exception( + "Literal AI Layer Error - error logging async failure event." 
+ ) + + async def async_send_batch(self): + if not self.log_queue: + return + + url = f"{self.literalai_api_url}/api/graphql" + query = self._steps_query_builder(self.log_queue) + variables = self._steps_variables_builder(self.log_queue) + + try: + response = await self.async_httpx_client.post( + url=url, + json={ + "query": query, + "variables": variables, + }, + headers=self.headers, + ) + response.raise_for_status() + + if response.status_code >= 300: + verbose_logger.error( + f"Literal AI Error: {response.status_code} - {response.text}" + ) + else: + verbose_logger.debug( + f"Batch of {len(self.log_queue)} runs successfully created" + ) + except httpx.HTTPStatusError as e: + verbose_logger.exception( + f"Literal AI HTTP Error: {e.response.status_code} - {e.response.text}" + ) + except Exception: + verbose_logger.exception("Literal AI Layer Error") + + def _prepare_log_data(self, kwargs, response_obj, start_time, end_time) -> dict: + logging_payload: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object", None + ) + + if logging_payload is None: + raise ValueError("standard_logging_object not found in kwargs") + clean_metadata = logging_payload["metadata"] + metadata = kwargs.get("litellm_params", {}).get("metadata", {}) + + settings = logging_payload["model_parameters"] + + messages = logging_payload["messages"] + prompt_id = None + variables = None + + if messages and isinstance(messages, list) and isinstance(messages[0], dict): + for message in messages: + if literal_prompt := getattr(message, "__literal_prompt__", None): + prompt_id = literal_prompt.get("prompt_id") + variables = literal_prompt.get("variables") + message["uuid"] = literal_prompt.get("uuid") + message["templated"] = True + + tools = settings.pop("tools", None) + + step = { + "id": metadata.get("step_id", str(uuid.uuid4())), + "error": logging_payload["error_str"], + "name": kwargs.get("model", ""), + "threadId": metadata.get("literalai_thread_id", None), + "parentId": metadata.get("literalai_parent_id", None), + "rootRunId": metadata.get("literalai_root_run_id", None), + "input": None, + "output": None, + "type": "llm", + "tags": metadata.get("tags", metadata.get("literalai_tags", None)), + "startTime": str(start_time), + "endTime": str(end_time), + "metadata": clean_metadata, + "generation": { + "inputTokenCount": logging_payload["prompt_tokens"], + "outputTokenCount": logging_payload["completion_tokens"], + "tokenCount": logging_payload["total_tokens"], + "promptId": prompt_id, + "variables": variables, + "provider": kwargs.get("custom_llm_provider", "litellm"), + "model": kwargs.get("model", ""), + "duration": (end_time - start_time).total_seconds(), + "settings": settings, + "messages": messages, + "tools": tools, + }, + } + return step + + def _steps_query_variables_builder(self, steps): + generated = "" + for id in range(len(steps)): + generated += f"""$id_{id}: String! + $threadId_{id}: String + $rootRunId_{id}: String + $type_{id}: StepType + $startTime_{id}: DateTime + $endTime_{id}: DateTime + $error_{id}: String + $input_{id}: Json + $output_{id}: Json + $metadata_{id}: Json + $parentId_{id}: String + $name_{id}: String + $tags_{id}: [String!] + $generation_{id}: GenerationPayloadInput + $scores_{id}: [ScorePayloadInput!] + $attachments_{id}: [AttachmentPayloadInput!] 
+ """ + return generated + + def _steps_ingest_steps_builder(self, steps): + generated = "" + for id in range(len(steps)): + generated += f""" + step{id}: ingestStep( + id: $id_{id} + threadId: $threadId_{id} + rootRunId: $rootRunId_{id} + startTime: $startTime_{id} + endTime: $endTime_{id} + type: $type_{id} + error: $error_{id} + input: $input_{id} + output: $output_{id} + metadata: $metadata_{id} + parentId: $parentId_{id} + name: $name_{id} + tags: $tags_{id} + generation: $generation_{id} + scores: $scores_{id} + attachments: $attachments_{id} + ) {{ + ok + message + }} + """ + return generated + + def _steps_query_builder(self, steps): + return f""" + mutation AddStep({self._steps_query_variables_builder(steps)}) {{ + {self._steps_ingest_steps_builder(steps)} + }} + """ + + def _steps_variables_builder(self, steps): + def serialize_step(event, id): + result = {} + + for key, value in event.items(): + # Only keep the keys that are not None to avoid overriding existing values + if value is not None: + result[f"{key}_{id}"] = value + + return result + + variables = {} + for i in range(len(steps)): + step = steps[i] + variables.update(serialize_step(step, i)) + return variables diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py index 6072ecb88..c22df85af 100644 --- a/litellm/litellm_core_utils/litellm_logging.py +++ b/litellm/litellm_core_utils/litellm_logging.py @@ -72,6 +72,7 @@ from ..integrations.lago import LagoLogger from ..integrations.langfuse import LangFuseLogger from ..integrations.langsmith import LangsmithLogger from ..integrations.litedebugger import LiteDebugger +from ..integrations.literal_ai import LiteralAILogger from ..integrations.logfire_logger import LogfireLevel, LogfireLogger from ..integrations.lunary import LunaryLogger from ..integrations.openmeter import OpenMeterLogger @@ -2245,6 +2246,14 @@ def _init_custom_logger_compatible_class( _langsmith_logger = LangsmithLogger() _in_memory_loggers.append(_langsmith_logger) return _langsmith_logger # type: ignore + elif logging_integration == "literalai": + for callback in _in_memory_loggers: + if isinstance(callback, LiteralAILogger): + return callback # type: ignore + + _literalai_logger = LiteralAILogger() + _in_memory_loggers.append(_literalai_logger) + return _literalai_logger # type: ignore elif logging_integration == "prometheus": for callback in _in_memory_loggers: if isinstance(callback, PrometheusLogger): @@ -2394,6 +2403,10 @@ def get_custom_logger_compatible_class( for callback in _in_memory_loggers: if isinstance(callback, LangsmithLogger): return callback + elif logging_integration == "literalai": + for callback in _in_memory_loggers: + if isinstance(callback, LiteralAILogger): + return callback elif logging_integration == "prometheus": for callback in _in_memory_loggers: if isinstance(callback, PrometheusLogger): diff --git a/litellm/llms/AzureOpenAI/azure.py b/litellm/llms/AzureOpenAI/azure.py index 34ba7d7e0..42c9f48f1 100644 --- a/litellm/llms/AzureOpenAI/azure.py +++ b/litellm/llms/AzureOpenAI/azure.py @@ -1813,7 +1813,9 @@ class AzureChatCompletion(BaseLLM): elif mode == "audio_transcription": # Get the current directory of the file being run pwd = os.path.dirname(os.path.realpath(__file__)) - file_path = os.path.join(pwd, "../tests/gettysburg.wav") + file_path = os.path.join( + pwd, "../../../tests/gettysburg.wav" + ) # proxy address audio_file = open(file_path, "rb") completion = await client.audio.transcriptions.with_raw_response.create( 
file=audio_file, diff --git a/litellm/proxy/_new_secret_config.yaml b/litellm/proxy/_new_secret_config.yaml index 040a4a536..5f36a359e 100644 --- a/litellm/proxy/_new_secret_config.yaml +++ b/litellm/proxy/_new_secret_config.yaml @@ -9,5 +9,4 @@ model_list: litellm_params: model: openai/gpt-4o-realtime-preview-2024-10-01 api_key: os.environ/OPENAI_API_KEY - api_base: http://localhost:8080 - + api_base: http://localhost:8080 \ No newline at end of file diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index 12224634c..6efca6eb9 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -2902,13 +2902,6 @@ async def startup_event(): ) ) - ### CHECK IF VIEW EXISTS ### - if prisma_client is not None: - await prisma_client.check_view_exists() - # Apply misc fixes on DB - # [non-blocking] helper to apply fixes from older litellm versions - asyncio.create_task(prisma_client.apply_db_fixes()) - ### START BATCH WRITING DB + CHECKING NEW MODELS### if prisma_client is not None: scheduler = AsyncIOScheduler() diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 081591c14..0f57e90fc 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -2360,34 +2360,6 @@ class PrismaClient: ) raise e - async def apply_db_fixes(self): - try: - verbose_proxy_logger.debug( - "Applying LiteLLM - DB Fixes fixing logs in SpendLogs" - ) - sql_query = """ - UPDATE "LiteLLM_SpendLogs" - SET team_id = ( - SELECT vt.team_id - FROM "LiteLLM_VerificationToken" vt - WHERE vt.token = "LiteLLM_SpendLogs".api_key - ) - WHERE team_id IS NULL - AND EXISTS ( - SELECT 1 - FROM "LiteLLM_VerificationToken" vt - WHERE vt.token = "LiteLLM_SpendLogs".api_key - ); - """ - response = await self.db.query_raw(sql_query) - verbose_proxy_logger.debug( - "Applied LiteLLM - DB Fixes fixing logs in SpendLogs, Response=%s", - response, - ) - except Exception as e: - verbose_proxy_logger.debug(f"Error apply_db_fixes: {str(e)}") - return - ### CUSTOM FILE ### def get_instance_fn(value: str, config_file_path: Optional[str] = None) -> Any: diff --git a/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py b/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py index 4626cd667..ba8653d82 100644 --- a/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py +++ b/litellm/proxy/vertex_ai_endpoints/langfuse_endpoints.py @@ -140,6 +140,7 @@ async def langfuse_proxy_route( request, fastapi_response, user_api_key_dict, + query_params=dict(request.query_params), # type: ignore ) return received_value diff --git a/tests/local_testing/test_literalai.py b/tests/local_testing/test_literalai.py new file mode 100644 index 000000000..35e583549 --- /dev/null +++ b/tests/local_testing/test_literalai.py @@ -0,0 +1,72 @@ +import os +import sys + +sys.path.insert(0, os.path.abspath("../..")) + +import asyncio +import logging + +import pytest + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.literal_ai import LiteralAILogger + +verbose_logger.setLevel(logging.DEBUG) + +litellm.set_verbose = True + + +@pytest.mark.asyncio +async def test_literalai_queue_logging(): + try: + # Initialize LiteralAILogger + test_literalai_logger = LiteralAILogger() + + litellm.callbacks = [test_literalai_logger] + test_literalai_logger.batch_size = 6 + litellm.set_verbose = True + + # Make multiple calls to ensure we don't hit the batch size + for _ in range(5): + response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Test message"}], 
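+                # mock_response short-circuits the provider call, so no real
+                # OpenAI request is made; the success callback still fires.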
+ max_tokens=10, + temperature=0.2, + mock_response="This is a mock response", + ) + + await asyncio.sleep(3) + + # Check that logs are in the queue + assert len(test_literalai_logger.log_queue) == 5 + + # Now make calls to exceed the batch size + for _ in range(3): + await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Test message"}], + max_tokens=10, + temperature=0.2, + mock_response="This is a mock response", + ) + + # Wait a short time for any asynchronous operations to complete + await asyncio.sleep(1) + + print( + "Length of literalai log queue: {}".format( + len(test_literalai_logger.log_queue) + ) + ) + # Check that the queue was flushed after exceeding batch size + assert len(test_literalai_logger.log_queue) < 5 + + # Clean up + for cb in litellm.callbacks: + if isinstance(cb, LiteralAILogger): + await cb.async_httpx_client.client.aclose() + + except Exception as e: + pytest.fail(f"Error occurred: {e}")