(Refactor) Langfuse - remove prepare_metadata, langfuse python SDK now handles non-json serializable objects (#7925)

* test_langfuse_logging_completion_with_langfuse_metadata

* fix litellm - remove prepare metadata

* test_langfuse_logging_with_non_serializable_metadata

* detailed e2e langfuse metadata tests

* clean up langfuse logging

* fix langfuse

* remove unused imports

* fix code qa checks

* fix _prepare_metadata
This commit is contained in:
Ishaan Jaff 2025-01-22 22:11:40 -08:00 committed by GitHub
parent 27560bd5ad
commit 53a3ea3d06
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 1231 additions and 133 deletions

View file

@ -3,11 +3,9 @@
import copy
import os
import traceback
from collections.abc import MutableMapping, MutableSequence, MutableSet
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from packaging.version import Version
from pydantic import BaseModel
import litellm
from litellm._logging import verbose_logger
@ -71,8 +69,9 @@ class LangFuseLogger:
"flush_interval": self.langfuse_flush_interval, # flush interval in seconds
"httpx_client": self.langfuse_client,
}
self.langfuse_sdk_version: str = langfuse.version.__version__
if Version(langfuse.version.__version__) >= Version("2.6.0"):
if Version(self.langfuse_sdk_version) >= Version("2.6.0"):
parameters["sdk_integration"] = "litellm"
self.Langfuse = Langfuse(**parameters)
@ -360,73 +359,6 @@ class LangFuseLogger:
)
)
def is_base_type(self, value: Any) -> bool:
# Check if the value is of a base type
base_types = (int, float, str, bool, list, dict, tuple)
return isinstance(value, base_types)
def _prepare_metadata(self, metadata: Optional[dict]) -> Any:
try:
if metadata is None:
return None
# Filter out function types from the metadata
sanitized_metadata = {k: v for k, v in metadata.items() if not callable(v)}
return copy.deepcopy(sanitized_metadata)
except Exception as e:
verbose_logger.debug(f"Langfuse Layer Error - {e}, metadata: {metadata}")
new_metadata: Dict[str, Any] = {}
# if metadata is not a MutableMapping, return an empty dict since we can't call items() on it
if not isinstance(metadata, MutableMapping):
verbose_logger.debug(
"Langfuse Layer Logging - metadata is not a MutableMapping, returning empty dict"
)
return new_metadata
for key, value in metadata.items():
try:
if isinstance(value, MutableMapping):
new_metadata[key] = self._prepare_metadata(cast(dict, value))
elif isinstance(value, MutableSequence):
# For lists or other mutable sequences
new_metadata[key] = list(
(
self._prepare_metadata(cast(dict, v))
if isinstance(v, MutableMapping)
else copy.deepcopy(v)
)
for v in value
)
elif isinstance(value, MutableSet):
# For sets specifically, create a new set by passing an iterable
new_metadata[key] = set(
(
self._prepare_metadata(cast(dict, v))
if isinstance(v, MutableMapping)
else copy.deepcopy(v)
)
for v in value
)
elif isinstance(value, BaseModel):
new_metadata[key] = value.model_dump()
elif self.is_base_type(value):
new_metadata[key] = value
else:
verbose_logger.debug(
f"Langfuse Layer Error - Unsupported metadata type: {type(value)} for key: {key}"
)
continue
except (TypeError, copy.Error):
verbose_logger.debug(
f"Langfuse Layer Error - Couldn't copy metadata key: {key}, type of key: {type(key)}, type of value: {type(value)} - {traceback.format_exc()}"
)
return new_metadata
def _log_langfuse_v2( # noqa: PLR0915
self,
user_id,
@ -443,27 +375,17 @@ class LangFuseLogger:
print_verbose,
litellm_call_id,
) -> tuple:
import langfuse
verbose_logger.debug("Langfuse Layer Logging - logging to langfuse v2")
try:
metadata = self._prepare_metadata(metadata)
langfuse_version = Version(langfuse.version.__version__)
supports_tags = langfuse_version >= Version("2.6.3")
supports_prompt = langfuse_version >= Version("2.7.3")
supports_costs = langfuse_version >= Version("2.7.3")
supports_completion_start_time = langfuse_version >= Version("2.7.3")
metadata = metadata or {}
standard_logging_object: Optional[StandardLoggingPayload] = cast(
Optional[StandardLoggingPayload],
kwargs.get("standard_logging_object", None),
)
tags = (
self._get_langfuse_tags(standard_logging_object=standard_logging_object)
if supports_tags
if self._supports_tags()
else []
)
@ -624,7 +546,7 @@ class LangFuseLogger:
if aws_region_name:
clean_metadata["aws_region_name"] = aws_region_name
if supports_tags:
if self._supports_tags():
if "cache_hit" in kwargs:
if kwargs["cache_hit"] is None:
kwargs["cache_hit"] = False
@ -670,7 +592,7 @@ class LangFuseLogger:
usage = {
"prompt_tokens": _usage_obj.prompt_tokens,
"completion_tokens": _usage_obj.completion_tokens,
"total_cost": cost if supports_costs else None,
"total_cost": cost if self._supports_costs() else None,
}
generation_name = clean_metadata.pop("generation_name", None)
if generation_name is None:
@ -713,7 +635,7 @@ class LangFuseLogger:
if parent_observation_id is not None:
generation_params["parent_observation_id"] = parent_observation_id
if supports_prompt:
if self._supports_prompt():
generation_params = _add_prompt_to_generation_params(
generation_params=generation_params,
clean_metadata=clean_metadata,
@ -723,7 +645,7 @@ class LangFuseLogger:
if output is not None and isinstance(output, str) and level == "ERROR":
generation_params["status_message"] = output
if supports_completion_start_time:
if self._supports_completion_start_time():
generation_params["completion_start_time"] = kwargs.get(
"completion_start_time", None
)
@ -770,6 +692,22 @@ class LangFuseLogger:
tags.append(f"cache_key:{_cache_key}")
return tags
def _supports_tags(self):
"""Check if current langfuse version supports tags"""
return Version(self.langfuse_sdk_version) >= Version("2.6.3")
def _supports_prompt(self):
"""Check if current langfuse version supports prompt"""
return Version(self.langfuse_sdk_version) >= Version("2.7.3")
def _supports_costs(self):
"""Check if current langfuse version supports costs"""
return Version(self.langfuse_sdk_version) >= Version("2.7.3")
def _supports_completion_start_time(self):
"""Check if current langfuse version supports completion start time"""
return Version(self.langfuse_sdk_version) >= Version("2.7.3")
def _add_prompt_to_generation_params(
generation_params: dict,