forked from phoenix/litellm-mirror
Merge pull request #3459 from alexanderepstein/langfuse_improvements
Update support for langfuse metadata
This commit is contained in:
commit
30003afbf8
4 changed files with 224 additions and 67 deletions
|
@ -94,9 +94,10 @@ print(response)
|
|||
|
||||
```
|
||||
|
||||
### Set Custom Trace ID, Trace User ID and Tags
|
||||
### Set Custom Trace ID, Trace User ID, Trace Metadata, Trace Version, Trace Release and Tags
|
||||
|
||||
Pass `trace_id`, `trace_user_id`, `trace_metadata`, `trace_version`, `trace_release`, `tags` in `metadata`
|
||||
|
||||
Pass `trace_id`, `trace_user_id` in `metadata`
|
||||
|
||||
```python
|
||||
import litellm
|
||||
|
@ -121,12 +122,20 @@ response = completion(
|
|||
metadata={
|
||||
"generation_name": "ishaan-test-generation", # set langfuse Generation Name
|
||||
"generation_id": "gen-id22", # set langfuse Generation ID
|
||||
"version": "test-generation-version", # set langfuse Generation Version
|
||||
"trace_user_id": "user-id2", # set langfuse Trace User ID
|
||||
"session_id": "session-1", # set langfuse Session ID
|
||||
"tags": ["tag1", "tag2"] # set langfuse Tags
|
||||
"tags": ["tag1", "tag2"], # set langfuse Tags
|
||||
"trace_id": "trace-id22", # set langfuse Trace ID
|
||||
"trace_metadata": {"key": "value"}, # set langfuse Trace Metadata
|
||||
"trace_version": "test-trace-version", # set langfuse Trace Version (if not set, defaults to Generation Version)
|
||||
"trace_release": "test-trace-release", # set langfuse Trace Release
|
||||
### OR ###
|
||||
"existing_trace_id": "trace-id22", # if generation is continuation of past trace. This prevents default behaviour of setting a trace name
|
||||
"existing_trace_id": "trace-id22", # if generation is continuation of past trace. This prevents default behaviour of setting a trace name
|
||||
### OR enforce that certain fields are overwritten in the trace during the continuation ###
|
||||
"existing_trace_id": "trace-id22",
|
||||
"trace_metadata": {"key": "updated_trace_value"}, # The new value to use for the langfuse Trace Metadata
|
||||
"update_trace_keys": ["input", "output", "trace_metadata"], # Updates the trace input & output to be this generation's input & output; also updates the Trace Metadata to match the passed-in value
|
||||
},
|
||||
)
|
||||
|
||||
|
@ -134,6 +143,38 @@ print(response)
|
|||
|
||||
```
|
||||
|
||||
### Trace & Generation Parameters
|
||||
|
||||
#### Trace Specific Parameters
|
||||
|
||||
* `trace_id` - Identifier for the trace, auto-generated by default. If this is an existing trace, use `existing_trace_id` instead of, or in conjunction with, `trace_id`
|
||||
* `trace_name` - Name of the trace, auto-generated by default
|
||||
* `session_id` - Session identifier for the trace, defaults to `None`
|
||||
* `trace_version` - Version for the trace, defaults to value for `version`
|
||||
* `trace_release` - Release for the trace, defaults to `None`
|
||||
* `trace_metadata` - Metadata for the trace, defaults to `None`
|
||||
* `trace_user_id` - User identifier for the trace, defaults to completion argument `user`
|
||||
* `tags` - Tags for the trace, defaults to `None`
|
||||
|
||||
##### Updatable Parameters on Continuation
|
||||
|
||||
The following parameters can be updated on a continuation of a trace by listing them in `update_trace_keys` in the metadata of the completion.
|
||||
|
||||
* `input` - Will set the trace's input to be the input of this latest generation
|
||||
* `output` - Will set the trace's output to be the output of this generation
|
||||
* `trace_version` - Will set the trace version to be the provided value (To use the latest generation's version instead, use `version`)
|
||||
* `trace_release` - Will set the trace release to be the provided value
|
||||
* `trace_metadata` - Will set the trace metadata to the provided value
|
||||
* `trace_user_id` - Will set the trace user id to the provided value
|
||||
|
||||
#### Generation Specific Parameters
|
||||
|
||||
* `generation_id` - Identifier for the generation, auto-generated by default
|
||||
* `generation_name` - Name for the generation, auto-generated by default
|
||||
* `prompt` - Langfuse prompt object used for the generation, defaults to None
|
||||
|
||||
Any other key value pairs passed into the metadata not listed in the above spec for a `litellm` completion will be added as a metadata key value pair for the generation.
|
||||
|
||||
### Use LangChain ChatLiteLLM + Langfuse
|
||||
Pass `trace_user_id`, `session_id` in model_kwargs
|
||||
```python
|
||||
|
|
|
@ -262,6 +262,7 @@ class LangFuseLogger:
|
|||
|
||||
try:
|
||||
tags = []
|
||||
metadata = copy.deepcopy(metadata) # Avoid modifying the original metadata
|
||||
supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3")
|
||||
supports_prompt = Version(langfuse.version.__version__) >= Version("2.7.3")
|
||||
supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3")
|
||||
|
@ -272,35 +273,9 @@ class LangFuseLogger:
|
|||
print_verbose(f"Langfuse Layer Logging - logging to langfuse v2 ")
|
||||
|
||||
if supports_tags:
|
||||
metadata_tags = metadata.get("tags", [])
|
||||
metadata_tags = metadata.pop("tags", [])
|
||||
tags = metadata_tags
|
||||
|
||||
trace_name = metadata.get("trace_name", None)
|
||||
trace_id = metadata.get("trace_id", None)
|
||||
existing_trace_id = metadata.get("existing_trace_id", None)
|
||||
if trace_name is None and existing_trace_id is None:
|
||||
# just log `litellm-{call_type}` as the trace name
|
||||
## DO NOT SET TRACE_NAME if trace-id set. this can lead to overwriting of past traces.
|
||||
trace_name = f"litellm-{kwargs.get('call_type', 'completion')}"
|
||||
|
||||
if existing_trace_id is not None:
|
||||
trace_params = {"id": existing_trace_id}
|
||||
else: # don't overwrite an existing trace
|
||||
trace_params = {
|
||||
"name": trace_name,
|
||||
"input": input,
|
||||
"user_id": metadata.get("trace_user_id", user_id),
|
||||
"id": trace_id,
|
||||
"session_id": metadata.get("session_id", None),
|
||||
}
|
||||
|
||||
if level == "ERROR":
|
||||
trace_params["status_message"] = output
|
||||
else:
|
||||
trace_params["output"] = output
|
||||
|
||||
cost = kwargs.get("response_cost", None)
|
||||
print_verbose(f"trace: {cost}")
|
||||
|
||||
# Clean Metadata before logging - never log raw metadata
|
||||
# the raw metadata can contain circular references which leads to infinite recursion
|
||||
|
@ -328,6 +303,58 @@ class LangFuseLogger:
|
|||
else:
|
||||
clean_metadata[key] = value
|
||||
|
||||
|
||||
session_id = clean_metadata.pop("session_id", None)
|
||||
trace_name = clean_metadata.pop("trace_name", None)
|
||||
trace_id = clean_metadata.pop("trace_id", None)
|
||||
existing_trace_id = clean_metadata.pop("existing_trace_id", None)
|
||||
update_trace_keys = clean_metadata.pop("update_trace_keys", [])
|
||||
|
||||
if trace_name is None and existing_trace_id is None:
|
||||
# just log `litellm-{call_type}` as the trace name
|
||||
## DO NOT SET TRACE_NAME if trace-id set. this can lead to overwriting of past traces.
|
||||
trace_name = f"litellm-{kwargs.get('call_type', 'completion')}"
|
||||
|
||||
if existing_trace_id is not None:
|
||||
trace_params = {"id": existing_trace_id}
|
||||
|
||||
# Update the following keys for this trace
|
||||
for metadata_param_key in update_trace_keys:
|
||||
trace_param_key = metadata_param_key.replace("trace_", "")
|
||||
if trace_param_key not in trace_params:
|
||||
updated_trace_value = clean_metadata.pop(metadata_param_key, None)
|
||||
if updated_trace_value is not None:
|
||||
trace_params[trace_param_key] = updated_trace_value
|
||||
|
||||
|
||||
# Pop the trace specific keys that would have been popped if there were a new trace
|
||||
for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())):
|
||||
clean_metadata.pop(key, None)
|
||||
|
||||
# Special keys that are found in the function arguments and not the metadata
|
||||
if "input" in update_trace_keys:
|
||||
trace_params["input"] = input
|
||||
if "output" in update_trace_keys:
|
||||
trace_params["output"] = output
|
||||
else: # don't overwrite an existing trace
|
||||
trace_params = {
|
||||
"id": trace_id,
|
||||
"name": trace_name,
|
||||
"session_id": session_id,
|
||||
"input": input,
|
||||
"version": clean_metadata.pop("trace_version", clean_metadata.get("version", None)), # If provided just version, it will applied to the trace as well, if applied a trace version it will take precedence
|
||||
}
|
||||
for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())):
|
||||
trace_params[key.replace("trace_", "")] = clean_metadata.pop(key, None)
|
||||
|
||||
if level == "ERROR":
|
||||
trace_params["status_message"] = output
|
||||
else:
|
||||
trace_params["output"] = output
|
||||
|
||||
cost = kwargs.get("response_cost", None)
|
||||
print_verbose(f"trace: {cost}")
|
||||
|
||||
if (
|
||||
litellm._langfuse_default_tags is not None
|
||||
and isinstance(litellm._langfuse_default_tags, list)
|
||||
|
@ -387,7 +414,7 @@ class LangFuseLogger:
|
|||
"completion_tokens": response_obj["usage"]["completion_tokens"],
|
||||
"total_cost": cost if supports_costs else None,
|
||||
}
|
||||
generation_name = metadata.get("generation_name", None)
|
||||
generation_name = clean_metadata.pop("generation_name", None)
|
||||
if generation_name is None:
|
||||
# just log `litellm-{call_type}` as the generation name
|
||||
generation_name = f"litellm-{kwargs.get('call_type', 'completion')}"
|
||||
|
@ -402,7 +429,7 @@ class LangFuseLogger:
|
|||
|
||||
generation_params = {
|
||||
"name": generation_name,
|
||||
"id": metadata.get("generation_id", generation_id),
|
||||
"id": clean_metadata.pop("generation_id", generation_id),
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"model": kwargs["model"],
|
||||
|
@ -412,10 +439,11 @@ class LangFuseLogger:
|
|||
"usage": usage,
|
||||
"metadata": clean_metadata,
|
||||
"level": level,
|
||||
"version": clean_metadata.pop("version", None),
|
||||
}
|
||||
|
||||
if supports_prompt:
|
||||
generation_params["prompt"] = metadata.get("prompt", None)
|
||||
generation_params["prompt"] = clean_metadata.pop("prompt", None)
|
||||
|
||||
if output is not None and isinstance(output, str) and level == "ERROR":
|
||||
generation_params["status_message"] = output
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
import copy
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import io, asyncio
|
||||
import asyncio
|
||||
|
||||
import logging
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
sys.path.insert(0, os.path.abspath("../.."))
|
||||
|
@ -18,6 +20,18 @@ import time
|
|||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
def langfuse_client() -> "langfuse.Langfuse":
    """
    Yield a Langfuse client built from the ``LANGFUSE_PUBLIC_KEY`` and
    ``LANGFUSE_SECRET_KEY`` environment variables.

    While the fixture is active, ``langfuse.Langfuse`` is patched with a mock
    whose return value is that client, so code that looks up
    ``langfuse.Langfuse`` and calls it during the test receives this same
    instance.
    """
    import langfuse

    shared_client = langfuse.Langfuse(
        public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
        secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    )
    client_factory = MagicMock(return_value=shared_client)

    with patch("langfuse.Langfuse", client_factory) as patched_factory:
        # Calling the patched constructor hands back the shared client.
        yield patched_factory()
