docs(routing.md): add queueing to docs

This commit is contained in:
Krrish Dholakia 2023-11-21 18:00:14 -08:00
parent a2681e353f
commit 9d97082eed
9 changed files with 244 additions and 261 deletions

View file

@ -13,7 +13,7 @@ import pytest
from litellm import Router
import litellm
litellm.set_verbose=False
os.environ.pop("AZURE_AD_TOKEN")
# os.environ.pop("AZURE_AD_TOKEN")
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
@ -142,6 +142,7 @@ successful_calls = 0
failed_calls = 0
for future in futures:
if future.done():
if future.result() is not None:
successful_calls += 1
else:

View file

@ -0,0 +1,48 @@
Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
Response ID: 71a47cd4-92d9-4091-9429-8d22af6b56bf Url: /queue/response/71a47cd4-92d9-4091-9429-8d22af6b56bf
Time: 0.77 seconds
Question: Given this context, what is litellm? LiteLLM about: About
Call all LLM APIs using the OpenAI format.
Response ID: a0855c20-59ba-4eed-85c1-e0719eebdeab Url: /queue/response/a0855c20-59ba-4eed-85c1-e0719eebdeab
Time: 1.46 seconds
Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
Response ID: b131cdcd-0693-495b-ad41-b0cf2afc4833 Url: /queue/response/b131cdcd-0693-495b-ad41-b0cf2afc4833
Time: 2.13 seconds
Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
Response ID: a58e5185-90e7-4832-9f28-e5a5ac167a40 Url: /queue/response/a58e5185-90e7-4832-9f28-e5a5ac167a40
Time: 2.83 seconds
Question: Given this context, what is litellm? LiteLLM about: About
Call all LLM APIs using the OpenAI format.
Response ID: 52dbbd49-eedb-4c11-8382-3ca7deb1af35 Url: /queue/response/52dbbd49-eedb-4c11-8382-3ca7deb1af35
Time: 3.50 seconds
Question: What endpoints does the litellm proxy have 💥 OpenAI Proxy Server
LiteLLM Server manages:
Calling 10
Response ID: eedda05f-61e1-4081-b49d-27f9449bcf69 Url: /queue/response/eedda05f-61e1-4081-b49d-27f9449bcf69
Time: 4.20 seconds
Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
Response ID: 8a484722-66ec-4193-b19b-2dfc4265cfd2 Url: /queue/response/8a484722-66ec-4193-b19b-2dfc4265cfd2
Time: 4.89 seconds
Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
Response ID: ae1e2b71-d711-456d-8df0-13ce0709eb04 Url: /queue/response/ae1e2b71-d711-456d-8df0-13ce0709eb04
Time: 5.60 seconds
Question: What endpoints does the litellm proxy have 💥 OpenAI Proxy Server
LiteLLM Server manages:
Calling 10
Response ID: cfabd174-838e-4252-b82b-648923573db8 Url: /queue/response/cfabd174-838e-4252-b82b-648923573db8
Time: 6.29 seconds
Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
Response ID: 02d5b7d6-5443-41e9-94e4-90d8b00d49fb Url: /queue/response/02d5b7d6-5443-41e9-94e4-90d8b00d49fb
Time: 7.01 seconds

View file

