diff --git a/docs/my-website/docs/observability/langfuse_integration.md b/docs/my-website/docs/observability/langfuse_integration.md index bf62ee9bc..ebf20b633 100644 --- a/docs/my-website/docs/observability/langfuse_integration.md +++ b/docs/my-website/docs/observability/langfuse_integration.md @@ -94,9 +94,10 @@ print(response) ``` -### Set Custom Trace ID, Trace User ID and Tags +### Set Custom Trace ID, Trace User ID, Trace Metadata, Trace Version, Trace Release and Tags + +Pass `trace_id`, `trace_user_id`, `trace_metadata`, `trace_version`, `trace_release`, `tags` in `metadata` -Pass `trace_id`, `trace_user_id` in `metadata` ```python import litellm @@ -121,12 +122,20 @@ response = completion( metadata={ "generation_name": "ishaan-test-generation", # set langfuse Generation Name "generation_id": "gen-id22", # set langfuse Generation ID + "version": "test-generation-version", # set langfuse Generation Version "trace_user_id": "user-id2", # set langfuse Trace User ID "session_id": "session-1", # set langfuse Session ID - "tags": ["tag1", "tag2"] # set langfuse Tags + "tags": ["tag1", "tag2"], # set langfuse Tags "trace_id": "trace-id22", # set langfuse Trace ID + "trace_metadata": {"key": "value"}, # set langfuse Trace Metadata + "trace_version": "test-trace-version", # set langfuse Trace Version (if not set, defaults to Generation Version) + "trace_release": "test-trace-release", # set langfuse Trace Release ### OR ### - "existing_trace_id": "trace-id22", # if generation is continuation of past trace. This prevents default behaviour of setting a trace name + "existing_trace_id": "trace-id22", # if generation is continuation of past trace. 
This prevents default behaviour of setting a trace name + ### OR enforce that certain fields are overwritten in the trace during the continuation ### + "existing_trace_id": "trace-id22", + "trace_metadata": {"key": "updated_trace_value"}, # The new value to use for the langfuse Trace Metadata + "update_trace_keys": ["input", "output", "trace_metadata"], # Updates the trace input & output to be this generation's input & output also updates the Trace Metadata to match the passed in value }, ) @@ -134,6 +143,38 @@ print(response) ``` +### Trace & Generation Parameters + +#### Trace Specific Parameters + +* `trace_id` - Identifier for the trace, must use `existing_trace_id` instead or in conjunction with `trace_id` if this is an existing trace, auto-generated by default +* `trace_name` - Name of the trace, auto-generated by default +* `session_id` - Session identifier for the trace, defaults to `None` +* `trace_version` - Version for the trace, defaults to value for `version` +* `trace_release` - Release for the trace, defaults to `None` +* `trace_metadata` - Metadata for the trace, defaults to `None` +* `trace_user_id` - User identifier for the trace, defaults to completion argument `user` +* `tags` - Tags for the trace, defaults to `None` + +##### Updatable Parameters on Continuation + +The following parameters can be updated on a continuation of a trace by passing in the following values into the `update_trace_keys` in the metadata of the completion. 
+ +* `input` - Will set the trace's input to be the input of this latest generation +* `output` - Will set the trace's output to be the output of this generation +* `trace_version` - Will set the trace version to be the provided value (To use the latest generation's version instead, use `version`) +* `trace_release` - Will set the trace release to be the provided value +* `trace_metadata` - Will set the trace metadata to the provided value +* `trace_user_id` - Will set the trace user id to the provided value + +#### Generation Specific Parameters + +* `generation_id` - Identifier for the generation, auto-generated by default +* `generation_name` - Name of the generation, auto-generated by default +* `prompt` - Langfuse prompt object used for the generation, defaults to None + +Any other key value pairs passed into the metadata not listed in the above spec for a `litellm` completion will be added as a metadata key value pair for the generation. + ### Use LangChain ChatLiteLLM + Langfuse Pass `trace_user_id`, `session_id` in model_kwargs ```python diff --git a/litellm/integrations/langfuse.py b/litellm/integrations/langfuse.py index 304edbbc9..fa8b0c61d 100644 --- a/litellm/integrations/langfuse.py +++ b/litellm/integrations/langfuse.py @@ -262,6 +262,7 @@ class LangFuseLogger: try: tags = [] + metadata = copy.deepcopy(metadata) # Avoid modifying the original metadata supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3") supports_prompt = Version(langfuse.version.__version__) >= Version("2.7.3") supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3") @@ -272,35 +273,9 @@ class LangFuseLogger: print_verbose(f"Langfuse Layer Logging - logging to langfuse v2 ") if supports_tags: - metadata_tags = metadata.get("tags", []) + metadata_tags = metadata.pop("tags", []) tags = metadata_tags - trace_name = metadata.get("trace_name", None) - trace_id = metadata.get("trace_id", None) - existing_trace_id = 
metadata.get("existing_trace_id", None) - if trace_name is None and existing_trace_id is None: - # just log `litellm-{call_type}` as the trace name - ## DO NOT SET TRACE_NAME if trace-id set. this can lead to overwriting of past traces. - trace_name = f"litellm-{kwargs.get('call_type', 'completion')}" - - if existing_trace_id is not None: - trace_params = {"id": existing_trace_id} - else: # don't overwrite an existing trace - trace_params = { - "name": trace_name, - "input": input, - "user_id": metadata.get("trace_user_id", user_id), - "id": trace_id, - "session_id": metadata.get("session_id", None), - } - - if level == "ERROR": - trace_params["status_message"] = output - else: - trace_params["output"] = output - - cost = kwargs.get("response_cost", None) - print_verbose(f"trace: {cost}") # Clean Metadata before logging - never log raw metadata # the raw metadata can contain circular references which leads to infinite recursion @@ -328,6 +303,58 @@ class LangFuseLogger: else: clean_metadata[key] = value + + session_id = clean_metadata.pop("session_id", None) + trace_name = clean_metadata.pop("trace_name", None) + trace_id = clean_metadata.pop("trace_id", None) + existing_trace_id = clean_metadata.pop("existing_trace_id", None) + update_trace_keys = clean_metadata.pop("update_trace_keys", []) + + if trace_name is None and existing_trace_id is None: + # just log `litellm-{call_type}` as the trace name + ## DO NOT SET TRACE_NAME if trace-id set. this can lead to overwriting of past traces. 
+ trace_name = f"litellm-{kwargs.get('call_type', 'completion')}" + + if existing_trace_id is not None: + trace_params = {"id": existing_trace_id} + + # Update the following keys for this trace + for metadata_param_key in update_trace_keys: + trace_param_key = metadata_param_key.replace("trace_", "") + if trace_param_key not in trace_params: + updated_trace_value = clean_metadata.pop(metadata_param_key, None) + if updated_trace_value is not None: + trace_params[trace_param_key] = updated_trace_value + + + # Pop the trace specific keys that would have been popped if there were a new trace + for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())): + clean_metadata.pop(key, None) + + # Special keys that are found in the function arguments and not the metadata + if "input" in update_trace_keys: + trace_params["input"] = input + if "output" in update_trace_keys: + trace_params["output"] = output + else: # don't overwrite an existing trace + trace_params = { + "id": trace_id, + "name": trace_name, + "session_id": session_id, + "input": input, + "version": clean_metadata.pop("trace_version", clean_metadata.get("version", None)), # If just version is provided, it will be applied to the trace as well; if a trace version is provided it will take precedence + } + for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())): + trace_params[key.replace("trace_", "")] = clean_metadata.pop(key, None) + + if level == "ERROR": + trace_params["status_message"] = output + else: + trace_params["output"] = output + + cost = kwargs.get("response_cost", None) + print_verbose(f"trace: {cost}") + if ( litellm._langfuse_default_tags is not None and isinstance(litellm._langfuse_default_tags, list) @@ -387,7 +414,7 @@ class LangFuseLogger: "completion_tokens": response_obj["usage"]["completion_tokens"], "total_cost": cost if supports_costs else None, } - generation_name = metadata.get("generation_name", None) + generation_name = 
clean_metadata.pop("generation_name", None) if generation_name is None: # just log `litellm-{call_type}` as the generation name generation_name = f"litellm-{kwargs.get('call_type', 'completion')}" @@ -402,7 +429,7 @@ class LangFuseLogger: generation_params = { "name": generation_name, - "id": metadata.get("generation_id", generation_id), + "id": clean_metadata.pop("generation_id", generation_id), "start_time": start_time, "end_time": end_time, "model": kwargs["model"], @@ -412,10 +439,11 @@ class LangFuseLogger: "usage": usage, "metadata": clean_metadata, "level": level, + "version": clean_metadata.pop("version", None), } if supports_prompt: - generation_params["prompt"] = metadata.get("prompt", None) + generation_params["prompt"] = clean_metadata.pop("prompt", None) if output is not None and isinstance(output, str) and level == "ERROR": generation_params["status_message"] = output diff --git a/litellm/tests/test_alangfuse.py b/litellm/tests/test_alangfuse.py index fd968c6a7..d71738cc6 100644 --- a/litellm/tests/test_alangfuse.py +++ b/litellm/tests/test_alangfuse.py @@ -1,9 +1,11 @@ +import copy import json import sys import os -import io, asyncio +import asyncio import logging +from unittest.mock import MagicMock, patch logging.basicConfig(level=logging.DEBUG) sys.path.insert(0, os.path.abspath("../..")) @@ -18,6 +20,18 @@ import time import pytest +@pytest.fixture +def langfuse_client() -> "langfuse.Langfuse": + import langfuse + + langfuse_client = langfuse.Langfuse( + public_key=os.environ["LANGFUSE_PUBLIC_KEY"], + secret_key=os.environ["LANGFUSE_SECRET_KEY"], + ) + + with patch("langfuse.Langfuse", MagicMock(return_value=langfuse_client)) as mock_langfuse_client: + yield mock_langfuse_client() + def search_logs(log_file_path, num_good_logs=1): """ Searches the given log file for logs containing the "/api/public" string. 
@@ -129,21 +143,10 @@ def test_langfuse_logging_async(): pytest.fail(f"An exception occurred - {e}") -async def make_async_calls(): +async def make_async_calls(metadata = None, **completion_kwargs): tasks = [] for _ in range(5): - task = asyncio.create_task( - litellm.acompletion( - model="azure/chatgpt-v-2", - messages=[{"role": "user", "content": "This is a test"}], - max_tokens=5, - temperature=0.7, - timeout=5, - user="langfuse_latency_test_user", - mock_response="It's simple to use and easy to get started", - ) - ) - tasks.append(task) + tasks.append(create_async_task()) # Measure the start time before running the tasks start_time = asyncio.get_event_loop().time() @@ -161,9 +164,30 @@ async def make_async_calls(): return total_time +def create_async_task(**completion_kwargs): + """ + Creates an async task for the litellm.acompletion function. + This is just the task, but it is not run here. + To run the task it must be awaited or used in other asyncio coroutine execution functions like asyncio.gather. + Any kwargs passed to this function will be passed to the litellm.acompletion function. + By default a standard set of arguments are used for the litellm.acompletion function. 
+ """ + completion_args = { + "model": "azure/chatgpt-v-2", + "messages": [{"role": "user", "content": "This is a test"}], + "max_tokens": 5, + "temperature": 0.7, + "timeout": 5, + "user": "langfuse_latency_test_user", + "mock_response": "It's simple to use and easy to get started", + } + completion_args.update(completion_kwargs) + return asyncio.create_task(litellm.acompletion(**completion_args)) + + @pytest.mark.asyncio @pytest.mark.parametrize("stream", [False, True]) -async def test_langfuse_logging_without_request_response(stream): +async def test_langfuse_logging_without_request_response(stream, langfuse_client): try: import uuid @@ -171,28 +195,14 @@ async def test_langfuse_logging_without_request_response(stream): litellm.set_verbose = True litellm.turn_off_message_logging = True litellm.success_callback = ["langfuse"] - response = await litellm.acompletion( - model="gpt-3.5-turbo", - mock_response="It's simple to use and easy to get started", - messages=[{"role": "user", "content": "Hi 👋 - i'm claude"}], - max_tokens=10, - temperature=0.2, - stream=stream, - metadata={"trace_id": _unique_trace_name}, - ) + response = await create_async_task(model="gpt-3.5-turbo", stream=stream, metadata={"trace_id": _unique_trace_name}) print(response) if stream: async for chunk in response: print(chunk) - await asyncio.sleep(3) - - import langfuse - - langfuse_client = langfuse.Langfuse( - public_key=os.environ["LANGFUSE_PUBLIC_KEY"], - secret_key=os.environ["LANGFUSE_SECRET_KEY"], - ) + langfuse_client.flush() + await asyncio.sleep(2) # get trace with _unique_trace_name trace = langfuse_client.get_generations(trace_id=_unique_trace_name) @@ -211,6 +221,83 @@ async def test_langfuse_logging_without_request_response(stream): pytest.fail(f"An exception occurred - {e}") +@pytest.mark.asyncio +async def test_langfuse_logging_metadata(langfuse_client): + """ + Test that creates multiple traces, with a varying number of generations and sets various metadata fields + Confirms 
that no metadata that is standard within Langfuse is duplicated in the respective trace or generation metadata + For trace continuation certain metadata of the trace is overridden with metadata from the last generation based on the update_trace_keys field + Version is set for both the trace and the generation + Release is just set for the trace + Tags is just set for the trace + """ + import uuid + + litellm.set_verbose = True + litellm.success_callback = ["langfuse"] + + trace_identifiers = {} + expected_filtered_metadata_keys = {"trace_name", "trace_id", "existing_trace_id", "trace_user_id", "session_id", "tags", "generation_name", "generation_id", "prompt"} + trace_metadata = {"trace_actual_metadata_key": "trace_actual_metadata_value"} # Allows for setting the metadata on the trace + run_id = str(uuid.uuid4()) + session_id = f"litellm-test-session-{run_id}" + trace_common_metadata = { + "session_id": session_id, + "tags": ["litellm-test-tag1", "litellm-test-tag2"], + "update_trace_keys": ["output", "trace_metadata"], # Overwrite the following fields in the trace with the last generation's output and the trace_metadata + "trace_metadata": trace_metadata, + "gen_metadata_key": "gen_metadata_value", # Metadata key that should not be filtered in the generation + "trace_release": "litellm-test-release", + "version": "litellm-test-version", + } + for trace_num in range(1, 3): # Two traces + metadata = copy.deepcopy(trace_common_metadata) + trace_id = f"litellm-test-trace{trace_num}-{run_id}" + metadata["trace_id"] = trace_id + metadata["trace_name"] = trace_id + trace_identifiers[trace_id] = [] + print(f"Trace: {trace_id}") + for generation_num in range(1, trace_num + 1): # Each trace has a number of generations equal to its trace number + metadata["trace_user_id"] = f"litellm-test-user{generation_num}-{run_id}" + generation_id = f"litellm-test-trace{trace_num}-generation-{generation_num}-{run_id}" + metadata["generation_id"] = generation_id + 
metadata["generation_name"] = generation_id + metadata["trace_metadata"]["generation_id"] = generation_id # Update to test if trace_metadata is overwritten by update trace keys + trace_identifiers[trace_id].append(generation_id) + print(f"Generation: {generation_id}") + response = await create_async_task(model="gpt-3.5-turbo", + mock_response=f"{session_id}:{trace_id}:{generation_id}", + messages=[{"role": "user", "content": f"{session_id}:{trace_id}:{generation_id}"}], + max_tokens=100, + temperature=0.2, + metadata=copy.deepcopy(metadata) # Every generation needs its own metadata, langfuse is not async/thread safe without it + ) + print(response) + metadata["existing_trace_id"] = trace_id + + langfuse_client.flush() + await asyncio.sleep(2) + + # Tests the metadata filtering and the override of the output to be the last generation + for trace_id, generation_ids in trace_identifiers.items(): + trace = langfuse_client.get_trace(id=trace_id) + assert trace.id == trace_id + assert trace.session_id == session_id + assert trace.metadata != trace_metadata + generations = list(reversed(langfuse_client.get_generations(trace_id=trace_id).data)) + assert len(generations) == len(generation_ids) + assert trace.input == generations[0].input # Should be set by the first generation + assert trace.output == generations[-1].output # Should be overwritten by the last generation according to update_trace_keys + assert trace.metadata != generations[-1].metadata # Should be overwritten by the last generation according to update_trace_keys + assert trace.metadata["generation_id"] == generations[-1].id + assert set(trace.tags).issuperset(trace_common_metadata["tags"]) + print("trace_from_langfuse", trace) + for generation_id, generation in zip(generation_ids, generations): + assert generation.id == generation_id + assert generation.trace_id == trace_id + assert set(generation.metadata.keys()).isdisjoint(expected_filtered_metadata_keys) + print("generation_from_langfuse", generation) + 
@pytest.mark.skip(reason="beta test - checking langfuse output") def test_langfuse_logging(): try: @@ -570,6 +657,7 @@ def test_langfuse_existing_trace_id(): assert initial_langfuse_trace_dict == new_langfuse_trace_dict +@pytest.mark.skipif(condition=not os.environ.get("OPENAI_API_KEY", False), reason="Authentication missing for openai") def test_langfuse_logging_tool_calling(): litellm.set_verbose = True diff --git a/litellm/utils.py b/litellm/utils.py index 75031a7c3..03f1698d0 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -9363,7 +9363,7 @@ def get_secret( else: secret = os.environ.get(secret_name) try: - secret_value_as_bool = ast.literal_eval(secret) + secret_value_as_bool = ast.literal_eval(secret) if secret is not None else None if isinstance(secret_value_as_bool, bool): return secret_value_as_bool else: