feat(utils.py): add async success callbacks for custom functions

Krrish Dholakia 2023-12-04 16:36:21 -08:00
parent eab7d41cd3
commit d1a525b6c9
8 changed files with 232 additions and 138 deletions
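
In plain terms: entries in litellm.success_callback may now be coroutine functions, and litellm awaits them after a successful completion instead of invoking them synchronously. A minimal usage sketch, assuming only what the updated test below exercises (the handler body and model name are illustrative):

import asyncio
import litellm

# An async success callback; litellm passes these four arguments,
# matching the signature used in the test below.
async def log_success(kwargs, completion_obj, start_time, end_time):
    print("async success callback fired")

litellm.success_callback = [log_success]

async def main():
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",  # illustrative model choice
        messages=[{"role": "user", "content": "Hi"}],
    )
    print(response)

asyncio.run(main())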


@@ -1,5 +1,5 @@
### What this tests ####
import sys, os, time
import sys, os, time, inspect, asyncio
import pytest
sys.path.insert(0, os.path.abspath('../..'))
@@ -7,6 +7,7 @@ from litellm import completion, embedding
import litellm
from litellm.integrations.custom_logger import CustomLogger
async_success = False
class MyCustomHandler(CustomLogger):
    success: bool = False
    failure: bool = False
@@ -28,24 +29,29 @@ class MyCustomHandler(CustomLogger):
        print(f"On Failure")
        self.failure = True
# def test_chat_openai():
#     try:
#         customHandler = MyCustomHandler()
#         litellm.callbacks = [customHandler]
#         response = completion(model="gpt-3.5-turbo",
#                               messages=[{
#                                   "role": "user",
#                                   "content": "Hi 👋 - i'm openai"
#                               }],
#                               stream=True)
#         time.sleep(1)
#         assert customHandler.success == True
#     except Exception as e:
#         pytest.fail(f"An error occurred - {str(e)}")
#     pass
async def async_test_logging_fn(kwargs, completion_obj, start_time, end_time):
    global async_success
    print(f"ON ASYNC LOGGING")
    async_success = True
# test_chat_openai()
@pytest.mark.asyncio
async def test_chat_openai():
    try:
        # litellm.set_verbose = True
        litellm.success_callback = [async_test_logging_fn]
        response = await litellm.acompletion(model="gpt-3.5-turbo",
                                             messages=[{
                                                 "role": "user",
                                                 "content": "Hi 👋 - i'm openai"
                                             }],
                                             stream=True)
        async for chunk in response:
            continue
        assert async_success == True
    except Exception as e:
        print(e)
        pytest.fail(f"An error occurred - {str(e)}")
def test_completion_azure_stream_moderation_failure():
    try:
@@ -71,76 +77,3 @@ def test_completion_azure_stream_moderation_failure():
        assert customHandler.failure == True
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
# test_completion_azure_stream_moderation_failure()
# def custom_callback(
#     kwargs,
#     completion_response,
#     start_time,
#     end_time,
# ):
#     print(
#         "in custom callback func"
#     )
#     print("kwargs", kwargs)
#     print(completion_response)
#     print(start_time)
#     print(end_time)
#     if "complete_streaming_response" in kwargs:
#         print("\n\n complete response\n\n")
#         complete_streaming_response = kwargs["complete_streaming_response"]
#         print(kwargs["complete_streaming_response"])
#         usage = complete_streaming_response["usage"]
#         print("usage", usage)
# def send_slack_alert(
#     kwargs,
#     completion_response,
#     start_time,
#     end_time,
# ):
#     print(
#         "in custom slack callback func"
#     )
#     import requests
#     import json
#     # Define the Slack webhook URL
#     slack_webhook_url = os.environ['SLACK_WEBHOOK_URL'] # "https://hooks.slack.com/services/<>/<>/<>"
#     # Define the text payload, send data available in litellm custom_callbacks
#     text_payload = f"""LiteLLM Logging: kwargs: {str(kwargs)}\n\n, response: {str(completion_response)}\n\n, start time{str(start_time)} end time: {str(end_time)}
#     """
#     payload = {
#         "text": text_payload
#     }
#     # Set the headers
#     headers = {
#         "Content-type": "application/json"
#     }
#     # Make the POST request
#     response = requests.post(slack_webhook_url, json=payload, headers=headers)
#     # Check the response status
#     if response.status_code == 200:
#         print("Message sent successfully to Slack!")
#     else:
#         print(f"Failed to send message to Slack. Status code: {response.status_code}")
#         print(response.json())
# def get_transformed_inputs(
#     kwargs,
# ):
#     params_to_model = kwargs["additional_args"]["complete_input_dict"]
#     print("params to model", params_to_model)
# litellm.success_callback = [custom_callback, send_slack_alert]
# litellm.failure_callback = [send_slack_alert]
# litellm.set_verbose = False
# # litellm.input_callback = [get_transformed_inputs]
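
For context on the mechanism: the inspect and asyncio imports added at the top of the test hint at how utils.py can tell the two callback kinds apart. A hedged sketch of that dispatch, with a hypothetical helper name not taken from this diff:

import inspect

# Hypothetical dispatcher: await coroutine callbacks, call plain ones.
# This approximates what the commit adds to utils.py; the real code
# path is not shown in these hunks.
async def _handle_success(callbacks, kwargs, completion_obj, start_time, end_time):
    for callback in callbacks:
        if inspect.iscoroutinefunction(callback):
            await callback(kwargs, completion_obj, start_time, end_time)
        else:
            callback(kwargs, completion_obj, start_time, end_time)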