litellm/tests/old_proxy_tests/tests/load_test_q.py
Krish Dholakia d57be47b0f
Litellm ruff linting enforcement (#5992)
* ci(config.yml): add a 'check_code_quality' step

Addresses https://github.com/BerriAI/litellm/issues/5991

* ci(config.yml): check why circle ci doesn't pick up this test

* ci(config.yml): fix to run 'check_code_quality' tests

* fix(__init__.py): fix unprotected import

* fix(__init__.py): don't remove unused imports

* build(ruff.toml): update ruff.toml to ignore unused imports

* fix: fix: ruff + pyright - fix linting + type-checking errors

* fix: fix linting errors

* fix(lago.py): fix module init error

* fix: fix linting errors

* ci(config.yml): cd into correct dir for checks

* fix(proxy_server.py): fix linting error

* fix(utils.py): fix bare except

causes ruff linting errors

* fix: ruff - fix remaining linting errors

* fix(clickhouse.py): use standard logging object

* fix(__init__.py): fix unprotected import

* fix: ruff - fix linting errors

* fix: fix linting errors

* ci(config.yml): cleanup code qa step (formatting handled in local_testing)

* fix(_health_endpoints.py): fix ruff linting errors

* ci(config.yml): just use ruff in check_code_quality pipeline for now

* build(custom_guardrail.py): include missing file

* style(embedding_handler.py): fix ruff check
2024-10-01 19:44:20 -04:00

121 lines
3.8 KiB
Python

import os
import time
import requests
from dotenv import load_dotenv
load_dotenv()
# Set the base URL as needed
base_url = "https://api.litellm.ai"
# # Uncomment the line below if you want to switch to the local server
# base_url = "http://0.0.0.0:8000"
# Step 1 Add a config to the proxy, generate a temp key
config = {
"model_list": [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.environ["OPENAI_API_KEY"],
},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.environ["AZURE_API_KEY"],
"api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/",
"api_version": "2023-07-01-preview",
},
},
]
}
print("STARTING LOAD TEST Q")
print(os.environ["AZURE_API_KEY"])
response = requests.post(
url=f"{base_url}/key/generate",
json={
"config": config,
"duration": "30d", # default to 30d, set it to 30m if you want a temp key
},
headers={"Authorization": "Bearer sk-hosted-litellm"},
)
print("\nresponse from generating key", response.text)
print("\n json response from gen key", response.json())
generated_key = response.json()["key"]
print("\ngenerated key for proxy", generated_key)
# Step 2: Queue 50 requests to the proxy, using your generated_key
import concurrent.futures
def create_job_and_poll(request_num):
print(f"Creating a job on the proxy for request {request_num}")
job_response = requests.post(
url=f"{base_url}/queue/request",
json={
"model": "gpt-3.5-turbo",
"messages": [
{"role": "system", "content": "write a short poem"},
],
},
headers={"Authorization": f"Bearer {generated_key}"},
)
print(job_response.status_code)
print(job_response.text)
print("\nResponse from creating job", job_response.text)
job_response = job_response.json()
job_response["id"]
polling_url = job_response["url"]
polling_url = f"{base_url}{polling_url}"
print(f"\nCreated Job {request_num}, Polling Url {polling_url}")
# Poll each request
while True:
try:
print(f"\nPolling URL for request {request_num}", polling_url)
polling_response = requests.get(
url=polling_url, headers={"Authorization": f"Bearer {generated_key}"}
)
print(
f"\nResponse from polling url for request {request_num}",
polling_response.text,
)
polling_response = polling_response.json()
status = polling_response.get("status", None)
if status == "finished":
llm_response = polling_response["result"]
print(f"LLM Response for request {request_num}")
print(llm_response)
# Write the llm_response to load_test_log.txt
try:
with open("load_test_log.txt", "a") as response_file:
response_file.write(
f"Response for request: {request_num}\n{llm_response}\n\n"
)
except Exception as e:
print("GOT EXCEPTION", e)
break
time.sleep(0.5)
except Exception as e:
print("got exception when polling", e)
# Number of requests
num_requests = 100
# Use ThreadPoolExecutor for parallel execution
with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as executor:
# Create and poll each request in parallel
futures = [executor.submit(create_job_and_poll, i) for i in range(num_requests)]
# Wait for all futures to complete
concurrent.futures.wait(futures)