Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-26 03:04:13 +00:00)
new logger client
Commit e8eb92c108 (parent f47faa1139)
9 changed files with 235 additions and 195 deletions
litellm/utils.py (198 changes)
@@ -141,27 +141,41 @@ def install_and_import(package: str):
####### LOGGING ###################
from enum import Enum

class CallTypes(Enum):
    embedding = 'embedding'
    completion = 'completion'

# Logging function -> log the exact model details + what's being sent | Non-Blocking
class Logging:
    global supabaseClient, liteDebuggerClient

    def __init__(self, model, messages, optional_params, litellm_params):
    def __init__(self, model, messages, stream, call_type, litellm_call_id):
        if call_type not in [item.value for item in CallTypes]:
            allowed_values = ", ".join([item.value for item in CallTypes])
            raise ValueError(f"Invalid call_type {call_type}. Allowed values: {allowed_values}")
        self.model = model
        self.messages = messages
        self.stream = stream
        self.call_type = call_type
        self.litellm_call_id = litellm_call_id

    def update_environment_variables(self, optional_params, litellm_params):
        self.optional_params = optional_params
        self.litellm_params = litellm_params
        self.logger_fn = litellm_params["logger_fn"]
        print_verbose(f"self.optional_params: {self.optional_params}")
        self.model_call_details = {
            "model": model,
            "messages": messages,
            "model": self.model,
            "messages": self.messages,
            "optional_params": self.optional_params,
            "litellm_params": self.litellm_params,
        }

    def pre_call(self, input, api_key, model=None, additional_args={}):
        try:
            print_verbose(f"logging pre call for model: {self.model}")
            print_verbose(f"logging pre call for model: {self.model} with call type: {self.call_type}")
            self.model_call_details["input"] = input
            self.model_call_details["api_key"] = api_key
            self.model_call_details["additional_args"] = additional_args
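For orientation, here is a minimal usage sketch of the reworked logger client (not part of the diff): the Logging object is now created up front with just the call identity, and the provider-specific parameters are attached afterwards through update_environment_variables. All argument values below are illustrative.

import uuid
from litellm.utils import Logging, CallTypes  # both are defined in this file

messages = [{"role": "user", "content": "Hey, how's it going?"}]
logging_obj = Logging(
    model="gpt-3.5-turbo",
    messages=messages,
    stream=False,
    call_type=CallTypes.completion.value,
    litellm_call_id=str(uuid.uuid4()),
)
# optional/provider params are only known later, so they are attached separately
logging_obj.update_environment_variables(
    optional_params={"temperature": 0.2},
    litellm_params={"logger_fn": None, "litellm_call_id": logging_obj.litellm_call_id},
)
logging_obj.pre_call(input=messages, api_key="sk-...")  # non-blocking by design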
@@ -215,6 +229,7 @@ class Logging:
                            litellm_params=self.model_call_details["litellm_params"],
                            optional_params=self.model_call_details["optional_params"],
                            print_verbose=print_verbose,
                            call_type=self.call_type,
                        )
                except Exception as e:
                    print_verbose(
@@ -235,7 +250,7 @@ class Logging:
                    if capture_exception:  # log this error to sentry for debugging
                        capture_exception(e)

    def post_call(self, input, api_key, original_response, additional_args={}):
    def post_call(self, original_response, input=None, api_key=None, additional_args={}):
        # Do something here
        try:
            self.model_call_details["input"] = input
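The reordered post_call signature lets the streaming path hand over just the raw provider output, while non-streaming callers can still attach the request context. A hedged illustration, continuing the sketch above:

raw_response = '{"completion": "Hi! Doing well."}'  # placeholder provider payload
logging_obj.post_call(raw_response)  # streaming-style: response only
logging_obj.post_call(raw_response, input=messages, api_key="sk-...")  # with request context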
@@ -262,13 +277,13 @@ class Logging:
                try:
                    if callback == "lite_debugger":
                        print_verbose("reaches litedebugger for post-call logging!")
                        model = self.model_call_details["model"]
                        messages = self.model_call_details["input"]
                        print_verbose(f"liteDebuggerClient: {liteDebuggerClient}")
                        liteDebuggerClient.post_call_log_event(
                            original_response=original_response,
                            litellm_call_id=self.litellm_params["litellm_call_id"],
                            print_verbose=print_verbose,
                            call_type = self.call_type,
                            stream = self.stream
                        )
                except:
                    print_verbose(
@@ -285,7 +300,72 @@ class Logging:
                    )
                pass

    # Add more methods as needed

    def success_handler(self, result, start_time, end_time):
        try:
            for callback in litellm.success_callback:
                try:
                    if callback == "lite_debugger":
                        print_verbose("reaches lite_debugger for logging!")
                        print_verbose(f"liteDebuggerClient: {liteDebuggerClient}")
                        print_verbose(f"liteDebuggerClient details function {self.call_type} and stream set to {self.stream}")
                        liteDebuggerClient.log_event(
                            end_user=litellm._thread_context.user,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            litellm_call_id=self.litellm_call_id,
                            print_verbose=print_verbose,
                            call_type = self.call_type,
                            stream = self.stream
                        )
                except Exception as e:
                    print_verbose(
                        f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {traceback.format_exc()}"
                    )
                    print_verbose(
                        f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
                    )
                    if capture_exception:  # log this error to sentry for debugging
                        capture_exception(e)
        except:
            print_verbose(
                f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
            )
            pass

    def failure_handler(self, exception, traceback_exception, start_time, end_time):
        try:
            for callback in litellm.failure_callback:
                if callback == "lite_debugger":
                    print_verbose("reaches lite_debugger for logging!")
                    print_verbose(f"liteDebuggerClient: {liteDebuggerClient}")
                    result = {
                        "model": self.model,
                        "created": time.time(),
                        "error": traceback_exception,
                        "usage": {
                            "prompt_tokens": prompt_token_calculator(
                                self.model, messages=self.messages
                            ),
                            "completion_tokens": 0,
                        },
                    }
                    liteDebuggerClient.log_event(
                        model=self.model,
                        messages=self.messages,
                        end_user=litellm._thread_context.user,
                        response_obj=result,
                        start_time=start_time,
                        end_time=end_time,
                        litellm_call_id=self.litellm_call_id,
                        print_verbose=print_verbose,
                        call_type = self.call_type,
                        stream = self.stream
                    )
                pass
        except:
            pass


def exception_logging(
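Both handlers only do work for integrations the caller has registered, and every branch is wrapped so a logging error can never break the model call. The lite_debugger branches above fire when that callback is enabled, e.g. (illustrative):

import litellm

litellm.success_callback = ["lite_debugger"]
litellm.failure_callback = ["lite_debugger"]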
@@ -327,7 +407,7 @@ def client(original_function):
        *args, **kwargs
    ):  # just run once to check if user wants to send their data anywhere - PostHog/Sentry/Slack/etc.
        try:
            global callback_list, add_breadcrumb, user_logger_fn
            global callback_list, add_breadcrumb, user_logger_fn, Logging
            if (
                litellm.email is not None
                or os.getenv("LITELLM_EMAIL", None) is not None
@@ -369,12 +449,22 @@ def client(original_function):
                )
            if "logger_fn" in kwargs:
                user_logger_fn = kwargs["logger_fn"]
            # LOG SUCCESS
            # CRASH REPORTING TELEMETRY
            crash_reporting(*args, **kwargs)
            # INIT LOGGER - for user-specified integrations
            model = args[0] if len(args) > 1 else kwargs["model"]
            call_type = original_function.__name__
            if call_type == CallTypes.completion.value:
                messages = args[1] if len(args) > 2 else kwargs["messages"]
            elif call_type == CallTypes.embedding.value:
                messages = args[1] if len(args) > 2 else kwargs["input"]
            stream = True if "stream" in kwargs and kwargs["stream"] == True else False
            logging_obj = Logging(model=model, messages=messages, stream=stream, litellm_call_id=kwargs["litellm_call_id"], call_type=call_type)
            return logging_obj
        except:  # DO NOT BLOCK running the function because of this
            print_verbose(f"[Non-Blocking] {traceback.format_exc()}")
            pass

    def crash_reporting(*args, **kwargs):
        if litellm.telemetry:
            try:
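function_setup keys off the wrapped function's __name__ to find the prompt-like argument: completion calls carry it in messages, embedding calls in input. Two illustrative call shapes (values made up, and real provider API keys would be needed to execute them); keyword-style calls like these fall through to the kwargs lookups above:

import litellm

# call_type == "completion" -> prompt read from kwargs["messages"]
litellm.completion(model="gpt-3.5-turbo",
                   messages=[{"role": "user", "content": "hi"}])

# call_type == "embedding" -> prompt read from kwargs["input"]
litellm.embedding(model="text-embedding-ada-002",
                  input=["hello world"])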
@@ -397,10 +487,11 @@ def client(original_function):
    def wrapper(*args, **kwargs):
        start_time = None
        result = None
        litellm_call_id = str(uuid.uuid4())
        kwargs["litellm_call_id"] = litellm_call_id
        logging_obj = function_setup(*args, **kwargs)
        kwargs["litellm_logging_obj"] = logging_obj
        try:
            function_setup(*args, **kwargs)
            litellm_call_id = str(uuid.uuid4())
            kwargs["litellm_call_id"] = litellm_call_id
            start_time = datetime.datetime.now()
            # [OPTIONAL] CHECK CACHE
            # remove this after deprecating litellm.caching
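Because the wrapper now stashes the client under kwargs["litellm_logging_obj"] before invoking the wrapped function, downstream code can pick it up from kwargs. A hedged sketch of a hypothetical provider call site doing so (the function name and its internals are illustrative, not part of the commit):

def hypothetical_provider_completion(*args, **kwargs):
    # the decorator above injects the logger client into kwargs
    logging_obj = kwargs.get("litellm_logging_obj")
    if logging_obj is not None:
        logging_obj.pre_call(input=kwargs.get("messages"), api_key=kwargs.get("api_key"))
    # ... make the actual provider HTTP call here ...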
@@ -415,10 +506,13 @@ def client(original_function):

            # MODEL CALL
            result = original_function(*args, **kwargs)
            end_time = datetime.datetime.now()
            # LOG SUCCESS
            logging_obj.success_handler(result, start_time, end_time)

            if "stream" in kwargs and kwargs["stream"] == True:
                # TODO: Add to cache for streaming
                return result
            end_time = datetime.datetime.now()
            # [OPTIONAL] ADD TO CACHE
            if litellm.caching or litellm.caching_with_models or litellm.cache != None:  # user init a cache object
                litellm.cache.add_cache(result, *args, **kwargs)
@@ -433,6 +527,7 @@ def client(original_function):
            traceback_exception = traceback.format_exc()
            crash_reporting(*args, **kwargs, exception=traceback_exception)
            end_time = datetime.datetime.now()
            logging_obj.failure_handler(e, traceback_exception, start_time, end_time)
            my_thread = threading.Thread(
                target=handle_failure,
                args=(e, traceback_exception, start_time, end_time, args, kwargs),
@@ -917,44 +1012,6 @@ def handle_failure(exception, traceback_exception, start_time, end_time, args, k
                    litellm_call_id=kwargs["litellm_call_id"],
                    print_verbose=print_verbose,
                )
            elif callback == "lite_debugger":
                print_verbose("reaches lite_debugger for logging!")
                print_verbose(f"liteDebuggerClient: {liteDebuggerClient}")
                model = args[0] if len(args) > 0 else kwargs["model"]
                messages = (
                    args[1]
                    if len(args) > 1
                    else kwargs.get(
                        "messages",
                        [
                            {
                                "role": "user",
                                "content": " ".join(kwargs.get("input", "")),
                            }
                        ],
                    )
                )
                result = {
                    "model": model,
                    "created": time.time(),
                    "error": traceback_exception,
                    "usage": {
                        "prompt_tokens": prompt_token_calculator(
                            model, messages=messages
                        ),
                        "completion_tokens": 0,
                    },
                }
                liteDebuggerClient.log_event(
                    model=model,
                    messages=messages,
                    end_user=litellm._thread_context.user,
                    response_obj=result,
                    start_time=start_time,
                    end_time=end_time,
                    litellm_call_id=kwargs["litellm_call_id"],
                    print_verbose=print_verbose,
                )
    except:
        print_verbose(
            f"Error Occurred while logging failure: {traceback.format_exc()}"
@@ -1085,32 +1142,6 @@ def handle_success(args, kwargs, result, start_time, end_time):
                litellm_call_id=kwargs["litellm_call_id"],
                print_verbose=print_verbose,
            )
        elif callback == "lite_debugger":
            print_verbose("reaches lite_debugger for logging!")
            print_verbose(f"liteDebuggerClient: {liteDebuggerClient}")
            messages = (
                args[1]
                if len(args) > 1
                else kwargs.get(
                    "messages",
                    [
                        {
                            "role": "user",
                            "content": " ".join(kwargs.get("input", "")),
                        }
                    ],
                )
            )
            liteDebuggerClient.log_event(
                model=model,
                messages=messages,
                end_user=litellm._thread_context.user,
                response_obj=result,
                start_time=start_time,
                end_time=end_time,
                litellm_call_id=kwargs["litellm_call_id"],
                print_verbose=print_verbose,
            )
    except Exception as e:
        # LOGGING
        exception_logging(logger_fn=user_logger_fn, exception=e)
@@ -1486,9 +1517,10 @@ def get_secret(secret_name):
# wraps the completion stream to return the correct format for the model
# replicate/anthropic/cohere
class CustomStreamWrapper:
    def __init__(self, completion_stream, model, custom_llm_provider=None):
    def __init__(self, completion_stream, model, custom_llm_provider=None, logging_obj=None):
        self.model = model
        self.custom_llm_provider = custom_llm_provider
        self.logging_obj = logging_obj
        if model in litellm.cohere_models:
            # cohere does not return an iterator, so we need to wrap it in one
            self.completion_stream = iter(completion_stream)
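With the extra parameter, streaming call sites can hand the same Logging instance to the wrapper. An illustrative construction (provider, model, and the stand-in stream are placeholders):

raw_provider_stream = iter([])  # stand-in for the provider's streaming response
wrapped_stream = CustomStreamWrapper(
    completion_stream=raw_provider_stream,
    model="claude-instant-1",
    custom_llm_provider="anthropic",
    logging_obj=logging_obj,  # the Logging client created in function_setup
)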
@@ -1497,6 +1529,10 @@ class CustomStreamWrapper:

    def __iter__(self):
        return self

    def logging(self, text):
        if self.logging_obj:
            self.logging_obj.post_call(text)

    def handle_anthropic_chunk(self, chunk):
        str_line = chunk.decode("utf-8")  # Convert bytes to string
@@ -1586,6 +1622,8 @@ class CustomStreamWrapper:
        elif self.model in litellm.open_ai_text_completion_models:
            chunk = next(self.completion_stream)
            completion_obj["content"] = self.handle_openai_text_completion_chunk(chunk)
        # LOGGING
        self.logging_obj(completion_obj["content"])
        # return this for all models
        return {"choices": [{"delta": completion_obj}]}
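Consumers iterate the wrapper and read each delta from the normalized shape returned above; an illustrative loop, continuing the construction sketch earlier:

for chunk in wrapped_stream:
    delta = chunk["choices"][0]["delta"]
    print(delta.get("content", ""), end="")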