forked from phoenix/litellm-mirror
(load testing) add vertex_ai embeddings load test (#6004)
* use vertex llm as base class for embeddings * use correct vertex class in main.py * set_headers in vertex llm base * add types for vertex embedding requests * add embedding handler for vertex * use async mode for vertex embedding tests * use vertexAI textEmbeddingConfig * fix linting * add sync and async mode testing for vertex ai embeddings * add basic load test * add vertex ai load test on ci cd
This commit is contained in:
parent
f8d9be1301
commit
835db6ae98
1 changed files with 121 additions and 0 deletions
121
tests/load_tests/test_vertex_embeddings_load_test.py
Normal file
121
tests/load_tests/test_vertex_embeddings_load_test.py
Normal file
|
@ -0,0 +1,121 @@
|
|||
"""
|
||||
Load test on vertex AI embeddings to ensure vertex median response time is less than 300ms
|
||||
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Put the directory two levels above the *current working directory* first on
# the import path. os.path.abspath resolves "../.." against the CWD, not against
# this file's location, so the result depends on where pytest is launched.
# NOTE(review): presumably this makes the local litellm checkout importable — confirm.
sys.path.insert(0, os.path.abspath("../.."))
|
||||
|
||||
import asyncio
|
||||
import litellm
|
||||
import pytest
|
||||
import time
|
||||
from statistics import mean, median
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
|
||||
def load_vertex_ai_credentials():
    """
    Write a service-account key file and export it as GOOGLE_APPLICATION_CREDENTIALS.

    ``vertex_key.json`` in the directory containing this file is used as a
    template. A missing, empty, malformed, or non-object template is treated
    as an empty dictionary. The ``private_key_id`` and ``private_key`` entries
    are then overwritten from the ``VERTEX_AI_PRIVATE_KEY_ID`` and
    ``VERTEX_AI_PRIVATE_KEY`` environment variables (empty string when unset),
    the result is written to a new temporary file, and
    ``GOOGLE_APPLICATION_CREDENTIALS`` is set to that file's absolute path.

    The temporary file is created with ``delete=False`` and is not removed
    here, because the exported environment variable keeps pointing at it
    after this function returns. Every call creates a new file.
    """
    print("loading vertex ai credentials")
    filepath = os.path.dirname(os.path.abspath(__file__))
    vertex_key_path = os.path.join(filepath, "vertex_key.json")

    # Read the template if it exists; a missing file is not an error.
    try:
        with open(vertex_key_path, "r") as file:
            print("Read vertexai file path")
            content = file.read()
    except FileNotFoundError:
        content = ""

    # Fall back to an empty dictionary when the template is empty, is not
    # valid JSON, or does not contain a JSON object.
    service_account_key_data = {}
    if content.strip():
        try:
            loaded = json.loads(content)
        except json.JSONDecodeError:
            loaded = None
        if isinstance(loaded, dict):
            service_account_key_data = loaded

    # Update the service_account_key_data with environment variables
    private_key_id = os.environ.get("VERTEX_AI_PRIVATE_KEY_ID", "")
    private_key = os.environ.get("VERTEX_AI_PRIVATE_KEY", "")
    # Turn literal backslash-n sequences in the variable into real newlines.
    private_key = private_key.replace("\\n", "\n")
    service_account_key_data["private_key_id"] = private_key_id
    service_account_key_data["private_key"] = private_key

    # Write the merged key to a temporary file that outlives this call.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
        json.dump(service_account_key_data, temp_file, indent=2)

    # Export only after the file has been flushed and closed.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.abspath(temp_file.name)
|
||||
|
||||
|
||||
async def create_async_vertex_embedding_task():
    """
    Send one Vertex AI embedding request through litellm and time it.

    Credentials are (re)loaded via ``load_vertex_ai_credentials()`` before the
    clock starts, so that setup is not part of the measured duration. The
    request is issued with ``litellm.aembedding`` using a 10 second timeout
    and an ``api_base`` override.

    Returns:
        A ``(response, elapsed_seconds)`` tuple, where ``elapsed_seconds`` is
        the time spent awaiting ``litellm.aembedding``.
    """
    load_vertex_ai_credentials()
    # NOTE(review): the host below looks like a stand-in endpoint rather than
    # Google's own API — confirm before relying on these latency numbers.
    base_url = "https://exampleopenaiendpoint-production.up.railway.app/v1/projects/adroit-crow-413218/locations/us-central1/publishers/google/models/embedding-gecko-001:predict"
    embedding_args = {
        "model": "vertex_ai/textembedding-gecko",
        "input": "This is a test sentence for embedding.",
        "timeout": 10,
        "api_base": base_url,
    }
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    response = await litellm.aembedding(**embedding_args)
    elapsed = time.perf_counter() - start_time
    print(f"Vertex AI embedding time: {elapsed:.2f} seconds")
    return response, elapsed
|
||||
|
||||
|
||||
async def run_load_test(duration_seconds, requests_per_second):
    """
    Fire batches of concurrent embedding requests for a fixed duration.

    While the deadline has not passed, ``requests_per_second`` calls to
    ``create_async_vertex_embedding_task()`` are started together and awaited
    with ``asyncio.gather``. An exception from any request propagates to the
    caller. After each batch the loop waits out the rest of that batch's
    one-second window, so batches start roughly once per second.

    Args:
        duration_seconds: How long to keep starting new batches. A value
            of 0 or less starts no batches.
        requests_per_second: Number of concurrent requests per batch.

    Returns:
        A list with the duration (in seconds) of every completed request.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + duration_seconds
    vertex_times = []

    print(
        f"Running Load Test for {duration_seconds} seconds at {requests_per_second} RPS..."
    )
    while time.monotonic() < deadline:
        batch_started = time.monotonic()
        vertex_tasks = [
            create_async_vertex_embedding_task() for _ in range(requests_per_second)
        ]

        vertex_results = await asyncio.gather(*vertex_tasks)

        vertex_times.extend(duration for _, duration in vertex_results)

        # Sleep only for what is left of this one-second window. Sleeping a
        # full second after the batch finished would stretch each cycle to
        # (1s + batch latency) and push the real rate below the requested RPS.
        remaining = 1 - (time.monotonic() - batch_started)
        await asyncio.sleep(max(0.0, remaining))

    return vertex_times
|
||||
|
||||
|
||||
def analyze_results(vertex_times, max_median_seconds=0.3):
    """
    Check that the median response time is within the allowed threshold.

    Args:
        vertex_times: Response durations in seconds. Must be non-empty;
            ``statistics.median`` raises ``StatisticsError`` on empty data.
        max_median_seconds: Largest acceptable median, in seconds. Defaults
            to 0.3 (300ms). A median exactly equal to the threshold passes.

    Returns:
        True when the median does not exceed the threshold. Otherwise the
        test is failed through ``pytest.fail``.
    """
    median_vertex = median(vertex_times)
    print(f"Vertex AI median response time: {median_vertex:.4f} seconds")

    if median_vertex > max_median_seconds:
        # The threshold is rendered in milliseconds; the default yields "300ms".
        pytest.fail(
            f"Vertex AI median response time is greater than {max_median_seconds * 1000:g}ms: {median_vertex:.4f} seconds"
        )

    print("Performance is good")
    return True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_embedding_performance():
    """
    Run load test on vertex AI embeddings to ensure vertex median response time is less than 300ms

    Batches of 20 concurrent requests are issued for 20 seconds.
    """
    duration_seconds = 20
    requests_per_second = 20
    vertex_times = await run_load_test(duration_seconds, requests_per_second)
    # analyze_results returns True on success and fails the test itself when
    # the median is too slow; asserting makes the expected outcome explicit
    # instead of binding it to an unused local.
    assert analyze_results(vertex_times)
|
Loading…
Add table
Add a link
Reference in a new issue