test(test_custom_callback_input.py): assert async success called only once during vertex ai streaming

Krrish Dholakia 2024-02-26 09:18:46 -08:00
parent 0209588681
commit e48fff47dd
2 changed files with 78 additions and 0 deletions


@@ -600,6 +600,81 @@ async def test_async_chat_sagemaker_stream():
pytest.fail(f"An exception occurred: {str(e)}")
## Test Vertex AI + Async
import json
import tempfile
def load_vertex_ai_credentials():
# Define the path to the vertex_key.json file
print("loading vertex ai credentials")
filepath = os.path.dirname(os.path.abspath(__file__))
vertex_key_path = filepath + "/vertex_key.json"
# Read the existing content of the file or create an empty dictionary
try:
with open(vertex_key_path, "r") as file:
# Read the file content
print("Read vertexai file path")
content = file.read()
# If the file is empty or not valid JSON, create an empty dictionary
if not content or not content.strip():
service_account_key_data = {}
else:
# Attempt to load the existing JSON content
file.seek(0)
service_account_key_data = json.load(file)
except FileNotFoundError:
# If the file doesn't exist, create an empty dictionary
service_account_key_data = {}
# Update the service_account_key_data with environment variables
private_key_id = os.environ.get("VERTEX_AI_PRIVATE_KEY_ID", "")
private_key = os.environ.get("VERTEX_AI_PRIVATE_KEY", "")
private_key = private_key.replace("\\n", "\n")
service_account_key_data["private_key_id"] = private_key_id
service_account_key_data["private_key"] = private_key
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
# Write the updated content to the temporary file
json.dump(service_account_key_data, temp_file, indent=2)
# Export the temporary file as GOOGLE_APPLICATION_CREDENTIALS
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.abspath(temp_file.name)
@pytest.mark.asyncio
async def test_async_chat_vertex_ai_stream():
    try:
        load_vertex_ai_credentials()
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # test streaming
        response = await litellm.acompletion(
            model="gemini-pro",
            messages=[
                {
                    "role": "user",
                    "content": f"Hi 👋 - i'm async vertex_ai {uuid.uuid4()}",
                }
            ],
            stream=True,
        )
        print(f"response: {response}")
        async for chunk in response:
            print(f"chunk: {chunk}")
            continue
        print(f"customHandler.states: {customHandler.states}")
        # the async success callback must fire exactly once for the whole
        # stream, not once per chunk
        assert customHandler.states.count("async_success") == 1
        assert len(customHandler.states) >= 3  # pre, post, success
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
# Text Completion