Revert "Revert "Logfire Integration""

This reverts commit b04a8d878a.
alisalim17 2024-05-21 11:07:40 +04:00
parent 07abccf96f
commit 01bb26bbba
6 changed files with 431 additions and 22 deletions

View file

@@ -41,6 +41,7 @@ jobs:
pip install langchain
pip install lunary==0.2.5
pip install "langfuse==2.27.1"
pip install "logfire==0.29.0"
pip install numpydoc
pip install traceloop-sdk==0.18.2
pip install openai
@@ -89,7 +90,6 @@ jobs:
fi
cd ..
# Run pytest and generate JUnit XML report
- run:
name: Run tests
@@ -172,6 +172,7 @@ jobs:
pip install "aioboto3==12.3.0"
pip install langchain
pip install "langfuse>=2.0.0"
pip install "logfire==0.29.0"
pip install numpydoc
pip install prisma
pip install fastapi

View file

@@ -0,0 +1,60 @@
import Image from '@theme/IdealImage';
# Logfire - Logging LLM Input/Output
Logfire is open-source observability & analytics for LLM apps.
It gives you detailed production traces and a granular view of quality, cost, and latency.
<Image img={require('../../img/logfire.png')} />
:::info
We want to learn how we can make the callbacks better! Meet the LiteLLM [founders](https://calendly.com/d/4mp-gd3-k5k/berriai-1-1-onboarding-litellm-hosted-version) or
join our [discord](https://discord.gg/wuPM9dRgDw)
:::
## Pre-Requisites
Ensure you have run `pip install logfire` for this integration
```shell
pip install logfire litellm
```
## Quick Start
Get your Logfire token from [Logfire](https://logfire.pydantic.dev/)
```python
litellm.success_callback = ["logfire"]
litellm.failure_callback = ["logfire"] # logs errors to logfire
```
```python
# pip install logfire
import litellm
import os
# from https://logfire.pydantic.dev/
os.environ["LOGFIRE_TOKEN"] = ""
# LLM API Keys
os.environ['OPENAI_API_KEY']=""
# set logfire as a callback, litellm will send the data to logfire
litellm.success_callback = ["logfire"]
# openai call
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "Hi 👋 - i'm openai"}
]
)
```
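
Failed requests are captured the same way once `litellm.failure_callback` includes `"logfire"`. Below is a minimal sketch of that path; the deliberately invalid `api_key` is only an illustrative way to force an error:

```python
import os
import litellm

# from https://logfire.pydantic.dev/
os.environ["LOGFIRE_TOKEN"] = ""

# log successes and failures to logfire
litellm.success_callback = ["logfire"]
litellm.failure_callback = ["logfire"]

try:
    litellm.completion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
        api_key="sk-invalid",  # intentionally bad key to trigger failure logging
    )
except Exception as err:
    # the exception still propagates to your code; the failed call is logged to Logfire
    print(f"request failed: {err}")
```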
## Support & Talk to Founders
- [Schedule Demo 👋](https://calendly.com/d/4mp-gd3-k5k/berriai-1-1-onboarding-litellm-hosted-version)
- [Community Discord 💭](https://discord.gg/wuPM9dRgDw)
- Our numbers 📞 +1 (770) 8783-106 / +1 (412) 618-6238
- Our emails ✉️ ishaan@berri.ai / krrish@berri.ai

Binary file not shown (new image, 695 KiB).

View file

@@ -0,0 +1,178 @@
#### What this does ####
# On success + failure, log events to Logfire
import dotenv, os
dotenv.load_dotenv() # Loading env variables using dotenv
import traceback
import uuid
from litellm._logging import print_verbose, verbose_logger
from enum import Enum
from typing import Any, Dict, NamedTuple
from typing_extensions import LiteralString
class SpanConfig(NamedTuple):
message_template: LiteralString
span_data: Dict[str, Any]
class LogfireLevel(str, Enum):
INFO = "info"
ERROR = "error"
class LogfireLogger:
# Class variables or attributes
def __init__(self):
try:
verbose_logger.debug(f"in init logfire logger")
import logfire
# only setting up logfire if we are sending to logfire
# in testing, we don't want to send to logfire
if logfire.DEFAULT_LOGFIRE_INSTANCE.config.send_to_logfire:
logfire.configure(token=os.getenv("LOGFIRE_TOKEN"))
except Exception as e:
print_verbose(f"Got exception on init logfire client {str(e)}")
raise e
def _get_span_config(self, payload) -> SpanConfig:
if (
payload["call_type"] == "completion"
or payload["call_type"] == "acompletion"
):
return SpanConfig(
message_template="Chat Completion with {request_data[model]!r}",
span_data={"request_data": payload},
)
elif (
payload["call_type"] == "embedding" or payload["call_type"] == "aembedding"
):
return SpanConfig(
message_template="Embedding Creation with {request_data[model]!r}",
span_data={"request_data": payload},
)
elif (
payload["call_type"] == "image_generation"
or payload["call_type"] == "aimage_generation"
):
return SpanConfig(
message_template="Image Generation with {request_data[model]!r}",
span_data={"request_data": payload},
)
else:
return SpanConfig(
message_template="Litellm Call with {request_data[model]!r}",
span_data={"request_data": payload},
)
async def _async_log_event(
self,
kwargs,
response_obj,
start_time,
end_time,
print_verbose,
level: LogfireLevel,
):
self.log_event(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
level=level,
)
def log_event(
self,
kwargs,
start_time,
end_time,
print_verbose,
level: LogfireLevel,
response_obj,
):
try:
import logfire
verbose_logger.debug(
f"logfire Logging - Enters logging function for model {kwargs}"
)
if not response_obj:
response_obj = {}
litellm_params = kwargs.get("litellm_params", {})
metadata = (
litellm_params.get("metadata", {}) or {}
) # if litellm_params['metadata'] == None
messages = kwargs.get("messages")
optional_params = kwargs.get("optional_params", {})
call_type = kwargs.get("call_type", "completion")
cache_hit = kwargs.get("cache_hit", False)
usage = response_obj.get("usage", {})
id = response_obj.get("id", str(uuid.uuid4()))
try:
response_time = (end_time - start_time).total_seconds()
            except Exception:
response_time = None
# Clean Metadata before logging - never log raw metadata
# the raw metadata can contain circular references which leads to infinite recursion
# we clean out all extra litellm metadata params before logging
clean_metadata = {}
if isinstance(metadata, dict):
for key, value in metadata.items():
# clean litellm metadata before logging
if key in [
"endpoint",
"caching_groups",
"previous_models",
]:
continue
else:
clean_metadata[key] = value
# Build the initial payload
payload = {
"id": id,
"call_type": call_type,
"cache_hit": cache_hit,
"startTime": start_time,
"endTime": end_time,
"responseTime (seconds)": response_time,
"model": kwargs.get("model", ""),
"user": kwargs.get("user", ""),
"modelParameters": optional_params,
"spend": kwargs.get("response_cost", 0),
"messages": messages,
"response": response_obj,
"usage": usage,
"metadata": clean_metadata,
}
logfire_openai = logfire.with_settings(custom_scope_suffix="openai")
message_template, span_data = self._get_span_config(payload)
if level == LogfireLevel.INFO:
logfire_openai.info(
message_template,
**span_data,
)
elif level == LogfireLevel.ERROR:
logfire_openai.error(
message_template,
**span_data,
_exc_info=True,
)
print_verbose(f"\ndd Logger - Logging payload = {payload}")
print_verbose(
f"Logfire Layer Logging - final response object: {response_obj}"
)
except Exception as e:
traceback.print_exc()
verbose_logger.debug(
f"Logfire Layer Error - {str(e)}\n{traceback.format_exc()}"
)
pass
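
For orientation (this snippet is not part of the file above), a minimal sketch of how `_get_span_config` maps a payload's `call_type` onto a Logfire message template; it assumes the `logfire` package is installed so that `LogfireLogger()` can initialize:

```python
logger = LogfireLogger()

chat_payload = {"call_type": "acompletion", "model": "gpt-3.5-turbo"}
template, span_data = logger._get_span_config(chat_payload)
# template  -> "Chat Completion with {request_data[model]!r}"
# span_data -> {"request_data": chat_payload}

embedding_payload = {"call_type": "embedding", "model": "text-embedding-ada-002"}
template, _ = logger._get_span_config(embedding_payload)
# template  -> "Embedding Creation with {request_data[model]!r}"
```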

View file

@@ -0,0 +1,117 @@
import sys
import os
import json
import time
import logfire
import litellm
import pytest
from logfire.testing import TestExporter, SimpleSpanProcessor
sys.path.insert(0, os.path.abspath("../.."))
# Testing scenarios for logfire logging:
# 1. Test logfire logging for completion
# 2. Test logfire logging for acompletion
# 3. Test logfire logging for completion while streaming is enabled
# 4. Test logfire logging for acompletion while streaming is enabled
@pytest.mark.parametrize("stream", [False, True])
def test_completion_logfire_logging(stream):
litellm.success_callback = ["logfire"]
litellm.set_verbose = True
exporter = TestExporter()
logfire.configure(
send_to_logfire=False,
console=False,
processors=[SimpleSpanProcessor(exporter)],
collect_system_metrics=False,
)
messages = [{"role": "user", "content": "what llm are u"}]
temperature = 0.3
max_tokens = 10
response = litellm.completion(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=stream,
)
print(response)
if stream:
for chunk in response:
print(chunk)
time.sleep(5)
exported_spans = exporter.exported_spans_as_dict()
assert len(exported_spans) == 1
assert (
exported_spans[0]["attributes"]["logfire.msg"]
== "Chat Completion with 'gpt-3.5-turbo'"
)
request_data = json.loads(exported_spans[0]["attributes"]["request_data"])
assert request_data["model"] == "gpt-3.5-turbo"
assert request_data["messages"] == messages
assert "completion_tokens" in request_data["usage"]
assert "prompt_tokens" in request_data["usage"]
assert "total_tokens" in request_data["usage"]
assert request_data["response"]["choices"][0]["message"]["content"]
assert request_data["modelParameters"]["max_tokens"] == max_tokens
assert request_data["modelParameters"]["temperature"] == temperature
@pytest.mark.asyncio
@pytest.mark.parametrize("stream", [False, True])
async def test_acompletion_logfire_logging(stream):
litellm.success_callback = ["logfire"]
litellm.set_verbose = True
exporter = TestExporter()
logfire.configure(
send_to_logfire=False,
console=False,
processors=[SimpleSpanProcessor(exporter)],
collect_system_metrics=False,
)
messages = [{"role": "user", "content": "what llm are u"}]
temperature = 0.3
max_tokens = 10
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=max_tokens,
        temperature=temperature,
        stream=stream,
    )
print(response)
    if stream:
        async for chunk in response:
            print(chunk)
time.sleep(5)
exported_spans = exporter.exported_spans_as_dict()
print("exported_spans", exported_spans)
assert len(exported_spans) == 1
assert (
exported_spans[0]["attributes"]["logfire.msg"]
== "Chat Completion with 'gpt-3.5-turbo'"
)
request_data = json.loads(exported_spans[0]["attributes"]["request_data"])
assert request_data["model"] == "gpt-3.5-turbo"
assert request_data["messages"] == messages
assert "completion_tokens" in request_data["usage"]
assert "prompt_tokens" in request_data["usage"]
assert "total_tokens" in request_data["usage"]
assert request_data["response"]["choices"][0]["message"]["content"]
assert request_data["modelParameters"]["max_tokens"] == max_tokens
assert request_data["modelParameters"]["temperature"] == temperature

View file

@@ -6,7 +6,6 @@
# +-----------------------------------------------+
#
# Thank you users! We ❤️ you! - Krrish & Ishaan
import sys, re, binascii, struct
import litellm
import dotenv, json, traceback, threading, base64, ast
@@ -73,6 +72,7 @@ from .integrations.supabase import Supabase
from .integrations.lunary import LunaryLogger
from .integrations.prompt_layer import PromptLayerLogger
from .integrations.langsmith import LangsmithLogger
from .integrations.logfire_logger import LogfireLogger, LogfireLevel
from .integrations.weights_biases import WeightsBiasesLogger
from .integrations.custom_logger import CustomLogger
from .integrations.langfuse import LangFuseLogger
@@ -146,6 +146,7 @@ heliconeLogger = None
athinaLogger = None
promptLayerLogger = None
langsmithLogger = None
logfireLogger = None
weightsBiasesLogger = None
customLogger = None
langFuseLogger = None
@@ -1130,7 +1131,7 @@ class CallTypes(Enum):
# Logging function -> log the exact model details + what's being sent | Non-BlockingP
class Logging:
-global supabaseClient, liteDebuggerClient, promptLayerLogger, weightsBiasesLogger, langsmithLogger, capture_exception, add_breadcrumb, lunaryLogger
+global supabaseClient, liteDebuggerClient, promptLayerLogger, weightsBiasesLogger, langsmithLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger
custom_pricing: bool = False
@ -1672,7 +1673,7 @@ class Logging:
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if self.stream:
if "complete_streaming_response" not in kwargs:
-return
+continue
else:
print_verbose("reaches supabase for streaming logging!")
result = kwargs["complete_streaming_response"]
@ -1706,7 +1707,7 @@ class Logging:
print_verbose("reaches langsmith for logging!") print_verbose("reaches langsmith for logging!")
if self.stream: if self.stream:
if "complete_streaming_response" not in kwargs: if "complete_streaming_response" not in kwargs:
break continue
else: else:
print_verbose( print_verbose(
"reaches langsmith for streaming logging!" "reaches langsmith for streaming logging!"
@ -1719,6 +1720,33 @@ class Logging:
end_time=end_time,
print_verbose=print_verbose,
)
if callback == "logfire":
global logfireLogger
verbose_logger.debug("reaches logfire for success logging!")
kwargs = {}
for k, v in self.model_call_details.items():
if (
k != "original_response"
): # copy.deepcopy raises errors as this could be a coroutine
kwargs[k] = v
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if self.stream:
if "complete_streaming_response" not in kwargs:
continue
else:
print_verbose("reaches logfire for streaming logging!")
result = kwargs["complete_streaming_response"]
logfireLogger.log_event(
kwargs=self.model_call_details,
response_obj=result,
start_time=start_time,
end_time=end_time,
print_verbose=print_verbose,
level=LogfireLevel.INFO.value,
)
if callback == "lunary": if callback == "lunary":
print_verbose("reaches lunary for logging!") print_verbose("reaches lunary for logging!")
model = self.model model = self.model
@ -1735,7 +1763,7 @@ class Logging:
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if self.stream:
if "complete_streaming_response" not in kwargs:
-break
+continue
else:
result = kwargs["complete_streaming_response"]
@ -1880,7 +1908,7 @@ class Logging:
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
) )
if complete_streaming_response is None: if complete_streaming_response is None:
break continue
else: else:
print_verbose("reaches langfuse for streaming logging!") print_verbose("reaches langfuse for streaming logging!")
result = kwargs["complete_streaming_response"] result = kwargs["complete_streaming_response"]
@ -1909,7 +1937,7 @@ class Logging:
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
) )
if complete_streaming_response is None: if complete_streaming_response is None:
break continue
else: else:
print_verbose( print_verbose(
"reaches clickhouse for streaming logging!" "reaches clickhouse for streaming logging!"
@ -1938,7 +1966,7 @@ class Logging:
f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}" f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
) )
if complete_streaming_response is None: if complete_streaming_response is None:
break continue
else: else:
print_verbose( print_verbose(
"reaches greenscale for streaming logging!" "reaches greenscale for streaming logging!"
@ -2409,7 +2437,9 @@ class Logging:
def failure_handler(
self, exception, traceback_exception, start_time=None, end_time=None
):
-print_verbose(f"Logging Details LiteLLM-Failure Call")
+print_verbose(
+    f"Logging Details LiteLLM-Failure Call: {litellm.failure_callback}"
+)
try:
start_time, end_time = self._failure_handler_helper_fn(
exception=exception,
@ -2464,7 +2494,7 @@ class Logging:
call_type=self.call_type,
stream=self.stream,
)
-elif callback == "lunary":
+if callback == "lunary":
print_verbose("reaches lunary for logging error!")
model = self.model
@ -2489,7 +2519,7 @@ class Logging:
end_time=end_time,
print_verbose=print_verbose,
)
-elif callback == "sentry":
+if callback == "sentry":
print_verbose("sending exception to sentry")
if capture_exception:
capture_exception(exception)
@ -2497,7 +2527,7 @@ class Logging:
print_verbose(
f"capture exception not initialized: {capture_exception}"
)
-elif callable(callback): # custom logger functions
+if callable(callback): # custom logger functions
customLogger.log_event(
kwargs=self.model_call_details,
response_obj=result,
@ -2506,7 +2536,7 @@ class Logging:
print_verbose=print_verbose,
callback_func=callback,
)
-elif (
+if (
isinstance(callback, CustomLogger)
and self.model_call_details.get("litellm_params", {}).get(
"acompletion", False
@ -2523,7 +2553,7 @@ class Logging:
response_obj=result,
kwargs=self.model_call_details,
)
-elif callback == "langfuse":
+if callback == "langfuse":
global langFuseLogger
verbose_logger.debug("reaches langfuse for logging failure")
kwargs = {}
@ -2559,7 +2589,7 @@ class Logging:
level="ERROR", level="ERROR",
kwargs=self.model_call_details, kwargs=self.model_call_details,
) )
elif callback == "prometheus": if callback == "prometheus":
global prometheusLogger global prometheusLogger
verbose_logger.debug("reaches prometheus for success logging!") verbose_logger.debug("reaches prometheus for success logging!")
kwargs = {} kwargs = {}
@ -2577,6 +2607,26 @@ class Logging:
user_id=kwargs.get("user", None), user_id=kwargs.get("user", None),
print_verbose=print_verbose, print_verbose=print_verbose,
) )
if callback == "logfire":
global logfireLogger
verbose_logger.debug("reaches logfire for failure logging!")
kwargs = {}
for k, v in self.model_call_details.items():
if (
k != "original_response"
): # copy.deepcopy raises errors as this could be a coroutine
kwargs[k] = v
kwargs["exception"] = exception
logfireLogger.log_event(
kwargs=kwargs,
response_obj=result,
start_time=start_time,
end_time=end_time,
level=LogfireLevel.ERROR.value,
print_verbose=print_verbose,
)
except Exception as e:
print_verbose(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}"
@@ -3343,6 +3393,7 @@ def client(original_function):
return original_function(*args, **kwargs)
traceback_exception = traceback.format_exc()
end_time = datetime.datetime.now()
# LOG FAILURE - handle streaming failure logging in the _next_ object, remove `handle_failure` once it's deprecated
if logging_obj:
logging_obj.failure_handler(
@@ -7441,7 +7492,7 @@ def validate_environment(model: Optional[str] = None) -> dict:
def set_callbacks(callback_list, function_id=None):
-global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, langsmithLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
+global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, aispendLogger, berrispendLogger, supabaseClient, liteDebuggerClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, langsmithLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger
try:
for callback in callback_list:
@@ -7523,6 +7574,8 @@ def set_callbacks(callback_list, function_id=None):
weightsBiasesLogger = WeightsBiasesLogger()
elif callback == "langsmith":
langsmithLogger = LangsmithLogger()
elif callback == "logfire":
logfireLogger = LogfireLogger()
elif callback == "aispend": elif callback == "aispend":
aispendLogger = AISpendLogger() aispendLogger = AISpendLogger()
elif callback == "berrispend": elif callback == "berrispend":