feat(utils.py): adding additional states for custom logging

This commit is contained in:
Krrish Dholakia 2023-11-04 17:07:20 -07:00
parent 7c46e85ed6
commit c3916a7754
4 changed files with 161 additions and 71 deletions

View file

@ -6,6 +6,7 @@ from litellm.caching import Cache
input_callback: List[Union[str, Callable]] = []
success_callback: List[Union[str, Callable]] = []
failure_callback: List[Union[str, Callable]] = []
callbacks: List[Callable] = []
set_verbose = False
email: Optional[
str

View file

@ -13,6 +13,24 @@ class CustomLogger:
def __init__(self):
pass
def log_pre_api_call(self, model, messages, kwargs):
pass
def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
pass
def log_stream_event(self, kwargs, response_obj, start_time, end_time):
pass
def log_success_event(self, kwargs, response_obj, start_time, end_time):
pass
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
pass
#### DEPRECATED ####
def log_input_event(self, model, messages, kwargs, print_verbose, callback_func):
try:
print_verbose(

View file

@ -6,90 +6,44 @@ sys.path.insert(0, os.path.abspath('../..'))
from litellm import completion, embedding
import litellm
from litellm.integrations.custom_logger import CustomLogger
def custom_callback(
kwargs,
completion_response,
start_time,
end_time,
):
print(
"in custom callback func"
)
print("kwargs", kwargs)
print(completion_response)
print(start_time)
print(end_time)
if "complete_streaming_response" in kwargs:
print("\n\n complete response\n\n")
complete_streaming_response = kwargs["complete_streaming_response"]
print(kwargs["complete_streaming_response"])
usage = complete_streaming_response["usage"]
print("usage", usage)
def send_slack_alert(
kwargs,
completion_response,
start_time,
end_time,
):
print(
"in custom slack callback func"
)
import requests
import json
class MyCustomHandler(CustomLogger):
def log_pre_api_call(self, model, messages, kwargs):
print(f"Pre-API Call")
# Define the Slack webhook URL
slack_webhook_url = os.environ['SLACK_WEBHOOK_URL'] # "https://hooks.slack.com/services/<>/<>/<>"
def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
print(f"Post-API Call")
# Define the text payload, send data available in litellm custom_callbacks
text_payload = f"""LiteLLM Logging: kwargs: {str(kwargs)}\n\n, response: {str(completion_response)}\n\n, start time{str(start_time)} end time: {str(end_time)}
"""
payload = {
"text": text_payload
}
def log_stream_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Stream")
# Set the headers
headers = {
"Content-type": "application/json"
}
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
# Make the POST request
response = requests.post(slack_webhook_url, json=payload, headers=headers)
# Check the response status
if response.status_code == 200:
print("Message sent successfully to Slack!")
else:
print(f"Failed to send message to Slack. Status code: {response.status_code}")
print(response.json())
def get_transformed_inputs(
kwargs,
):
params_to_model = kwargs["additional_args"]["complete_input_dict"]
print("params to model", params_to_model)
litellm.success_callback = [custom_callback, send_slack_alert]
litellm.failure_callback = [send_slack_alert]
litellm.set_verbose = False
# litellm.input_callback = [get_transformed_inputs]
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
customHandler = MyCustomHandler()
def test_chat_openai():
try:
litellm.callbacks = [customHandler]
response = completion(model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": "Hi 👋 - i'm openai"
}],
stream=True)
print(response)
for chunk in response:
print(chunk)
# print(chunk)
continue
response = completion(model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": "Hi 👋 - i'm openai"
}])
# print(response)
except Exception as e:
print(e)
@ -97,3 +51,77 @@ def test_chat_openai():
test_chat_openai()
# def custom_callback(
# kwargs,
# completion_response,
# start_time,
# end_time,
# ):
# print(
# "in custom callback func"
# )
# print("kwargs", kwargs)
# print(completion_response)
# print(start_time)
# print(end_time)
# if "complete_streaming_response" in kwargs:
# print("\n\n complete response\n\n")
# complete_streaming_response = kwargs["complete_streaming_response"]
# print(kwargs["complete_streaming_response"])
# usage = complete_streaming_response["usage"]
# print("usage", usage)
# def send_slack_alert(
# kwargs,
# completion_response,
# start_time,
# end_time,
# ):
# print(
# "in custom slack callback func"
# )
# import requests
# import json
# # Define the Slack webhook URL
# slack_webhook_url = os.environ['SLACK_WEBHOOK_URL'] # "https://hooks.slack.com/services/<>/<>/<>"
# # Define the text payload, send data available in litellm custom_callbacks
# text_payload = f"""LiteLLM Logging: kwargs: {str(kwargs)}\n\n, response: {str(completion_response)}\n\n, start time{str(start_time)} end time: {str(end_time)}
# """
# payload = {
# "text": text_payload
# }
# # Set the headers
# headers = {
# "Content-type": "application/json"
# }
# # Make the POST request
# response = requests.post(slack_webhook_url, json=payload, headers=headers)
# # Check the response status
# if response.status_code == 200:
# print("Message sent successfully to Slack!")
# else:
# print(f"Failed to send message to Slack. Status code: {response.status_code}")
# print(response.json())
# def get_transformed_inputs(
# kwargs,
# ):
# params_to_model = kwargs["additional_args"]["complete_input_dict"]
# print("params to model", params_to_model)
# litellm.success_callback = [custom_callback, send_slack_alert]
# litellm.failure_callback = [send_slack_alert]
# litellm.set_verbose = False
# # litellm.input_callback = [get_transformed_inputs]

