(docs) add 1k rps load test doc (#6059)

* docs 1k rps load test

* docs load testing

* docs load testing litellm

* docs load testing

* clean up load test doc

* docs prom metrics for load testing

* docs using prometheus on load testing

* doc load testing with prometheus
This commit is contained in:
Ishaan Jaff 2024-10-04 16:56:34 +05:30 committed by GitHub
parent 224460d4c9
commit 2449d258cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 657 additions and 512 deletions

View file

@ -1,8 +1,8 @@
import Image from '@theme/IdealImage';
# Load Test LiteLLM
# LiteLLM Proxy - Locust Load Test
## How to run a locust load test on LiteLLM Proxy
## Locust Load Test LiteLLM Proxy
1. Add `fake-openai-endpoint` to your proxy config.yaml and start your litellm proxy
litellm provides a free hosted `fake-openai-endpoint` you can load test against
@ -50,512 +50,3 @@ model_list:
<Image img={require('../img/litellm_load_test.png')} />
## Load Test LiteLLM Proxy - 1500+ req/s
## 1500+ concurrent requests/s
LiteLLM proxy has been load tested to handle 1500+ concurrent req/s
```python
import time, asyncio
from openai import AsyncOpenAI, AsyncAzureOpenAI
import uuid
import traceback
# base_url - litellm proxy endpoint
# api_key - litellm proxy api-key, is created proxy with auth
litellm_client = AsyncOpenAI(base_url="http://0.0.0.0:4000", api_key="sk-1234")
async def litellm_completion():
# Your existing code for litellm_completion goes here
try:
response = await litellm_client.chat.completions.create(
model="azure-gpt-3.5",
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
print(response)
return response
except Exception as e:
# If there's an exception, log the error message
with open("error_log.txt", "a") as error_log:
error_log.write(f"Error during completion: {str(e)}\n")
pass
async def main():
for i in range(1):
start = time.time()
n = 1500 # Number of concurrent tasks
tasks = [litellm_completion() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
# Write errors to error_log.txt
with open("error_log.txt", "a") as error_log:
for completion in chat_completions:
if isinstance(completion, str):
error_log.write(completion + "\n")
print(n, time.time() - start, len(successful_completions))
time.sleep(10)
if __name__ == "__main__":
# Blank out contents of error_log.txt
open("error_log.txt", "w").close()
asyncio.run(main())
```
### Throughput - 30% Increase
LiteLLM proxy + Load Balancer gives **30% increase** in throughput compared to Raw OpenAI API
<Image img={require('../img/throughput.png')} />
### Latency Added - 0.00325 seconds
LiteLLM proxy adds **0.00325 seconds** latency as compared to using the Raw OpenAI API
<Image img={require('../img/latency.png')} />
### Testing LiteLLM Proxy with Locust
- 1 LiteLLM container can handle ~140 requests/second with 0.4 failures
<Image img={require('../img/locust.png')} />
## Load Test LiteLLM SDK vs OpenAI
Here is a script to load test LiteLLM vs OpenAI
```python
from openai import AsyncOpenAI, AsyncAzureOpenAI
import random, uuid
import time, asyncio, litellm
# import logging
# logging.basicConfig(level=logging.DEBUG)
#### LITELLM PROXY ####
litellm_client = AsyncOpenAI(
api_key="sk-1234", # [CHANGE THIS]
base_url="http://0.0.0.0:4000"
)
#### AZURE OPENAI CLIENT ####
client = AsyncAzureOpenAI(
api_key="my-api-key", # [CHANGE THIS]
azure_endpoint="my-api-base", # [CHANGE THIS]
api_version="2023-07-01-preview"
)
#### LITELLM ROUTER ####
model_list = [
{
"model_name": "azure-canada",
"litellm_params": {
"model": "azure/my-azure-deployment-name", # [CHANGE THIS]
"api_key": "my-api-key", # [CHANGE THIS]
"api_base": "my-api-base", # [CHANGE THIS]
"api_version": "2023-07-01-preview"
}
}
]
router = litellm.Router(model_list=model_list)
async def openai_completion():
try:
response = await client.chat.completions.create(
model="gpt-35-turbo",
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
stream=True
)
return response
except Exception as e:
print(e)
return None
async def router_completion():
try:
response = await router.acompletion(
model="azure-canada", # [CHANGE THIS]
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
stream=True
)
return response
except Exception as e:
print(e)
return None
async def proxy_completion_non_streaming():
try:
response = await litellm_client.chat.completions.create(
model="sagemaker-models", # [CHANGE THIS] (if you call it something else on your proxy)
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
return response
except Exception as e:
print(e)
return None
async def loadtest_fn():
start = time.time()
n = 500 # Number of concurrent tasks
tasks = [proxy_completion_non_streaming() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(n, time.time() - start, len(successful_completions))
# Run the event loop to execute the async function
asyncio.run(loadtest_fn())
```
## Multi-Instance TPM/RPM Load Test (Router)
Test if your defined tpm/rpm limits are respected across multiple instances of the Router object.
In our test:
- Max RPM per deployment is = 100 requests per minute
- Max Throughput / min on router = 200 requests per minute (2 deployments)
- Load we'll send through router = 600 requests per minute
:::info
If you don't want to call a real LLM API endpoint, you can setup a fake openai server. [See code](#extra---setup-fake-openai-server)
:::
### Code
Let's hit the router with 600 requests per minute.
Copy this script 👇. Save it as `test_loadtest_router.py` AND run it with `python3 test_loadtest_router.py`
```python
from litellm import Router
import litellm
litellm.suppress_debug_info = True
litellm.set_verbose = False
import logging
logging.basicConfig(level=logging.CRITICAL)
import os, random, uuid, time, asyncio
# Model list for OpenAI and Anthropic models
model_list = [
{
"model_name": "fake-openai-endpoint",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-fake-key",
"api_base": "http://0.0.0.0:8080",
"rpm": 100
},
},
{
"model_name": "fake-openai-endpoint",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-fake-key",
"api_base": "http://0.0.0.0:8081",
"rpm": 100
},
},
]
router_1 = Router(model_list=model_list, num_retries=0, enable_pre_call_checks=True, routing_strategy="usage-based-routing-v2", redis_host=os.getenv("REDIS_HOST"), redis_port=os.getenv("REDIS_PORT"), redis_password=os.getenv("REDIS_PASSWORD"))
router_2 = Router(model_list=model_list, num_retries=0, routing_strategy="usage-based-routing-v2", enable_pre_call_checks=True, redis_host=os.getenv("REDIS_HOST"), redis_port=os.getenv("REDIS_PORT"), redis_password=os.getenv("REDIS_PASSWORD"))
async def router_completion_non_streaming():
try:
client: Router = random.sample([router_1, router_2], 1)[0] # randomly pick b/w clients
# print(f"client={client}")
response = await client.acompletion(
model="fake-openai-endpoint", # [CHANGE THIS] (if you call it something else on your proxy)
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
return response
except Exception as e:
# print(e)
return None
async def loadtest_fn():
start = time.time()
n = 600 # Number of concurrent tasks
tasks = [router_completion_non_streaming() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(n, time.time() - start, len(successful_completions))
def get_utc_datetime():
import datetime as dt
from datetime import datetime
if hasattr(dt, "UTC"):
return datetime.now(dt.UTC) # type: ignore
else:
return datetime.utcnow() # type: ignore
# Run the event loop to execute the async function
async def parent_fn():
for _ in range(10):
dt = get_utc_datetime()
current_minute = dt.strftime("%H-%M")
print(f"triggered new batch - {current_minute}")
await loadtest_fn()
await asyncio.sleep(10)
asyncio.run(parent_fn())
```
## Multi-Instance TPM/RPM Load Test (Proxy)
Test if your defined tpm/rpm limits are respected across multiple instances.
The quickest way to do this is by testing the [proxy](./proxy/quick_start.md). The proxy uses the [router](./routing.md) under the hood, so if you're using either of them, this test should work for you.
In our test:
- Max RPM per deployment is = 100 requests per minute
- Max Throughput / min on proxy = 200 requests per minute (2 deployments)
- Load we'll send to proxy = 600 requests per minute
So we'll send 600 requests per minute, but expect only 200 requests per minute to succeed.
:::info
If you don't want to call a real LLM API endpoint, you can setup a fake openai server. [See code](#extra---setup-fake-openai-server)
:::
### 1. Setup config
```yaml
model_list:
- litellm_params:
api_base: http://0.0.0.0:8080
api_key: my-fake-key
model: openai/my-fake-model
rpm: 100
model_name: fake-openai-endpoint
- litellm_params:
api_base: http://0.0.0.0:8081
api_key: my-fake-key
model: openai/my-fake-model-2
rpm: 100
model_name: fake-openai-endpoint
router_settings:
num_retries: 0
enable_pre_call_checks: true
redis_host: os.environ/REDIS_HOST ## 👈 IMPORTANT! Setup the proxy w/ redis
redis_password: os.environ/REDIS_PASSWORD
redis_port: os.environ/REDIS_PORT
routing_strategy: usage-based-routing-v2
```
### 2. Start proxy 2 instances
**Instance 1**
```bash
litellm --config /path/to/config.yaml --port 4000
## RUNNING on http://0.0.0.0:4000
```
**Instance 2**
```bash
litellm --config /path/to/config.yaml --port 4001
## RUNNING on http://0.0.0.0:4001
```
### 3. Run Test
Let's hit the proxy with 600 requests per minute.
Copy this script 👇. Save it as `test_loadtest_proxy.py` AND run it with `python3 test_loadtest_proxy.py`
```python
from openai import AsyncOpenAI, AsyncAzureOpenAI
import random, uuid
import time, asyncio, litellm
# import logging
# logging.basicConfig(level=logging.DEBUG)
#### LITELLM PROXY ####
litellm_client = AsyncOpenAI(
api_key="sk-1234", # [CHANGE THIS]
base_url="http://0.0.0.0:4000"
)
litellm_client_2 = AsyncOpenAI(
api_key="sk-1234", # [CHANGE THIS]
base_url="http://0.0.0.0:4001"
)
async def proxy_completion_non_streaming():
try:
client = random.sample([litellm_client, litellm_client_2], 1)[0] # randomly pick b/w clients
# print(f"client={client}")
response = await client.chat.completions.create(
model="fake-openai-endpoint", # [CHANGE THIS] (if you call it something else on your proxy)
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
return response
except Exception as e:
# print(e)
return None
async def loadtest_fn():
start = time.time()
n = 600 # Number of concurrent tasks
tasks = [proxy_completion_non_streaming() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(n, time.time() - start, len(successful_completions))
def get_utc_datetime():
import datetime as dt
from datetime import datetime
if hasattr(dt, "UTC"):
return datetime.now(dt.UTC) # type: ignore
else:
return datetime.utcnow() # type: ignore
# Run the event loop to execute the async function
async def parent_fn():
for _ in range(10):
dt = get_utc_datetime()
current_minute = dt.strftime("%H-%M")
print(f"triggered new batch - {current_minute}")
await loadtest_fn()
await asyncio.sleep(10)
asyncio.run(parent_fn())
```
### Extra - Setup Fake OpenAI Server
Let's setup a fake openai server with a RPM limit of 100.
Let's call our file `fake_openai_server.py`.
```
# import sys, os
# sys.path.insert(
# 0, os.path.abspath("../")
# ) # Adds the parent directory to the system path
from fastapi import FastAPI, Request, status, HTTPException, Depends
from fastapi.responses import StreamingResponse
from fastapi.security import OAuth2PasswordBearer
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi import FastAPI, Request, HTTPException, UploadFile, File
import httpx, os, json
from openai import AsyncOpenAI
from typing import Optional
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
class ProxyException(Exception):
# NOTE: DO NOT MODIFY THIS
# This is used to map exactly to OPENAI Exceptions
def __init__(
self,
message: str,
type: str,
param: Optional[str],
code: Optional[int],
):
self.message = message
self.type = type
self.param = param
self.code = code
def to_dict(self) -> dict:
"""Converts the ProxyException instance to a dictionary."""
return {
"message": self.message,
"type": self.type,
"param": self.param,
"code": self.code,
}
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def _rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(status_code=429,
content={"detail": "Rate Limited!"})
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# for completion
@app.post("/chat/completions")
@app.post("/v1/chat/completions")
@limiter.limit("100/minute")
async def completion(request: Request):
# raise HTTPException(status_code=429, detail="Rate Limited!")
return {
"id": "chatcmpl-123",
"object": "chat.completion",
"created": 1677652288,
"model": None,
"system_fingerprint": "fp_44709d6fcb",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "\n\nHello there, how may I assist you today?",
},
"logprobs": None,
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": 9,
"completion_tokens": 12,
"total_tokens": 21
}
}
if __name__ == "__main__":
import socket
import uvicorn
port = 8080
while True:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
print(f"Port {port} is available, starting server...")
break
else:
port += 1
uvicorn.run(app, host="0.0.0.0", port=port)
```
```bash
python3 fake_openai_server.py
```

View file

@ -0,0 +1,209 @@
import Image from '@theme/IdealImage';
# LiteLLM Proxy - 1K RPS Load test on locust
Tutorial on how to get to 1K+ RPS with LiteLLM Proxy on locust
## Pre-Testing Checklist
- [ ] Ensure you're using the **latest `-stable` version** of litellm
- [Github releases](https://github.com/BerriAI/litellm/releases)
- [litellm docker containers](https://github.com/BerriAI/litellm/pkgs/container/litellm)
- [litellm database docker container](https://github.com/BerriAI/litellm/pkgs/container/litellm-database)
- [ ] Ensure you're following **ALL** [best practices for production](./proxy/production_setup.md)
- [ ] Locust - Ensure you're Locust instance can create 1K+ requests per second
- 👉 You can use our **[maintained locust instance here](https://locust-load-tester-production.up.railway.app/)**
- If you're self hosting locust
- [here's the spec used for our locust machine](#machine-specifications-for-running-locust)
- [here is the locustfile.py used for our tests](#locust-file-used-for-testing)
- [ ] Use this [**machine specification for running litellm proxy**](#machine-specifications-for-running-litellm-proxy)
- [ ] **Enterprise LiteLLM** - Use `prometheus` as a callback in your `proxy_config.yaml` to get metrics on your load test
Set `litellm_settings.callbacks` to monitor success/failures/all types of errors
```yaml
litellm_settings:
callbacks: ["prometheus"] # Enterprise LiteLLM Only - use prometheus to get metrics on your load test
```
## Load Test - Fake OpenAI Endpoint
### Expected Performance
| Metric | Value |
|--------|-------|
| Requests per Second | 1174+ |
| Median Response Time | `96ms` |
| Average Response Time | `142.18ms` |
### Run Test
1. Add `fake-openai-endpoint` to your proxy config.yaml and start your litellm proxy
litellm provides a hosted `fake-openai-endpoint` you can load test against
```yaml
model_list:
- model_name: fake-openai-endpoint
litellm_params:
model: openai/fake
api_key: fake-key
api_base: https://exampleopenaiendpoint-production.up.railway.app/
litellm_settings:
callbacks: ["prometheus"] # Enterprise LiteLLM Only - use prometheus to get metrics on your load test
```
2. `pip install locust`
3. Create a file called `locustfile.py` on your local machine. Copy the contents from the litellm load test located [here](https://github.com/BerriAI/litellm/blob/main/.github/workflows/locustfile.py)
4. Start locust
Run `locust` in the same directory as your `locustfile.py` from step 2
```shell
locust -f locustfile.py --processes 4
```
5. Run Load test on locust
Head to the locust UI on http://0.0.0.0:8089
Set **Users=1000, Ramp Up Users=1000**, Host=Base URL of your LiteLLM Proxy
6. Expected results
<Image img={require('../img/locust_load_test1.png')} />
## Load test - Endpoints with Rate Limits
Run a load test on 2 LLM deployments each with 10K RPM Quota. Expect to see ~20K RPM (333 RPS)
### Expected Performance
- We expect to see 20,000+ successful responses in 1 minute
- The remaining requests **fail because the endpoint exceeds it's 10K RPM quota limit - from the LLM API provider**
| Metric | Value |
|--------|-------|
| Successful Responses in 1 minute | 20,000+ |
| Requests per Second | ~1170+ |
| Median Response Time | `70ms` |
| Average Response Time | `640.18ms` |
### Run Test
1. Add 2 `gemini-vision` deployments on your config.yaml. Each deployment can handle 10K RPM. (We setup a fake endpoint with a rate limit of 1000 RPM on the `/v1/projects/bad-adroit-crow` route below )
:::info
All requests with `model="gemini-vision"` will be load balanced equally across the 2 deployments.
:::
```yaml
model_list:
- model_name: gemini-vision
litellm_params:
model: vertex_ai/gemini-1.0-pro-vision-001
api_base: https://exampleopenaiendpoint-production.up.railway.app/v1/projects/bad-adroit-crow-413218/locations/us-central1/publishers/google/models/gemini-1.0-pro-vision-001
vertex_project: "adroit-crow-413218"
vertex_location: "us-central1"
vertex_credentials: /etc/secrets/adroit_crow.json
- model_name: gemini-vision
litellm_params:
model: vertex_ai/gemini-1.0-pro-vision-001
api_base: https://exampleopenaiendpoint-production-c715.up.railway.app/v1/projects/bad-adroit-crow-413218/locations/us-central1/publishers/google/models/gemini-1.0-pro-vision-001
vertex_project: "adroit-crow-413218"
vertex_location: "us-central1"
vertex_credentials: /etc/secrets/adroit_crow.json
litellm_settings:
callbacks: ["prometheus"] # Enterprise LiteLLM Only - use prometheus to get metrics on your load test
```
2. `pip install locust`
3. Create a file called `locustfile.py` on your local machine. Copy the contents from the litellm load test located [here](https://github.com/BerriAI/litellm/blob/main/.github/workflows/locustfile.py)
4. Start locust
Run `locust` in the same directory as your `locustfile.py` from step 2
```shell
locust -f locustfile.py --processes 4 -t 60
```
5. Run Load test on locust
Head to the locust UI on http://0.0.0.0:8089 and use the following settings
<Image img={require('../img/locust_load_test2_setup.png')} />
6. Expected results
- Successful responses in 1 minute = 19,800 = (69415 - 49615)
- Requests per second = 1170
- Median response time = 70ms
- Average response time = 640ms
<Image img={require('../img/locust_load_test2.png')} />
## Prometheus Metrics for debugging load tests
Use the following [prometheus metrics to debug your load tests / failures](./proxy/prometheus)
| Metric Name | Description |
|----------------------|--------------------------------------|
| `litellm_deployment_failure_responses` | Total number of failed LLM API calls for a specific LLM deployment. Labels: `"requested_model", "litellm_model_name", "model_id", "api_base", "api_provider", "hashed_api_key", "api_key_alias", "team", "team_alias", "exception_status", "exception_class"` |
| `litellm_deployment_cooled_down` | Number of times a deployment has been cooled down by LiteLLM load balancing logic. Labels: `"litellm_model_name", "model_id", "api_base", "api_provider", "exception_status"` |
## Machine Specifications for Running Locust
| Metric | Value |
|--------|-------|
| `locust --processes 4` | 4|
| `vCPUs` on Load Testing Machine | 2.0 vCPUs |
| `Memory` on Load Testing Machine | 450 MB |
| `Replicas` of Load Testing Machine | 1 |
## Machine Specifications for Running LiteLLM Proxy
👉 **Number of Replicas of LiteLLM Proxy=20** for getting 1K+ RPS
| Service | Spec | CPUs | Memory | Architecture | Version|
| --- | --- | --- | --- | --- | --- |
| Server | `t2.large`. | `2vCPUs` | `8GB` | `x86` |
## Locust file used for testing
```python
import os
import uuid
from locust import HttpUser, task, between
class MyUser(HttpUser):
wait_time = between(0.5, 1) # Random wait time between requests
@task(100)
def litellm_completion(self):
# no cache hits with this
payload = {
"model": "fake-openai-endpoint",
"messages": [{"role": "user", "content": f"{uuid.uuid4()} This is a test there will be no cache hits and we'll fill up the context" * 150 }],
"user": "my-new-end-user-1"
}
response = self.client.post("chat/completions", json=payload)
if response.status_code != 200:
# log the errors in error.txt
with open("error.txt", "a") as error_log:
error_log.write(response.text + "\n")
def on_start(self):
self.api_key = os.getenv('API_KEY', 'sk-1234')
self.client.headers.update({'Authorization': f'Bearer {self.api_key}'})
```

View file

@ -0,0 +1,348 @@
# Multi-Instance TPM/RPM (litellm.Router)
Test if your defined tpm/rpm limits are respected across multiple instances of the Router object.
In our test:
- Max RPM per deployment is = 100 requests per minute
- Max Throughput / min on router = 200 requests per minute (2 deployments)
- Load we'll send through router = 600 requests per minute
:::info
If you don't want to call a real LLM API endpoint, you can setup a fake openai server. [See code](#extra---setup-fake-openai-server)
:::
### Code
Let's hit the router with 600 requests per minute.
Copy this script 👇. Save it as `test_loadtest_router.py` AND run it with `python3 test_loadtest_router.py`
```python
from litellm import Router
import litellm
litellm.suppress_debug_info = True
litellm.set_verbose = False
import logging
logging.basicConfig(level=logging.CRITICAL)
import os, random, uuid, time, asyncio
# Model list for OpenAI and Anthropic models
model_list = [
{
"model_name": "fake-openai-endpoint",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-fake-key",
"api_base": "http://0.0.0.0:8080",
"rpm": 100
},
},
{
"model_name": "fake-openai-endpoint",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-fake-key",
"api_base": "http://0.0.0.0:8081",
"rpm": 100
},
},
]
router_1 = Router(model_list=model_list, num_retries=0, enable_pre_call_checks=True, routing_strategy="usage-based-routing-v2", redis_host=os.getenv("REDIS_HOST"), redis_port=os.getenv("REDIS_PORT"), redis_password=os.getenv("REDIS_PASSWORD"))
router_2 = Router(model_list=model_list, num_retries=0, routing_strategy="usage-based-routing-v2", enable_pre_call_checks=True, redis_host=os.getenv("REDIS_HOST"), redis_port=os.getenv("REDIS_PORT"), redis_password=os.getenv("REDIS_PASSWORD"))
async def router_completion_non_streaming():
try:
client: Router = random.sample([router_1, router_2], 1)[0] # randomly pick b/w clients
# print(f"client={client}")
response = await client.acompletion(
model="fake-openai-endpoint", # [CHANGE THIS] (if you call it something else on your proxy)
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
return response
except Exception as e:
# print(e)
return None
async def loadtest_fn():
start = time.time()
n = 600 # Number of concurrent tasks
tasks = [router_completion_non_streaming() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(n, time.time() - start, len(successful_completions))
def get_utc_datetime():
import datetime as dt
from datetime import datetime
if hasattr(dt, "UTC"):
return datetime.now(dt.UTC) # type: ignore
else:
return datetime.utcnow() # type: ignore
# Run the event loop to execute the async function
async def parent_fn():
for _ in range(10):
dt = get_utc_datetime()
current_minute = dt.strftime("%H-%M")
print(f"triggered new batch - {current_minute}")
await loadtest_fn()
await asyncio.sleep(10)
asyncio.run(parent_fn())
```
## Multi-Instance TPM/RPM Load Test (Proxy)
Test if your defined tpm/rpm limits are respected across multiple instances.
The quickest way to do this is by testing the [proxy](./proxy/quick_start.md). The proxy uses the [router](./routing.md) under the hood, so if you're using either of them, this test should work for you.
In our test:
- Max RPM per deployment is = 100 requests per minute
- Max Throughput / min on proxy = 200 requests per minute (2 deployments)
- Load we'll send to proxy = 600 requests per minute
So we'll send 600 requests per minute, but expect only 200 requests per minute to succeed.
:::info
If you don't want to call a real LLM API endpoint, you can setup a fake openai server. [See code](#extra---setup-fake-openai-server)
:::
### 1. Setup config
```yaml
model_list:
- litellm_params:
api_base: http://0.0.0.0:8080
api_key: my-fake-key
model: openai/my-fake-model
rpm: 100
model_name: fake-openai-endpoint
- litellm_params:
api_base: http://0.0.0.0:8081
api_key: my-fake-key
model: openai/my-fake-model-2
rpm: 100
model_name: fake-openai-endpoint
router_settings:
num_retries: 0
enable_pre_call_checks: true
redis_host: os.environ/REDIS_HOST ## 👈 IMPORTANT! Setup the proxy w/ redis
redis_password: os.environ/REDIS_PASSWORD
redis_port: os.environ/REDIS_PORT
routing_strategy: usage-based-routing-v2
```
### 2. Start proxy 2 instances
**Instance 1**
```bash
litellm --config /path/to/config.yaml --port 4000
## RUNNING on http://0.0.0.0:4000
```
**Instance 2**
```bash
litellm --config /path/to/config.yaml --port 4001
## RUNNING on http://0.0.0.0:4001
```
### 3. Run Test
Let's hit the proxy with 600 requests per minute.
Copy this script 👇. Save it as `test_loadtest_proxy.py` AND run it with `python3 test_loadtest_proxy.py`
```python
from openai import AsyncOpenAI, AsyncAzureOpenAI
import random, uuid
import time, asyncio, litellm
# import logging
# logging.basicConfig(level=logging.DEBUG)
#### LITELLM PROXY ####
litellm_client = AsyncOpenAI(
api_key="sk-1234", # [CHANGE THIS]
base_url="http://0.0.0.0:4000"
)
litellm_client_2 = AsyncOpenAI(
api_key="sk-1234", # [CHANGE THIS]
base_url="http://0.0.0.0:4001"
)
async def proxy_completion_non_streaming():
try:
client = random.sample([litellm_client, litellm_client_2], 1)[0] # randomly pick b/w clients
# print(f"client={client}")
response = await client.chat.completions.create(
model="fake-openai-endpoint", # [CHANGE THIS] (if you call it something else on your proxy)
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
return response
except Exception as e:
# print(e)
return None
async def loadtest_fn():
start = time.time()
n = 600 # Number of concurrent tasks
tasks = [proxy_completion_non_streaming() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(n, time.time() - start, len(successful_completions))
def get_utc_datetime():
import datetime as dt
from datetime import datetime
if hasattr(dt, "UTC"):
return datetime.now(dt.UTC) # type: ignore
else:
return datetime.utcnow() # type: ignore
# Run the event loop to execute the async function
async def parent_fn():
for _ in range(10):
dt = get_utc_datetime()
current_minute = dt.strftime("%H-%M")
print(f"triggered new batch - {current_minute}")
await loadtest_fn()
await asyncio.sleep(10)
asyncio.run(parent_fn())
```
### Extra - Setup Fake OpenAI Server
Let's setup a fake openai server with a RPM limit of 100.
Let's call our file `fake_openai_server.py`.
```
# import sys, os
# sys.path.insert(
# 0, os.path.abspath("../")
# ) # Adds the parent directory to the system path
from fastapi import FastAPI, Request, status, HTTPException, Depends
from fastapi.responses import StreamingResponse
from fastapi.security import OAuth2PasswordBearer
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi import FastAPI, Request, HTTPException, UploadFile, File
import httpx, os, json
from openai import AsyncOpenAI
from typing import Optional
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
class ProxyException(Exception):
# NOTE: DO NOT MODIFY THIS
# This is used to map exactly to OPENAI Exceptions
def __init__(
self,
message: str,
type: str,
param: Optional[str],
code: Optional[int],
):
self.message = message
self.type = type
self.param = param
self.code = code
def to_dict(self) -> dict:
"""Converts the ProxyException instance to a dictionary."""
return {
"message": self.message,
"type": self.type,
"param": self.param,
"code": self.code,
}
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def _rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(status_code=429,
content={"detail": "Rate Limited!"})
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# for completion
@app.post("/chat/completions")
@app.post("/v1/chat/completions")
@limiter.limit("100/minute")
async def completion(request: Request):
# raise HTTPException(status_code=429, detail="Rate Limited!")
return {
"id": "chatcmpl-123",
"object": "chat.completion",
"created": 1677652288,
"model": None,
"system_fingerprint": "fp_44709d6fcb",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "\n\nHello there, how may I assist you today?",
},
"logprobs": None,
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": 9,
"completion_tokens": 12,
"total_tokens": 21
}
}
if __name__ == "__main__":
import socket
import uvicorn
port = 8080
while True:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('0.0.0.0', port))
if result != 0:
print(f"Port {port} is available, starting server...")
break
else:
port += 1
uvicorn.run(app, host="0.0.0.0", port=port)
```
```bash
python3 fake_openai_server.py
```

View file

@ -0,0 +1,87 @@
# LiteLLM SDK vs OpenAI
Here is a script to load test LiteLLM vs OpenAI
```python
from openai import AsyncOpenAI, AsyncAzureOpenAI
import random, uuid
import time, asyncio, litellm
# import logging
# logging.basicConfig(level=logging.DEBUG)
#### LITELLM PROXY ####
litellm_client = AsyncOpenAI(
api_key="sk-1234", # [CHANGE THIS]
base_url="http://0.0.0.0:4000"
)
#### AZURE OPENAI CLIENT ####
client = AsyncAzureOpenAI(
api_key="my-api-key", # [CHANGE THIS]
azure_endpoint="my-api-base", # [CHANGE THIS]
api_version="2023-07-01-preview"
)
#### LITELLM ROUTER ####
model_list = [
{
"model_name": "azure-canada",
"litellm_params": {
"model": "azure/my-azure-deployment-name", # [CHANGE THIS]
"api_key": "my-api-key", # [CHANGE THIS]
"api_base": "my-api-base", # [CHANGE THIS]
"api_version": "2023-07-01-preview"
}
}
]
router = litellm.Router(model_list=model_list)
async def openai_completion():
try:
response = await client.chat.completions.create(
model="gpt-35-turbo",
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
stream=True
)
return response
except Exception as e:
print(e)
return None
async def router_completion():
try:
response = await router.acompletion(
model="azure-canada", # [CHANGE THIS]
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
stream=True
)
return response
except Exception as e:
print(e)
return None
async def proxy_completion_non_streaming():
try:
response = await litellm_client.chat.completions.create(
model="sagemaker-models", # [CHANGE THIS] (if you call it something else on your proxy)
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
return response
except Exception as e:
print(e)
return None
async def loadtest_fn():
start = time.time()
n = 500 # Number of concurrent tasks
tasks = [proxy_completion_non_streaming() for _ in range(n)]
chat_completions = await asyncio.gather(*tasks)
successful_completions = [c for c in chat_completions if c is not None]
print(n, time.time() - start, len(successful_completions))
# Run the event loop to execute the async function
asyncio.run(loadtest_fn())
```

View file

@ -18,6 +18,7 @@ general_settings:
master_key: sk-1234 # enter your own master key, ensure it starts with 'sk-'
alerting: ["slack"] # Setup slack alerting - get alerts on LLM exceptions, Budget Alerts, Slow LLM Responses
proxy_batch_write_at: 60 # Batch write spend updates every 60s
database_connection_pool_limit: 10 # limit the number of database connections to = MAX Number of DB Connections/Number of instances of litellm proxy (Around 10-20 is good number)
litellm_settings:
set_verbose: False # Switch off Debug Logging, ensure your logs do not have any debugging on

Binary file not shown.

After

Width:  |  Height:  |  Size: 211 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 212 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 269 KiB

View file

@ -251,7 +251,16 @@ const sidebars = {
},
],
},
"load_test",
{
type: "category",
label: "Load Testing",
items: [
"load_test",
"load_test_advanced",
"load_test_sdk",
"load_test_rpm",
]
},
{
type: "category",
label: "Logging & Observability",