(test) add testing for litellm callback for supabase

This commit is contained in:
ishaan-jaff 2023-10-11 12:34:09 -07:00
parent cc55bc886a
commit ae096bb18e

View file

@@ -1,54 +1,67 @@
# #### What this tests #### #### What this tests ####
# # This tests if logging to the supabase integration actually works # This tests if logging to the supabase integration actually works
# # pytest mistakes intentional bad calls as failed tests -> [TODO] fix this import sys, os
# import sys, os import traceback
# import traceback import pytest
# import pytest
# sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path
# import litellm import litellm
# from litellm import embedding, completion from litellm import embedding, completion
# litellm.input_callback = ["supabase"] litellm.input_callback = ["supabase"]
# litellm.success_callback = ["supabase"] litellm.success_callback = ["supabase"]
# litellm.failure_callback = ["supabase"] litellm.failure_callback = ["supabase"]
# litellm.set_verbose = False litellm.set_verbose = False
def test_supabase_logging():
    """Exercise a basic (non-streaming) completion call with the supabase
    logging callbacks enabled (see the module-level litellm.*_callback
    settings above); fails the test if the call raises."""
    try:
        response = completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hello tell me hi"}],
            user="ishaanRegular",
            max_tokens=10,
        )
        print(response)
    except Exception as e:
        # Surface the failure to pytest instead of silently swallowing it;
        # a test body that only print()s its errors can never actually fail.
        pytest.fail(f"Error occurred: {e}")
# test_supabase_logging()
def test_acompletion_sync():
    """Exercise the async streaming completion path with the supabase
    callbacks enabled, consuming chunks until a finish_reason arrives.

    Runs the coroutine to completion synchronously via asyncio.run so it
    works as a plain pytest test function.
    """
    import asyncio
    import time

    async def completion_call():
        try:
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "write a poem"}],
                max_tokens=10,
                stream=True,
                user="ishaanStreamingUser"
            )
            complete_response = ""
            start_time = time.time()
            async for chunk in response:
                chunk_time = time.time()
                # print(chunk)
                complete_response += chunk["choices"][0]["delta"].get("content", "")
                # print(complete_response)
                # print(f"time since initial request: {chunk_time - start_time:.5f}")
                # Stop once the model signals it is done streaming.
                if chunk["choices"][0].get("finish_reason", None) is not None:
                    print("🤗🤗🤗 DONE")
                    return
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate, and the swallowed traceback is now reported
            # as a test failure instead of merely printed.
            pytest.fail(f"error occurred: {traceback.format_exc()}")

    asyncio.run(completion_call())
# test_acompletion_sync()