View file

@ -398,6 +398,12 @@ class Logging:
message=f"Model Call Details pre-call: {self.model_call_details}",
level="info",
)
elif isinstance(callback, CustomLogger): # custom logger class
callback.log_pre_api_call(
model=self.model,
messages=self.messages,
kwargs=self.model_call_details,
)
elif callable(callback): # custom logger functions
customLogger.log_input_event(
model=self.model,
@ -471,6 +477,12 @@ class Logging:
message=f"Model Call Details post-call: {self.model_call_details}",
level="info",
)
elif isinstance(callback, CustomLogger): # custom logger class
callback.log_post_api_call(
model=self.model,
messages=self.messages,
kwargs=self.model_call_details,
)
except:
print_verbose(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {traceback.format_exc()}"
@ -603,6 +615,23 @@ class Logging:
end_time=end_time,
print_verbose=print_verbose,
)
if isinstance(callback, CustomLogger): # custom logger class
if self.stream and complete_streaming_response is None:
callback.log_stream_event(
kwargs=self.model_call_details,
response_obj=result,
start_time=start_time,
end_time=end_time
)
else:
if self.stream and complete_streaming_response:
self.model_call_details["complete_response"] = self.model_call_details.pop("complete_streaming_response", complete_streaming_response)
callback.log_success_event(
kwargs=self.model_call_details,
response_obj=result,
start_time=start_time,
end_time=end_time,
)
if callable(callback): # custom logger functions
customLogger.log_event(
kwargs=self.model_call_details,
@ -690,6 +719,12 @@ class Logging:
print_verbose=print_verbose,
callback_func=callback
)
elif isinstance(callback, CustomLogger): # custom logger class
callback.log_failure_event(
model=self.model,
messages=self.messages,
kwargs=self.model_call_details,
)
except Exception as e:
print_verbose(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {traceback.format_exc()}"
@ -755,6 +790,14 @@ def client(original_function):
litellm.success_callback.append("lite_debugger")
if "lite_debugger" not in litellm.failure_callback:
litellm.failure_callback.append("lite_debugger")
if len(litellm.callbacks) > 0:
for callback in litellm.callbacks:
if callback not in litellm.input_callback:
litellm.input_callback.append(callback)
if callback not in litellm.success_callback:
litellm.success_callback.append(callback)
if callback not in litellm.failure_callback:
litellm.failure_callback.append(callback)
if (
len(litellm.input_callback) > 0
or len(litellm.success_callback) > 0