@ -3,11 +3,13 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# Router - Load Balancing, Queing
# Router - Load Balancing, Queueing
- Load-balance across multiple deployments (e.g. Azure/OpenAI): Pick the deployment which is below rate-limit and has the least amount of tokens used.
- Queuing Requests to ensure requests don't fail
LiteLLM manages:
- Load-balance across multiple deployments (e.g. Azure/OpenAI)
- Prioritizing important requests to ensure they don't fail (i.e. Queueing)
## Load Balancing
(s/o [@paulpierre](https://www.linkedin.com/in/paulpierre/) for his contribution to this implementation)
[**See Code**](https://github.com/BerriAI/litellm/blob/main/litellm/router.py)
@ -56,8 +58,8 @@ print(response)
- `router.aembeddings()` - async embeddings endpoint
- `router.text_completion()` - completion calls in the old OpenAI `/v1/completions` endpoint format
## Advanced
### Routing Strategies - Shuffle, Rate Limit Aware
### Advanced
#### Routing Strategies - Shuffle, Rate Limit Aware
Router provides 2 strategies for routing your calls across multiple deployments:
@ -164,7 +166,7 @@ print(response)
</TabItem>
</Tabs>
### Caching + Request Timeouts
#### Caching + Request Timeouts
In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
@ -190,7 +192,7 @@ router = Router(model_list=model_list,
print(response)
```
### Retry failed requests
#### Retry failed requests
For both async + sync functions, we support retrying failed requests.
@ -219,7 +221,7 @@ response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
```
### Default litellm.completion/embedding params
#### Default litellm.completion/embedding params
You can also set default params for litellm completion/embedding calls. Here's how to do that:
@ -241,14 +243,51 @@ print(f"response: {response}")
```
### Deploy Router
#### Deploy Router
If you want a server to just route requests to different LLM APIs, use our [OpenAI Proxy Server](./simple_proxy.md#multiple-instances-of-1-model)
## Queuing
### Quick Start
This requires a [Redis DB](https://redis.com/) to work.
Our implementation uses LiteLLM's proxy server + Celery workers to process up to 100 req./s
[**See Code**](https://github.com/BerriAI/litellm/blob/fbf9cab5b9e35df524e2c9953180c58d92e4cd97/litellm/proxy/proxy_server.py#L589)
1. Add Redis credentials in a .env file
```python
REDIS_HOST="my-redis-endpoint"
REDIS_PORT="my-redis-port"
REDIS_PASSWORD="my-redis-password" # [OPTIONAL] if self-hosted
REDIS_USERNAME="default" # [OPTIONAL] if self-hosted
```
2. Start litellm server with your model config
```bash
$ litellm --config /path/to/config.yaml --use_queue
```
3. Test (in another window) → sends 100 simultaneous requests to the queue
```bash
$ litellm --test_async --num_requests 100
```
### Available Endpoints
- `/queue/request` - Queues a /chat/completions request. Returns a job id.
- `/queue/response/{id}` - Returns the status of a job. If completed, returns the response as well. Potential status's are: `queued` and `finished`.
## Hosted Router + Request Queing api.litellm.ai
Queue your LLM API requests to ensure you're under your rate limits
- Step 1: Create a `/queue/reques` request
- Step 2: Poll your request, to check if it's completed
- Step 1: Make a POST request `/queue/request` (this follows the same input format as an openai `/chat/completions` call, and returns a job id).
- Step 2: Make a GET request, `queue/response` to check if it's completed
## Step 1 Add a config to the proxy, generate a temp key
@ -340,204 +379,3 @@ while True:
break
```
<!-- ## litellm.completion()
If you're calling litellm.completion(), here's the different reliability options you can enable.
## Retry failed requests
Call it in completion like this `completion(..num_retries=2)`.
Here's a quick look at how you can use it:
```python
from litellm import completion
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = completion(
model="gpt-3.5-turbo",
messages=messages,
num_retries=2
)
```
## Fallbacks
## Helper utils
LiteLLM supports the following functions for reliability:
* `litellm.longer_context_model_fallback_dict`: Dictionary which has a mapping for those models which have larger equivalents
* `num_retries`: use tenacity retries
* `completion()` with fallbacks: switch between models/keys/api bases in case of errors.
### Context Window Fallbacks
```python
from litellm import completion
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
messages = [{"content": "how does a court case get to the Supreme Court?" * 500, "role": "user"}]
completion(model="gpt-3.5-turbo", messages=messages, context_window_fallback_dict=fallback_dict)
```
### Fallbacks - Switch Models/API Keys/API Bases
LLM APIs can be unstable, completion() with fallbacks ensures you'll always get a response from your calls
#### Usage
To use fallback models with `completion()`, specify a list of models in the `fallbacks` parameter.
The `fallbacks` list should include the primary model you want to use, followed by additional models that can be used as backups in case the primary model fails to provide a response.
#### switch models
```python
response = completion(model="bad-model", messages=messages,
fallbacks=["gpt-3.5-turbo" "command-nightly"])
```
#### switch api keys/bases (E.g. azure deployment)
Switch between different keys for the same azure deployment, or use another deployment as well.
```python
api_key="bad-key"
response = completion(model="azure/gpt-4", messages=messages, api_key=api_key,
fallbacks=[{"api_key": "good-key-1"}, {"api_key": "good-key-2", "api_base": "good-api-base-2"}])
```
[Check out this section for implementation details](#fallbacks-1)
## Implementation Details
### Fallbacks
#### Output from calls
```
Completion with 'bad-model': got exception Unable to map your input to a model. Check your input - {'model': 'bad-model'
completion call gpt-3.5-turbo
{
"id": "chatcmpl-7qTmVRuO3m3gIBg4aTmAumV1TmQhB",
"object": "chat.completion",
"created": 1692741891,
"model": "gpt-3.5-turbo-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "I apologize, but as an AI, I do not have the capability to provide real-time weather updates. However, you can easily check the current weather in San Francisco by using a search engine or checking a weather website or app."
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 16,
"completion_tokens": 46,
"total_tokens": 62
}
}
```
#### How does fallbacks work
When you pass `fallbacks` to `completion`, it makes the first `completion` call using the primary model specified as `model` in `completion(model=model)`. If the primary model fails or encounters an error, it automatically tries the `fallbacks` models in the specified order. This ensures a response even if the primary model is unavailable.
#### Key components of Model Fallbacks implementation:
* Looping through `fallbacks`
* Cool-Downs for rate-limited models
#### Looping through `fallbacks`
Allow `45seconds` for each request. In the 45s this function tries calling the primary model set as `model`. If model fails it loops through the backup `fallbacks` models and attempts to get a response in the allocated `45s` time set here:
```python
while response == None and time.time() - start_time < 45:
for model in fallbacks:
```
#### Cool-Downs for rate-limited models
If a model API call leads to an error - allow it to cooldown for `60s`
```python
except Exception as e:
print(f"got exception {e} for model {model}")
rate_limited_models.add(model)
model_expiration_times[model] = (
time.time() + 60
) # cool down this selected model
pass
```
Before making an LLM API call we check if the selected model is in `rate_limited_models`, if so skip making the API call
```python
if (
model in rate_limited_models
): # check if model is currently cooling down
if (
model_expiration_times.get(model)
and time.time() >= model_expiration_times[model]
):
rate_limited_models.remove(
model
) # check if it's been 60s of cool down and remove model
else:
continue # skip model
```
#### Full code of completion with fallbacks()
```python
response = None
rate_limited_models = set()
model_expiration_times = {}
start_time = time.time()
fallbacks = [kwargs["model"]] + kwargs["fallbacks"]
del kwargs["fallbacks"] # remove fallbacks so it's not recursive
while response == None and time.time() - start_time < 45:
for model in fallbacks:
# loop thru all models
try:
if (
model in rate_limited_models
): # check if model is currently cooling down
if (
model_expiration_times.get(model)
and time.time() >= model_expiration_times[model]
):
rate_limited_models.remove(
model
) # check if it's been 60s of cool down and remove model
else:
continue # skip model
# delete model from kwargs if it exists
if kwargs.get("model"):
del kwargs["model"]
print("making completion call", model)
response = litellm.completion(**kwargs, model=model)
if response != None:
return response
except Exception as e:
print(f"got exception {e} for model {model}")
rate_limited_models.add(model)
model_expiration_times[model] = (
time.time() + 60
) # cool down this selected model
pass
return response
``` -->

View file

@ -38,6 +38,7 @@ const sidebars = {
"completion/model_alias",
"completion/batching",
"completion/mock_requests",
"completion/reliable_completions",
],
},
{

View file

@ -67,6 +67,7 @@ def is_port_in_use(port):
@click.option('--headers', default=None, help='headers for the API call')
@click.option('--save', is_flag=True, type=bool, help='Save the model-specific config')
@click.option('--debug', default=False, is_flag=True, type=bool, help='To debug the input')
@click.option('--use_queue', default=False, is_flag=True, type=bool, help='To use celery workers for async endpoints')
@click.option('--temperature', default=None, type=float, help='Set temperature for the model')
@click.option('--max_tokens', default=None, type=int, help='Set max tokens for the model')
@click.option('--request_timeout', default=600, type=int, help='Set timeout in seconds for completion calls')
@ -78,8 +79,10 @@ def is_port_in_use(port):
@click.option('--telemetry', default=True, type=bool, help='Helps us know if people are using this feature. Turn this off by doing `--telemetry False`')
@click.option('--logs', flag_value=False, type=int, help='Gets the "n" most recent logs. By default gets most recent log.')
@click.option('--test', flag_value=True, help='proxy chat completions url to make a test request to')
@click.option('--test_async', default=False, is_flag=True, help='Calls async endpoints /queue/requests and /queue/response')
@click.option('--num_requests', default=10, type=int, help='Number of requests to hit async endpoint with')
@click.option('--local', is_flag=True, default=False, help='for local debugging')
def run_server(host, port, api_base, api_version, model, alias, add_key, headers, save, debug, temperature, max_tokens, request_timeout, drop_params, add_function_to_prompt, config, file, max_budget, telemetry, logs, test, local, num_workers):
def run_server(host, port, api_base, api_version, model, alias, add_key, headers, save, debug, temperature, max_tokens, request_timeout, drop_params, add_function_to_prompt, config, file, max_budget, telemetry, logs, test, local, num_workers, test_async, num_requests, use_queue):
global feature_telemetry
args = locals()
if local:
@ -112,6 +115,72 @@ def run_server(host, port, api_base, api_version, model, alias, add_key, headers
return
if model and "ollama" in model:
run_ollama_serve()
if test_async is True:
import requests, concurrent, time
api_base = f"http://{host}:{port}"
def _make_openai_completion():
data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Write a short poem about the moon"}]
}
response = requests.post("http://0.0.0.0:8000/queue/request", json=data)
response = response.json()
while True:
try:
url = response["url"]
polling_url = f"{api_base}{url}"
polling_response = requests.get(polling_url)
polling_response = polling_response.json()
print("\n RESPONSE FROM POLLING JOB", polling_response)
status = polling_response["status"]
if status == "finished":
llm_response = polling_response["result"]
with open("response_log.txt", "a") as log_file:
log_file.write(
f"Response ID: {llm_response.get('id', 'NA')}\nLLM Response: {llm_response}\nTime: {end_time - start_time:.2f} seconds\n\n"
)
break
print(f"POLLING JOB{polling_url}\nSTATUS: {status}, \n Response {polling_response}")
time.sleep(0.5)
except Exception as e:
print("got exception in polling", e)
break
# Number of concurrent calls (you can adjust this)
concurrent_calls = num_requests
# List to store the futures of concurrent calls
futures = []
# Make concurrent calls
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_calls) as executor:
for _ in range(concurrent_calls):
futures.append(executor.submit(_make_openai_completion))
# Wait for all futures to complete
concurrent.futures.wait(futures)
# Summarize the results
successful_calls = 0
failed_calls = 0
for future in futures:
if future.done():
if future.result() is not None:
successful_calls += 1
else:
failed_calls += 1
print(f"Load test Summary:")
print(f"Total Requests: {concurrent_calls}")
print(f"Successful Calls: {successful_calls}")
print(f"Failed Calls: {failed_calls}")
return
if test != False:
click.echo('\nLiteLLM: Making a test ChatCompletions request to your proxy')
import openai
@ -152,7 +221,7 @@ def run_server(host, port, api_base, api_version, model, alias, add_key, headers
else:
if headers:
headers = json.loads(headers)
save_worker_config(model=model, alias=alias, api_base=api_base, api_version=api_version, debug=debug, temperature=temperature, max_tokens=max_tokens, request_timeout=request_timeout, max_budget=max_budget, telemetry=telemetry, drop_params=drop_params, add_function_to_prompt=add_function_to_prompt, headers=headers, save=save, config=config)
save_worker_config(model=model, alias=alias, api_base=api_base, api_version=api_version, debug=debug, temperature=temperature, max_tokens=max_tokens, request_timeout=request_timeout, max_budget=max_budget, telemetry=telemetry, drop_params=drop_params, add_function_to_prompt=add_function_to_prompt, headers=headers, save=save, config=config, use_queue=use_queue)
try:
import uvicorn
except:

View file

@ -139,9 +139,9 @@ worker_config = None
master_key = None
prisma_client = None
### REDIS QUEUE ###
redis_job = None
redis_connection = None
request_queue = None # Redis Queue for handling requests
async_result = None
celery_app_conn = None
celery_fn = None # Redis Queue for handling requests
#### HELPER FUNCTIONS ####
def print_verbose(print_statement):
global user_debug
@ -219,15 +219,17 @@ def prisma_setup(database_url: Optional[str]):
except Exception as e:
print("Error when initializing prisma, Ensure you run pip install prisma", e)
def rq_setup(use_queue: bool):
global request_queue, redis_connection, redis_job
def celery_setup(use_queue: bool):
global celery_fn, celery_app_conn, async_result
print(f"value of use_queue: {use_queue}")
if use_queue:
from litellm.proxy.queue.celery_worker import start_worker
from litellm.proxy.queue.celery_app import celery_app, process_job
from celery.result import AsyncResult
request_queue = process_job
redis_job = AsyncResult
redis_connection = celery_app
start_worker(os.getcwd())
celery_fn = process_job
async_result = AsyncResult
celery_app_conn = celery_app
def run_ollama_serve():
command = ['ollama', 'serve']
@ -266,7 +268,7 @@ def load_router_config(router: Optional[litellm.Router], config_file_path: str):
prisma_setup(database_url=database_url)
### START REDIS QUEUE ###
use_queue = general_settings.get("use_queue", False)
rq_setup(use_queue=use_queue)
celery_setup(use_queue=use_queue)
## LITELLM MODULE SETTINGS (e.g. litellm.drop_params=True,..)
litellm_settings = config.get('litellm_settings', None)
@ -356,6 +358,7 @@ def initialize(
headers,
save,
config,
use_queue
):
global user_model, user_api_base, user_debug, user_max_tokens, user_request_timeout, user_temperature, user_telemetry, user_headers, experimental, llm_model_list, llm_router, server_settings
generate_feedback_box()
@ -396,6 +399,8 @@ def initialize(
dynamic_config["general"]["max_budget"] = max_budget
if debug==True: # litellm-specific param
litellm.set_verbose = True
if use_queue:
celery_setup(use_queue=use_queue)
if experimental:
pass
if save:
@ -588,8 +593,8 @@ async def generate_key_fn(request: Request):
)
@router.post("/queue/request", dependencies=[Depends(user_api_key_auth)])
async def async_chat_completions(request: Request):
global request_queue, llm_model_list
async def async_queue_request(request: Request):
global celery_fn, llm_model_list
body = await request.body()
body_str = body.decode()
try:
@ -603,19 +608,19 @@ async def async_chat_completions(request: Request):
)
data["llm_model_list"] = llm_model_list
print(f"data: {data}")
job = request_queue.apply_async(kwargs=data)
job = celery_fn.apply_async(kwargs=data)
return {"id": job.id, "url": f"/queue/response/{job.id}", "eta": 5, "status": "queued"}
pass
@router.get("/queue/response/{task_id}", dependencies=[Depends(user_api_key_auth)])
async def async_chat_completions(request: Request, task_id: str):
global redis_connection, redis_job
async def async_queue_response(request: Request, task_id: str):
global celery_app_conn, async_result
try:
job = redis_job(task_id, app=redis_connection)
job = async_result(task_id, app=celery_app_conn)
if job.ready():
return job.result
else:
return {'status': 'processing'}
return {'status': 'queued'}
except Exception as e:
return {"status": "finished", "result": str(e)}

View file

@ -1,8 +1,26 @@
from dotenv import load_dotenv
load_dotenv()
import json
import redis
import json, subprocess
import psutil # Import the psutil library
import atexit
try:
### OPTIONAL DEPENDENCIES ### - pip install redis and celery only when a user opts into using the async endpoints which require both
from celery import Celery
import redis
except:
import sys
subprocess.check_call(
[
sys.executable,
"-m",
"pip",
"install",
"redis",
"celery"
]
)
import time
import sys, os
sys.path.insert(
@ -11,7 +29,7 @@ sys.path.insert(
import litellm
# Redis connection setup
pool = redis.ConnectionPool(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), password=os.getenv("REDIS_PASSWORD"), db=0, max_connections=10)
pool = redis.ConnectionPool(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), password=os.getenv("REDIS_PASSWORD"), db=0, max_connections=5)
redis_client = redis.Redis(connection_pool=pool)
# Celery setup
@ -37,3 +55,18 @@ def process_job(*args, **kwargs):
print(e)
raise e
# Ensure Celery workers are terminated when the script exits
def cleanup():
try:
# Get a list of all running processes
for process in psutil.process_iter(attrs=['pid', 'name']):
# Check if the process is a Celery worker process
if process.info['name'] == 'celery':
print(f"Terminating Celery worker with PID {process.info['pid']}")
# Terminate the Celery worker process
psutil.Process(process.info['pid']).terminate()
except Exception as e:
print(f"Error during cleanup: {e}")
# Register the cleanup function to run when the script exits
atexit.register(cleanup)

View file

@ -1,15 +0,0 @@
from dotenv import load_dotenv
load_dotenv()
import sys, os
sys.path.insert(
0, os.path.abspath("../../..")
) # Adds the parent directory to the system path - for litellm local dev
import litellm
from litellm.proxy.queue.celery_app import celery_app
# Celery task
@celery_app.task(name='process_job')
def process_job(*args, **kwargs):
llm_router: litellm.Router = litellm.Router(model_list=kwargs.pop("llm_model_list"))
return llm_router.completion(*args, **kwargs)

View file

@ -1,9 +1,12 @@
import os
from multiprocessing import Process
def run_worker():
os.system("celery worker -A your_project_name.celery_app --concurrency=10 --loglevel=info")
def run_worker(cwd):
os.chdir(cwd)
os.system("celery -A celery_app.celery_app worker --concurrency=120 --loglevel=info")
if __name__ == "__main__":
worker_process = Process(target=run_worker)
def start_worker(cwd):
cwd += "/queue"
worker_process = Process(target=run_worker, args=(cwd,))
worker_process.start()