mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 18:54:30 +00:00
* test_anthropic_cache_control_hook_system_message * test_anthropic_cache_control_hook.py * should_run_prompt_management_hooks * fix should_run_prompt_management_hooks * test_anthropic_cache_control_hook_specific_index * fix test * fix linting errors * ChatCompletionCachedContent * initial commit for cache control * fixes ui design * fix inserting cache_control_injection_points * fix entering cache control points * fixes for using cache control on ui + backend * update cache control settings on edit model page * fix init custom logger compatible class * fix linting errors * fix linting errors * fix get_chat_completion_prompt
287 lines
9.9 KiB
Python
287 lines
9.9 KiB
Python
"""
|
|
Call Hook for LiteLLM Proxy which allows Langfuse prompt management.
|
|
"""
|
|
|
|
import os
|
|
from functools import lru_cache
|
|
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union, cast
|
|
|
|
from packaging.version import Version
|
|
from typing_extensions import TypeAlias
|
|
|
|
from litellm.integrations.custom_logger import CustomLogger
|
|
from litellm.integrations.prompt_management_base import PromptManagementClient
|
|
from litellm.litellm_core_utils.asyncify import run_async_function
|
|
from litellm.types.llms.openai import AllMessageValues, ChatCompletionSystemMessage
|
|
from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload
|
|
|
|
from ...litellm_core_utils.specialty_caches.dynamic_logging_cache import (
|
|
DynamicLoggingCache,
|
|
)
|
|
from ..prompt_management_base import PromptManagementBase
|
|
from .langfuse import LangFuseLogger
|
|
from .langfuse_handler import LangFuseHandler
|
|
|
|
if TYPE_CHECKING:
|
|
from langfuse import Langfuse
|
|
from langfuse.client import ChatPromptClient, TextPromptClient
|
|
|
|
LangfuseClass: TypeAlias = Langfuse
|
|
|
|
PROMPT_CLIENT = Union[TextPromptClient, ChatPromptClient]
|
|
else:
|
|
PROMPT_CLIENT = Any
|
|
LangfuseClass = Any
|
|
|
|
in_memory_dynamic_logger_cache = DynamicLoggingCache()
|
|
|
|
|
|
@lru_cache(maxsize=10)
|
|
def langfuse_client_init(
|
|
langfuse_public_key=None,
|
|
langfuse_secret=None,
|
|
langfuse_secret_key=None,
|
|
langfuse_host=None,
|
|
flush_interval=1,
|
|
) -> LangfuseClass:
|
|
"""
|
|
Initialize Langfuse client with caching to prevent multiple initializations.
|
|
|
|
Args:
|
|
langfuse_public_key (str, optional): Public key for Langfuse. Defaults to None.
|
|
langfuse_secret (str, optional): Secret key for Langfuse. Defaults to None.
|
|
langfuse_host (str, optional): Host URL for Langfuse. Defaults to None.
|
|
flush_interval (int, optional): Flush interval in seconds. Defaults to 1.
|
|
|
|
Returns:
|
|
Langfuse: Initialized Langfuse client instance
|
|
|
|
Raises:
|
|
Exception: If langfuse package is not installed
|
|
"""
|
|
try:
|
|
import langfuse
|
|
from langfuse import Langfuse
|
|
except Exception as e:
|
|
raise Exception(
|
|
f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n\033[0m"
|
|
)
|
|
|
|
# Instance variables
|
|
|
|
secret_key = (
|
|
langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY")
|
|
)
|
|
public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
|
|
langfuse_host = langfuse_host or os.getenv(
|
|
"LANGFUSE_HOST", "https://cloud.langfuse.com"
|
|
)
|
|
|
|
if not (
|
|
langfuse_host.startswith("http://") or langfuse_host.startswith("https://")
|
|
):
|
|
# add http:// if unset, assume communicating over private network - e.g. render
|
|
langfuse_host = "http://" + langfuse_host
|
|
|
|
langfuse_release = os.getenv("LANGFUSE_RELEASE")
|
|
langfuse_debug = os.getenv("LANGFUSE_DEBUG")
|
|
|
|
parameters = {
|
|
"public_key": public_key,
|
|
"secret_key": secret_key,
|
|
"host": langfuse_host,
|
|
"release": langfuse_release,
|
|
"debug": langfuse_debug,
|
|
"flush_interval": LangFuseLogger._get_langfuse_flush_interval(
|
|
flush_interval
|
|
), # flush interval in seconds
|
|
}
|
|
|
|
if Version(langfuse.version.__version__) >= Version("2.6.0"):
|
|
parameters["sdk_integration"] = "litellm"
|
|
|
|
client = Langfuse(**parameters)
|
|
|
|
return client
|
|
|
|
|
|
class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogger):
|
|
def __init__(
|
|
self,
|
|
langfuse_public_key=None,
|
|
langfuse_secret=None,
|
|
langfuse_host=None,
|
|
flush_interval=1,
|
|
):
|
|
import langfuse
|
|
|
|
self.langfuse_sdk_version = langfuse.version.__version__
|
|
self.Langfuse = langfuse_client_init(
|
|
langfuse_public_key=langfuse_public_key,
|
|
langfuse_secret=langfuse_secret,
|
|
langfuse_host=langfuse_host,
|
|
flush_interval=flush_interval,
|
|
)
|
|
|
|
@property
|
|
def integration_name(self):
|
|
return "langfuse"
|
|
|
|
def _get_prompt_from_id(
|
|
self, langfuse_prompt_id: str, langfuse_client: LangfuseClass
|
|
) -> PROMPT_CLIENT:
|
|
return langfuse_client.get_prompt(langfuse_prompt_id)
|
|
|
|
def _compile_prompt(
|
|
self,
|
|
langfuse_prompt_client: PROMPT_CLIENT,
|
|
langfuse_prompt_variables: Optional[dict],
|
|
call_type: Union[Literal["completion"], Literal["text_completion"]],
|
|
) -> List[AllMessageValues]:
|
|
compiled_prompt: Optional[Union[str, list]] = None
|
|
|
|
if langfuse_prompt_variables is None:
|
|
langfuse_prompt_variables = {}
|
|
|
|
compiled_prompt = langfuse_prompt_client.compile(**langfuse_prompt_variables)
|
|
|
|
if isinstance(compiled_prompt, str):
|
|
compiled_prompt = [
|
|
ChatCompletionSystemMessage(role="system", content=compiled_prompt)
|
|
]
|
|
else:
|
|
compiled_prompt = cast(List[AllMessageValues], compiled_prompt)
|
|
|
|
return compiled_prompt
|
|
|
|
def _get_optional_params_from_langfuse(
|
|
self, langfuse_prompt_client: PROMPT_CLIENT
|
|
) -> dict:
|
|
config = langfuse_prompt_client.config
|
|
optional_params = {}
|
|
for k, v in config.items():
|
|
if k != "model":
|
|
optional_params[k] = v
|
|
return optional_params
|
|
|
|
async def async_get_chat_completion_prompt(
|
|
self,
|
|
model: str,
|
|
messages: List[AllMessageValues],
|
|
non_default_params: dict,
|
|
prompt_id: Optional[str],
|
|
prompt_variables: Optional[dict],
|
|
dynamic_callback_params: StandardCallbackDynamicParams,
|
|
) -> Tuple[
|
|
str,
|
|
List[AllMessageValues],
|
|
dict,
|
|
]:
|
|
return self.get_chat_completion_prompt(
|
|
model,
|
|
messages,
|
|
non_default_params,
|
|
prompt_id,
|
|
prompt_variables,
|
|
dynamic_callback_params,
|
|
)
|
|
|
|
def should_run_prompt_management(
|
|
self,
|
|
prompt_id: str,
|
|
dynamic_callback_params: StandardCallbackDynamicParams,
|
|
) -> bool:
|
|
langfuse_client = langfuse_client_init(
|
|
langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"),
|
|
langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
|
|
langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
|
|
langfuse_host=dynamic_callback_params.get("langfuse_host"),
|
|
)
|
|
langfuse_prompt_client = self._get_prompt_from_id(
|
|
langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client
|
|
)
|
|
return langfuse_prompt_client is not None
|
|
|
|
def _compile_prompt_helper(
|
|
self,
|
|
prompt_id: str,
|
|
prompt_variables: Optional[dict],
|
|
dynamic_callback_params: StandardCallbackDynamicParams,
|
|
) -> PromptManagementClient:
|
|
langfuse_client = langfuse_client_init(
|
|
langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"),
|
|
langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
|
|
langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
|
|
langfuse_host=dynamic_callback_params.get("langfuse_host"),
|
|
)
|
|
langfuse_prompt_client = self._get_prompt_from_id(
|
|
langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client
|
|
)
|
|
|
|
## SET PROMPT
|
|
compiled_prompt = self._compile_prompt(
|
|
langfuse_prompt_client=langfuse_prompt_client,
|
|
langfuse_prompt_variables=prompt_variables,
|
|
call_type="completion",
|
|
)
|
|
|
|
template_model = langfuse_prompt_client.config.get("model")
|
|
|
|
template_optional_params = self._get_optional_params_from_langfuse(
|
|
langfuse_prompt_client
|
|
)
|
|
|
|
return PromptManagementClient(
|
|
prompt_id=prompt_id,
|
|
prompt_template=compiled_prompt,
|
|
prompt_template_model=template_model,
|
|
prompt_template_optional_params=template_optional_params,
|
|
completed_messages=None,
|
|
)
|
|
|
|
def log_success_event(self, kwargs, response_obj, start_time, end_time):
|
|
return run_async_function(
|
|
self.async_log_success_event, kwargs, response_obj, start_time, end_time
|
|
)
|
|
|
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
|
|
standard_callback_dynamic_params = kwargs.get(
|
|
"standard_callback_dynamic_params"
|
|
)
|
|
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
|
|
globalLangfuseLogger=self,
|
|
standard_callback_dynamic_params=standard_callback_dynamic_params,
|
|
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
|
|
)
|
|
langfuse_logger_to_use.log_event_on_langfuse(
|
|
kwargs=kwargs,
|
|
response_obj=response_obj,
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
user_id=kwargs.get("user", None),
|
|
)
|
|
|
|
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
|
|
standard_callback_dynamic_params = kwargs.get(
|
|
"standard_callback_dynamic_params"
|
|
)
|
|
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
|
|
globalLangfuseLogger=self,
|
|
standard_callback_dynamic_params=standard_callback_dynamic_params,
|
|
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
|
|
)
|
|
standard_logging_object = cast(
|
|
Optional[StandardLoggingPayload],
|
|
kwargs.get("standard_logging_object", None),
|
|
)
|
|
if standard_logging_object is None:
|
|
return
|
|
langfuse_logger_to_use.log_event_on_langfuse(
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
response_obj=None,
|
|
user_id=kwargs.get("user", None),
|
|
status_message=standard_logging_object["error_str"],
|
|
level="ERROR",
|
|
kwargs=kwargs,
|
|
)
|