|
||||
|
||||
def search_logs(log_file_path, num_good_logs=1):
|
||||
"""
|
||||
Searches the given log file for logs containing the "/api/public" string.
|
||||
|
@ -129,21 +143,10 @@ def test_langfuse_logging_async():
|
|||
pytest.fail(f"An exception occurred - {e}")
|
||||
|
||||
|
||||
async def make_async_calls():
|
||||
async def make_async_calls(metadata = None, **completion_kwargs):
|
||||
tasks = []
|
||||
for _ in range(5):
|
||||
task = asyncio.create_task(
|
||||
litellm.acompletion(
|
||||
model="azure/chatgpt-v-2",
|
||||
messages=[{"role": "user", "content": "This is a test"}],
|
||||
max_tokens=5,
|
||||
temperature=0.7,
|
||||
timeout=5,
|
||||
user="langfuse_latency_test_user",
|
||||
mock_response="It's simple to use and easy to get started",
|
||||
)
|
||||
)
|
||||
tasks.append(task)
|
||||
tasks.append(create_async_task())
|
||||
|
||||
# Measure the start time before running the tasks
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
@ -161,9 +164,30 @@ async def make_async_calls():
|
|||
return total_time
|
||||
|
||||
|
||||
def create_async_task(**completion_kwargs):
    """
    Wrap a ``litellm.acompletion`` call in an asyncio task and return it.

    A standard set of completion arguments is used as the baseline; any
    keyword arguments passed to this function are layered on top and take
    precedence on conflict.

    The task is created with ``asyncio.create_task`` but is not awaited
    here: the caller awaits it directly or hands it to another asyncio
    helper such as ``asyncio.gather``.
    """
    baseline_args = dict(
        model="azure/chatgpt-v-2",
        messages=[{"role": "user", "content": "This is a test"}],
        max_tokens=5,
        temperature=0.7,
        timeout=5,
        user="langfuse_latency_test_user",
        mock_response="It's simple to use and easy to get started",
    )
    # Caller-supplied values override the baseline.
    merged_args = {**baseline_args, **completion_kwargs}
    pending_completion = litellm.acompletion(**merged_args)
    return asyncio.create_task(pending_completion)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("stream", [False, True])
|
||||
async def test_langfuse_logging_without_request_response(stream):
|
||||
async def test_langfuse_logging_without_request_response(stream, langfuse_client):
|
||||
try:
|
||||
import uuid
|
||||
|
||||
|
@ -171,28 +195,14 @@ async def test_langfuse_logging_without_request_response(stream):
|
|||
litellm.set_verbose = True
|
||||
litellm.turn_off_message_logging = True
|
||||
litellm.success_callback = ["langfuse"]
|
||||
response = await litellm.acompletion(
|
||||
model="gpt-3.5-turbo",
|
||||
mock_response="It's simple to use and easy to get started",
|
||||
messages=[{"role": "user", "content": "Hi 👋 - i'm claude"}],
|
||||
max_tokens=10,
|
||||
temperature=0.2,
|
||||
stream=stream,
|
||||
metadata={"trace_id": _unique_trace_name},
|
||||
)
|
||||
response = await create_async_task(model="gpt-3.5-turbo", stream=stream, metadata={"trace_id": _unique_trace_name})
|
||||
print(response)
|
||||
if stream:
|
||||
async for chunk in response:
|
||||
print(chunk)
|
||||
|
||||
await asyncio.sleep(3)
|
||||
|
||||
import langfuse
|
||||
|
||||
langfuse_client = langfuse.Langfuse(
|
||||
public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
|
||||
secret_key=os.environ["LANGFUSE_SECRET_KEY"],
|
||||
)
|
||||
langfuse_client.flush()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# get trace with _unique_trace_name
|
||||
trace = langfuse_client.get_generations(trace_id=_unique_trace_name)
|
||||
|
@ -211,6 +221,83 @@ async def test_langfuse_logging_without_request_response(stream):
|
|||
pytest.fail(f"An exception occurred - {e}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_langfuse_logging_metadata(langfuse_client):
    """
    Create two traces with a varying number of generations and a range of
    metadata fields, then read them back from Langfuse.

    Checks performed:
    * standard Langfuse fields are not duplicated inside the generation
      metadata;
    * on trace continuation, the trace fields named in ``update_trace_keys``
      are overridden with the values from the last generation;
    * tags are set on the trace.

    The metadata also carries ``version`` and ``trace_release``.
    """
    import uuid

    litellm.set_verbose = True
    litellm.success_callback = ["langfuse"]

    # Keys that must not appear in any generation's metadata.
    expected_filtered_metadata_keys = {"trace_name", "trace_id", "existing_trace_id", "trace_user_id", "session_id", "tags", "generation_name", "generation_id", "prompt"}
    # Metadata intended for the trace itself.
    trace_metadata = {"trace_actual_metadata_key": "trace_actual_metadata_value"}
    run_id = str(uuid.uuid4())
    session_id = f"litellm-test-session-{run_id}"
    trace_common_metadata = {
        "session_id": session_id,
        "tags": ["litellm-test-tag1", "litellm-test-tag2"],
        # On continuation, overwrite the trace's output and trace metadata
        # with the values from the latest generation.
        "update_trace_keys": ["output", "trace_metadata"],
        "trace_metadata": trace_metadata,
        # Ordinary key that should survive in the generation metadata.
        "gen_metadata_key": "gen_metadata_value",
        "trace_release": "litellm-test-release",
        "version": "litellm-test-version",
    }

    trace_identifiers = {}
    for trace_num in (1, 2):  # Two traces
        trace_id = f"litellm-test-trace{trace_num}-{run_id}"
        metadata = copy.deepcopy(trace_common_metadata)
        metadata["trace_id"] = trace_id
        metadata["trace_name"] = trace_id
        recorded_generation_ids = []
        trace_identifiers[trace_id] = recorded_generation_ids
        print(f"Trace: {trace_id}")

        # Trace N receives N generations.
        for generation_num in range(1, trace_num + 1):
            generation_id = f"litellm-test-trace{trace_num}-generation-{generation_num}-{run_id}"
            metadata["trace_user_id"] = f"litellm-test-user{generation_num}-{run_id}"
            metadata["generation_id"] = generation_id
            metadata["generation_name"] = generation_id
            # Changes per generation so the trace_metadata override is observable.
            metadata["trace_metadata"]["generation_id"] = generation_id
            recorded_generation_ids.append(generation_id)
            print(f"Generation: {generation_id}")

            payload = f"{session_id}:{trace_id}:{generation_id}"
            response = await create_async_task(
                model="gpt-3.5-turbo",
                mock_response=payload,
                messages=[{"role": "user", "content": payload}],
                max_tokens=100,
                temperature=0.2,
                # Each call gets its own copy of the metadata.
                metadata=copy.deepcopy(metadata),
            )
            print(response)
            # Every later generation continues the trace created above.
            metadata["existing_trace_id"] = trace_id

    langfuse_client.flush()
    await asyncio.sleep(2)

    # Verify the metadata filtering and the override of the trace output.
    for trace_id, generation_ids in trace_identifiers.items():
        trace = langfuse_client.get_trace(id=trace_id)
        assert trace.id == trace_id
        assert trace.session_id == session_id
        assert trace.metadata != trace_metadata

        fetched = langfuse_client.get_generations(trace_id=trace_id)
        generations = list(reversed(fetched.data))
        assert len(generations) == len(generation_ids)

        oldest_generation = generations[0]
        newest_generation = generations[-1]
        # Input comes from the first generation.
        assert trace.input == oldest_generation.input
        # Output is overwritten by the last generation (update_trace_keys).
        assert trace.output == newest_generation.output
        # Trace metadata is distinct from the generation's own metadata ...
        assert trace.metadata != newest_generation.metadata
        # ... but carries the generation_id written by the last generation.
        assert trace.metadata["generation_id"] == newest_generation.id
        assert set(trace.tags).issuperset(trace_common_metadata["tags"])
        print("trace_from_langfuse", trace)

        for generation_id, generation in zip(generation_ids, generations):
            assert generation.id == generation_id
            assert generation.trace_id == trace_id
            assert set(generation.metadata.keys()).isdisjoint(expected_filtered_metadata_keys)
            print("generation_from_langfuse", generation)
|
||||
|
||||
@pytest.mark.skip(reason="beta test - checking langfuse output")
|
||||
def test_langfuse_logging():
|
||||
try:
|
||||
|
@ -570,6 +657,7 @@ def test_langfuse_existing_trace_id():
|
|||
assert initial_langfuse_trace_dict == new_langfuse_trace_dict
|
||||
|
||||
|
||||
@pytest.mark.skipif(condition=not os.environ.get("OPENAI_API_KEY", False), reason="Authentication missing for openai")
|
||||
def test_langfuse_logging_tool_calling():
|
||||
litellm.set_verbose = True
|
||||
|
||||
|
|
|
@ -9373,7 +9373,7 @@ def get_secret(
|
|||
else:
|
||||
secret = os.environ.get(secret_name)
|
||||
try:
|
||||
secret_value_as_bool = ast.literal_eval(secret)
|
||||
secret_value_as_bool = ast.literal_eval(secret) if secret is not None else None
|
||||
if isinstance(secret_value_as_bool, bool):
|
||||
return secret_value_as_bool
|
||||
else:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue