forked from phoenix/litellm-mirror
fix(langsmith.py): support 'run_id' for langsmith
Fixes https://github.com/BerriAI/litellm/issues/6862
This commit is contained in:
parent
927f9fa4eb
commit
147dfa61b0
6 changed files with 49 additions and 37 deletions
|
@ -17,7 +17,11 @@ from litellm._logging import (
|
|||
_turn_on_json,
|
||||
log_level,
|
||||
)
|
||||
from litellm.constants import ROUTER_MAX_FALLBACKS
|
||||
from litellm.constants import (
|
||||
DEFAULT_BATCH_SIZE,
|
||||
DEFAULT_FLUSH_INTERVAL_SECONDS,
|
||||
ROUTER_MAX_FALLBACKS,
|
||||
)
|
||||
from litellm.types.guardrails import GuardrailItem
|
||||
from litellm.proxy._types import (
|
||||
KeyManagementSystem,
|
||||
|
|
|
@ -1 +1,3 @@
|
|||
ROUTER_MAX_FALLBACKS = 5
|
||||
DEFAULT_BATCH_SIZE = 512
|
||||
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
|
||||
|
|
|
@ -8,20 +8,18 @@ import asyncio
|
|||
import time
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_logger
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
|
||||
DEFAULT_BATCH_SIZE = 512
|
||||
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
|
||||
|
||||
|
||||
class CustomBatchLogger(CustomLogger):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
flush_lock: Optional[asyncio.Lock] = None,
|
||||
batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
|
||||
flush_interval: Optional[int] = DEFAULT_FLUSH_INTERVAL_SECONDS,
|
||||
batch_size: Optional[int] = None,
|
||||
flush_interval: Optional[int] = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
|
@ -29,13 +27,12 @@ class CustomBatchLogger(CustomLogger):
|
|||
flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
|
||||
"""
|
||||
self.log_queue: List = []
|
||||
self.flush_interval = flush_interval or DEFAULT_FLUSH_INTERVAL_SECONDS
|
||||
self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE
|
||||
self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS
|
||||
self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE
|
||||
self.last_flush_time = time.time()
|
||||
self.flush_lock = flush_lock
|
||||
|
||||
super().__init__(**kwargs)
|
||||
pass
|
||||
|
||||
async def periodic_flush(self):
|
||||
while True:
|
||||
|
|
|
@ -70,6 +70,7 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
self.log_queue: List[LangsmithQueueObject] = []
|
||||
asyncio.create_task(self.periodic_flush())
|
||||
self.flush_lock = asyncio.Lock()
|
||||
|
||||
super().__init__(**kwargs, flush_lock=self.flush_lock)
|
||||
|
||||
def get_credentials_from_env(
|
||||
|
@ -122,7 +123,7 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
"project_name", credentials["LANGSMITH_PROJECT"]
|
||||
)
|
||||
run_name = metadata.get("run_name", self.langsmith_default_run_name)
|
||||
run_id = metadata.get("id", None)
|
||||
run_id = metadata.get("id", metadata.get("run_id", None))
|
||||
parent_run_id = metadata.get("parent_run_id", None)
|
||||
trace_id = metadata.get("trace_id", None)
|
||||
session_id = metadata.get("session_id", None)
|
||||
|
@ -173,14 +174,28 @@ class LangsmithLogger(CustomBatchLogger):
|
|||
if dotted_order:
|
||||
data["dotted_order"] = dotted_order
|
||||
|
||||
run_id: Optional[str] = data.get("id") # type: ignore
|
||||
if "id" not in data or data["id"] is None:
|
||||
"""
|
||||
for /batch langsmith requires id, trace_id and dotted_order passed as params
|
||||
"""
|
||||
run_id = str(uuid.uuid4())
|
||||
data["id"] = str(run_id)
|
||||
data["trace_id"] = str(run_id)
|
||||
data["dotted_order"] = self.make_dot_order(run_id=run_id)
|
||||
|
||||
data["id"] = run_id
|
||||
|
||||
if (
|
||||
"trace_id" not in data
|
||||
or data["trace_id"] is None
|
||||
and (run_id is not None and isinstance(run_id, str))
|
||||
):
|
||||
data["trace_id"] = run_id
|
||||
|
||||
if (
|
||||
"dotted_order" not in data
|
||||
or data["dotted_order"] is None
|
||||
and (run_id is not None and isinstance(run_id, str))
|
||||
):
|
||||
data["dotted_order"] = self.make_dot_order(run_id=run_id) # type: ignore
|
||||
|
||||
verbose_logger.debug("Langsmith Logging data on langsmith: %s", data)
|
||||
|
||||
|
|
|
@ -16,23 +16,5 @@ model_list:
|
|||
model: openai/gpt-4o-realtime-preview-2024-10-01
|
||||
api_key: os.environ/OPENAI_API_KEY
|
||||
|
||||
router_settings:
|
||||
routing_strategy: usage-based-routing-v2
|
||||
#redis_url: "os.environ/REDIS_URL"
|
||||
redis_host: "os.environ/REDIS_HOST"
|
||||
redis_port: "os.environ/REDIS_PORT"
|
||||
|
||||
litellm_settings:
|
||||
cache: true
|
||||
cache_params:
|
||||
type: redis
|
||||
host: "os.environ/REDIS_HOST"
|
||||
port: "os.environ/REDIS_PORT"
|
||||
namespace: "litellm.caching"
|
||||
ttl: 600
|
||||
# key_generation_settings:
|
||||
# team_key_generation:
|
||||
# allowed_team_member_roles: ["admin"]
|
||||
# required_params: ["tags"] # require team admins to set tags for cost-tracking when generating a team key
|
||||
# personal_key_generation: # maps to 'Default Team' on UI
|
||||
# allowed_user_roles: ["proxy_admin"]
|
||||
success_callback: ["langsmith"]
|
|
@ -53,10 +53,17 @@ def test_async_langsmith_logging_with_metadata():
|
|||
@pytest.mark.asyncio
|
||||
async def test_async_langsmith_logging_with_streaming_and_metadata(sync_mode):
|
||||
try:
|
||||
litellm.DEFAULT_BATCH_SIZE = 1
|
||||
litellm.DEFAULT_FLUSH_INTERVAL_SECONDS = 1
|
||||
test_langsmith_logger = LangsmithLogger()
|
||||
litellm.success_callback = ["langsmith"]
|
||||
litellm.set_verbose = True
|
||||
run_id = str(uuid.uuid4())
|
||||
run_id = "497f6eca-6276-4993-bfeb-53cbbbba6f08"
|
||||
run_name = "litellmRUN"
|
||||
test_metadata = {
|
||||
"run_name": run_name, # langsmith run name
|
||||
"run_id": run_id, # langsmith run id
|
||||
}
|
||||
|
||||
messages = [{"role": "user", "content": "what llm are u"}]
|
||||
if sync_mode is True:
|
||||
|
@ -66,7 +73,7 @@ async def test_async_langsmith_logging_with_streaming_and_metadata(sync_mode):
|
|||
max_tokens=10,
|
||||
temperature=0.2,
|
||||
stream=True,
|
||||
metadata={"id": run_id},
|
||||
metadata=test_metadata,
|
||||
)
|
||||
for cb in litellm.callbacks:
|
||||
if isinstance(cb, LangsmithLogger):
|
||||
|
@ -82,7 +89,7 @@ async def test_async_langsmith_logging_with_streaming_and_metadata(sync_mode):
|
|||
temperature=0.2,
|
||||
mock_response="This is a mock request",
|
||||
stream=True,
|
||||
metadata={"id": run_id},
|
||||
metadata=test_metadata,
|
||||
)
|
||||
for cb in litellm.callbacks:
|
||||
if isinstance(cb, LangsmithLogger):
|
||||
|
@ -100,11 +107,16 @@ async def test_async_langsmith_logging_with_streaming_and_metadata(sync_mode):
|
|||
|
||||
input_fields_on_langsmith = logged_run_on_langsmith.get("inputs")
|
||||
|
||||
extra_fields_on_langsmith = logged_run_on_langsmith.get("extra").get(
|
||||
extra_fields_on_langsmith = logged_run_on_langsmith.get("extra", {}).get(
|
||||
"invocation_params"
|
||||
)
|
||||
|
||||
assert logged_run_on_langsmith.get("run_type") == "llm"
|
||||
assert (
|
||||
logged_run_on_langsmith.get("run_type") == "llm"
|
||||
), f"run_type should be llm. Got: {logged_run_on_langsmith.get('run_type')}"
|
||||
assert (
|
||||
logged_run_on_langsmith.get("name") == run_name
|
||||
), f"run_type should be llm. Got: {logged_run_on_langsmith.get('run_type')}"
|
||||
print("\nLogged INPUT ON LANGSMITH", input_fields_on_langsmith)
|
||||
|
||||
print("\nextra fields on langsmith", extra_fields_on_langsmith)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue