diff --git a/litellm/__init__.py b/litellm/__init__.py
index a90b77fb5..d2e709932 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -6,6 +6,7 @@ from litellm.caching import Cache
 input_callback: List[Union[str, Callable]] = []
 success_callback: List[Union[str, Callable]] = []
 failure_callback: List[Union[str, Callable]] = []
+callbacks: List[Callable] = []
 set_verbose = False
 email: Optional[
     str
diff --git a/litellm/integrations/custom_logger.py b/litellm/integrations/custom_logger.py
index d79b01cfe..66dd57eb2 100644
--- a/litellm/integrations/custom_logger.py
+++ b/litellm/integrations/custom_logger.py
@@ -12,7 +12,25 @@ class CustomLogger:
     # Class variables or attributes
     def __init__(self):
         pass
+
+    def log_pre_api_call(self, model, messages, kwargs):
+        pass
+
+    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
+        pass
 
+    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+
+    #### DEPRECATED ####
+
     def log_input_event(self, model, messages, kwargs, print_verbose, callback_func):
         try:
             print_verbose(
diff --git a/litellm/tests/test_custom_logger.py b/litellm/tests/test_custom_logger.py
index 75ba4253e..dc79eb3ce 100644
--- a/litellm/tests/test_custom_logger.py
+++ b/litellm/tests/test_custom_logger.py
@@ -6,90 +6,44 @@ sys.path.insert(0, os.path.abspath('../..'))
 from litellm import completion, embedding
 import litellm
+from litellm.integrations.custom_logger import CustomLogger
 
-def custom_callback(
-    kwargs,
-    completion_response,
-    start_time,
-    end_time,
-):
-    print(
-        "in custom callback func"
-    )
-    print("kwargs", kwargs)
-    print(completion_response)
-    print(start_time)
-    print(end_time)
-    if "complete_streaming_response" in kwargs:
-        print("\n\n complete response\n\n")
-        complete_streaming_response = kwargs["complete_streaming_response"]
-        print(kwargs["complete_streaming_response"])
-        usage = complete_streaming_response["usage"]
-        print("usage", usage)
-def send_slack_alert(
-    kwargs,
-    completion_response,
-    start_time,
-    end_time,
-):
-    print(
-        "in custom slack callback func"
-    )
-    import requests
-    import json
+class MyCustomHandler(CustomLogger):
+    def log_pre_api_call(self, model, messages, kwargs):
+        print(f"Pre-API Call")
+
+    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
+        print(f"Post-API Call")
+
+    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"On Stream")
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"On Success")
 
-    # Define the Slack webhook URL
-    slack_webhook_url = os.environ['SLACK_WEBHOOK_URL'] # "https://hooks.slack.com/services/<>/<>/<>"
-
-    # Define the text payload, send data available in litellm custom_callbacks
-    text_payload = f"""LiteLLM Logging: kwargs: {str(kwargs)}\n\n, response: {str(completion_response)}\n\n, start time{str(start_time)} end time: {str(end_time)}
-    """
-    payload = {
-        "text": text_payload
-    }
-
-    # Set the headers
-    headers = {
-        "Content-type": "application/json"
-    }
-
-    # Make the POST request
-    response = requests.post(slack_webhook_url, json=payload, headers=headers)
-
-    # Check the response status
-    if response.status_code == 200:
-        print("Message sent successfully to Slack!")
-    else:
-        print(f"Failed to send message to Slack. Status code: {response.status_code}")
-        print(response.json())
-
-def get_transformed_inputs(
-    kwargs,
-):
-    params_to_model = kwargs["additional_args"]["complete_input_dict"]
-    print("params to model", params_to_model)
-
-litellm.success_callback = [custom_callback, send_slack_alert]
-litellm.failure_callback = [send_slack_alert]
-
-
-litellm.set_verbose = False
-
-# litellm.input_callback = [get_transformed_inputs]
+    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"On Failure")
 
+customHandler = MyCustomHandler()
 
 def test_chat_openai():
     try:
+        litellm.callbacks = [customHandler]
         response = completion(model="gpt-3.5-turbo",
                               messages=[{
                                   "role": "user",
                                   "content": "Hi 👋 - i'm openai"
                               }],
                               stream=True)
-
-        print(response)
         for chunk in response:
-            print(chunk)
+            # print(chunk)
+            continue
+        response = completion(model="gpt-3.5-turbo",
+                              messages=[{
+                                  "role": "user",
+                                  "content": "Hi 👋 - i'm openai"
+                              }])
+        # print(response)
     except Exception as e:
         print(e)
@@ -97,3 +51,77 @@ def test_chat_openai():
 
 
 test_chat_openai()
+
+
+
+
+
+# def custom_callback(
+#     kwargs,
+#     completion_response,
+#     start_time,
+#     end_time,
+# ):
+#     print(
+#         "in custom callback func"
+#     )
+#     print("kwargs", kwargs)
+#     print(completion_response)
+#     print(start_time)
+#     print(end_time)
+#     if "complete_streaming_response" in kwargs:
+#         print("\n\n complete response\n\n")
+#         complete_streaming_response = kwargs["complete_streaming_response"]
+#         print(kwargs["complete_streaming_response"])
+#         usage = complete_streaming_response["usage"]
+#         print("usage", usage)
+# def send_slack_alert(
+#     kwargs,
+#     completion_response,
+#     start_time,
+#     end_time,
+# ):
+#     print(
+#         "in custom slack callback func"
+#     )
+#     import requests
+#     import json
+
+#     # Define the Slack webhook URL
+#     slack_webhook_url = os.environ['SLACK_WEBHOOK_URL'] # "https://hooks.slack.com/services/<>/<>/<>"
+
+#     # Define the text payload, send data available in litellm custom_callbacks
+#     text_payload = f"""LiteLLM Logging: kwargs: {str(kwargs)}\n\n, response: {str(completion_response)}\n\n, start time{str(start_time)} end time: {str(end_time)}
+#     """
+#     payload = {
+#         "text": text_payload
+#     }
+
+#     # Set the headers
+#     headers = {
+#         "Content-type": "application/json"
+#     }
+
+#     # Make the POST request
+#     response = requests.post(slack_webhook_url, json=payload, headers=headers)
+
+#     # Check the response status
+#     if response.status_code == 200:
+#         print("Message sent successfully to Slack!")
+#     else:
+#         print(f"Failed to send message to Slack. Status code: {response.status_code}")
+#         print(response.json())
+
+# def get_transformed_inputs(
+#     kwargs,
+# ):
+#     params_to_model = kwargs["additional_args"]["complete_input_dict"]
+#     print("params to model", params_to_model)
+
+# litellm.success_callback = [custom_callback, send_slack_alert]
+# litellm.failure_callback = [send_slack_alert]
+
+
+# litellm.set_verbose = False
+
+# # litellm.input_callback = [get_transformed_inputs]
diff --git a/litellm/utils.py b/litellm/utils.py
index 673622d5c..c08517ae6 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -398,6 +398,12 @@
                     message=f"Model Call Details pre-call: {self.model_call_details}",
                     level="info",
                 )
+                elif isinstance(callback, CustomLogger):  # custom logger class
+                    callback.log_pre_api_call(
+                        model=self.model,
+                        messages=self.messages,
+                        kwargs=self.model_call_details,
+                    )
                 elif callable(callback):  # custom logger functions
                     customLogger.log_input_event(
                         model=self.model,
@@ -471,6 +477,12 @@
                     message=f"Model Call Details post-call: {self.model_call_details}",
                     level="info",
                 )
+                elif isinstance(callback, CustomLogger):  # custom logger class
+                    callback.log_post_api_call(
+                        kwargs=self.model_call_details,
+                        response_obj=None,
+                        start_time=None, end_time=None,
+                    )
             except:
                 print_verbose(
                     f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {traceback.format_exc()}"
@@ -603,6 +615,23 @@
                         end_time=end_time,
                         print_verbose=print_verbose,
                     )
+                if isinstance(callback, CustomLogger):  # custom logger class
+                    if self.stream and complete_streaming_response is None:
+                        callback.log_stream_event(
+                            kwargs=self.model_call_details,
+                            response_obj=result,
+                            start_time=start_time,
+                            end_time=end_time,
+                        )
+                    else:
+                        if self.stream and complete_streaming_response:
+                            self.model_call_details["complete_response"] = self.model_call_details.pop("complete_streaming_response", complete_streaming_response)
+                        callback.log_success_event(
+                            kwargs=self.model_call_details,
+                            response_obj=result,
+                            start_time=start_time,
+                            end_time=end_time,
+                        )
                 if callable(callback):  # custom logger functions
                     customLogger.log_event(
                         kwargs=self.model_call_details,
@@ -690,6 +719,12 @@
                         print_verbose=print_verbose,
                         callback_func=callback
                     )
+                elif isinstance(callback, CustomLogger):  # custom logger class
+                    callback.log_failure_event(
+                        kwargs=self.model_call_details,
+                        response_obj=result,
+                        start_time=start_time, end_time=end_time,
+                    )
             except Exception as e:
                 print_verbose(
                     f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {traceback.format_exc()}"
@@ -755,6 +790,14 @@ def client(original_function):
                 litellm.success_callback.append("lite_debugger")
             if "lite_debugger" not in litellm.failure_callback:
                 litellm.failure_callback.append("lite_debugger")
+        if len(litellm.callbacks) > 0:
+            for callback in litellm.callbacks:
+                if callback not in litellm.input_callback:
+                    litellm.input_callback.append(callback)
+                if callback not in litellm.success_callback:
+                    litellm.success_callback.append(callback)
+                if callback not in litellm.failure_callback:
+                    litellm.failure_callback.append(callback)
         if (
             len(litellm.input_callback) > 0
             or len(litellm.success_callback) > 0
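
For reviewers, here is a minimal sketch (not part of this diff) of how the new hook surface behaves once `litellm.callbacks` is set. Per the dispatch logic added to `litellm/utils.py` above, `log_stream_event` fires once per streamed chunk while the stream is in flight, and `log_success_event` fires once at the end, with the merged response available in `kwargs` under the `complete_response` key. The `CountingHandler` name is hypothetical, and a valid `OPENAI_API_KEY` in the environment is assumed:

import litellm
from litellm import completion
from litellm.integrations.custom_logger import CustomLogger

class CountingHandler(CustomLogger):
    """Hypothetical handler that tallies how often each hook fires."""
    def __init__(self):
        super().__init__()
        self.counts = {"pre": 0, "post": 0, "stream": 0, "success": 0, "failure": 0}

    def log_pre_api_call(self, model, messages, kwargs):
        self.counts["pre"] += 1

    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        self.counts["post"] += 1

    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
        self.counts["stream"] += 1  # fires once per streamed chunk

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        self.counts["success"] += 1  # fires once per completed call

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        self.counts["failure"] += 1

handler = CountingHandler()
litellm.callbacks = [handler]  # client() copies this into input/success/failure callback lists

response = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi"}],
    stream=True,
)
for _ in response:
    pass  # drain the stream so the final success event fires

print(handler.counts)  # expect stream >= 1 and success == 1 after the stream drains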