diff --git a/cookbook/litellm_router/load_test_queuing.py b/cookbook/litellm_router/load_test_queuing.py
index cbe3b09ae5..f3acb8f044 100644
--- a/cookbook/litellm_router/load_test_queuing.py
+++ b/cookbook/litellm_router/load_test_queuing.py
@@ -13,7 +13,7 @@ import pytest
 from litellm import Router
 import litellm
 litellm.set_verbose=False
-os.environ.pop("AZURE_AD_TOKEN")
+# os.environ.pop("AZURE_AD_TOKEN")
 
 model_list = [{ # list of model deployments
 	"model_name": "gpt-3.5-turbo", # model alias
@@ -142,10 +142,11 @@ successful_calls = 0
 failed_calls = 0
 
 for future in futures:
-    if future.result() is not None:
-        successful_calls += 1
-    else:
-        failed_calls += 1
+    if future.done():
+        if future.result() is not None:
+            successful_calls += 1
+        else:
+            failed_calls += 1
 
 print(f"Load test Summary:")
 print(f"Total Requests: {concurrent_calls}")
diff --git a/cookbook/litellm_router/request_log.txt b/cookbook/litellm_router/request_log.txt
index e69de29bb2..0aed749049 100644
--- a/cookbook/litellm_router/request_log.txt
+++ b/cookbook/litellm_router/request_log.txt
@@ -0,0 +1,48 @@
+Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
+Response ID: 71a47cd4-92d9-4091-9429-8d22af6b56bf Url: /queue/response/71a47cd4-92d9-4091-9429-8d22af6b56bf
+Time: 0.77 seconds
+
+Question: Given this context, what is litellm? LiteLLM about: About
+Call all LLM APIs using the OpenAI format.
+Response ID: a0855c20-59ba-4eed-85c1-e0719eebdeab Url: /queue/response/a0855c20-59ba-4eed-85c1-e0719eebdeab
+Time: 1.46 seconds
+
+Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
+Response ID: b131cdcd-0693-495b-ad41-b0cf2afc4833 Url: /queue/response/b131cdcd-0693-495b-ad41-b0cf2afc4833
+Time: 2.13 seconds
+
+Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
+Response ID: a58e5185-90e7-4832-9f28-e5a5ac167a40 Url: /queue/response/a58e5185-90e7-4832-9f28-e5a5ac167a40
+Time: 2.83 seconds
+
+Question: Given this context, what is litellm? LiteLLM about: About
+Call all LLM APIs using the OpenAI format.
+Response ID: 52dbbd49-eedb-4c11-8382-3ca7deb1af35 Url: /queue/response/52dbbd49-eedb-4c11-8382-3ca7deb1af35
+Time: 3.50 seconds
+
+Question: What endpoints does the litellm proxy have 💥 OpenAI Proxy Server
+LiteLLM Server manages:
+
+Calling 10
+Response ID: eedda05f-61e1-4081-b49d-27f9449bcf69 Url: /queue/response/eedda05f-61e1-4081-b49d-27f9449bcf69
+Time: 4.20 seconds
+
+Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
+Response ID: 8a484722-66ec-4193-b19b-2dfc4265cfd2 Url: /queue/response/8a484722-66ec-4193-b19b-2dfc4265cfd2
+Time: 4.89 seconds
+
+Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
+Response ID: ae1e2b71-d711-456d-8df0-13ce0709eb04 Url: /queue/response/ae1e2b71-d711-456d-8df0-13ce0709eb04
+Time: 5.60 seconds
+
+Question: What endpoints does the litellm proxy have 💥 OpenAI Proxy Server
+LiteLLM Server manages:
+
+Calling 10
+Response ID: cfabd174-838e-4252-b82b-648923573db8 Url: /queue/response/cfabd174-838e-4252-b82b-648923573db8
+Time: 6.29 seconds
+
+Question: Does litellm support ooobagooba llms? how can i call oobagooba llms. Call all LLM APIs using the Ope
+Response ID: 02d5b7d6-5443-41e9-94e4-90d8b00d49fb Url: /queue/response/02d5b7d6-5443-41e9-94e4-90d8b00d49fb
+Time: 7.01 seconds
+
diff --git a/docs/my-website/docs/routing.md b/docs/my-website/docs/routing.md
index c4aa5a0848..86c8fa3026 100644
--- a/docs/my-website/docs/routing.md
+++ b/docs/my-website/docs/routing.md
@@ -3,11 +3,13 @@ import Tabs from '@theme/Tabs';
 import TabItem from '@theme/TabItem';
 
-# Router - Load Balancing, Queing
+# Router - Load Balancing, Queueing
 
-- Load-balance across multiple deployments (e.g. Azure/OpenAI): Pick the deployment which is below rate-limit and has the least amount of tokens used.
-- Queuing Requests to ensure requests don't fail
+LiteLLM manages:
+- Load-balance across multiple deployments (e.g. Azure/OpenAI)
+- Prioritizing important requests to ensure they don't fail (i.e. Queueing)
 
+## Load Balancing
 (s/o [@paulpierre](https://www.linkedin.com/in/paulpierre/) for his contribution to this implementation)
 
 [**See Code**](https://github.com/BerriAI/litellm/blob/main/litellm/router.py)
@@ -56,8 +58,8 @@ print(response)
 - `router.aembeddings()` - async embeddings endpoint
 - `router.text_completion()` - completion calls in the old OpenAI `/v1/completions` endpoint format
 
-## Advanced
-### Routing Strategies - Shuffle, Rate Limit Aware
+### Advanced
+#### Routing Strategies - Shuffle, Rate Limit Aware
 
 Router provides 2 strategies for routing your calls across multiple deployments:
 
@@ -164,7 +166,7 @@ print(response)
 
-### Caching + Request Timeouts
+#### Caching + Request Timeouts
 
 In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
 
@@ -190,7 +192,7 @@ router = Router(model_list=model_list,
 print(response)
 ```
 
-### Retry failed requests
+#### Retry failed requests
 
 For both async + sync functions, we support retrying failed requests.
 
@@ -219,7 +221,7 @@ response = router.completion(model="gpt-3.5-turbo", messages=messages)
 print(f"response: {response}")
 ```
 
-### Default litellm.completion/embedding params
+#### Default litellm.completion/embedding params
 
 You can also set default params for litellm completion/embedding calls. Here's how to do that:
 
@@ -241,14 +243,51 @@ print(f"response: {response}")
 ```
 
-### Deploy Router
+#### Deploy Router
 
 If you want a server to just route requests to different LLM APIs, use our [OpenAI Proxy Server](./simple_proxy.md#multiple-instances-of-1-model)
 
+## Queuing
+
+### Quick Start
+
+This requires a [Redis DB](https://redis.com/) to work.
+
+Our implementation uses LiteLLM's proxy server + Celery workers to process up to 100 req./s
+
+[**See Code**](https://github.com/BerriAI/litellm/blob/fbf9cab5b9e35df524e2c9953180c58d92e4cd97/litellm/proxy/proxy_server.py#L589)
+
+1. Add Redis credentials in a .env file
+
+```bash
+REDIS_HOST="my-redis-endpoint"
+REDIS_PORT="my-redis-port"
+REDIS_PASSWORD="my-redis-password" # [OPTIONAL] if self-hosted
+REDIS_USERNAME="default" # [OPTIONAL] if self-hosted
+```
+
+2. Start the litellm server with your model config
+
+```bash
+$ litellm --config /path/to/config.yaml --use_queue
+```
+
+3. Test (in another window) → sends 100 simultaneous requests to the queue
+
+```bash
+$ litellm --test_async --num_requests 100
+```
+
+
+### Available Endpoints
+- `/queue/request` - Queues a /chat/completions request. Returns a job id.
+- `/queue/response/{id}` - Returns the status of a job. If completed, returns the response as well. Potential statuses are: `queued` and `finished`.
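+
+Here's a minimal polling sketch for these endpoints (assuming the proxy is running locally at `http://0.0.0.0:8000`; the request body and the `status`/`result` fields follow the `--test_async` client in `proxy_cli.py`):
+
+```python
+import time, requests
+
+base_url = "http://0.0.0.0:8000"  # assumed local proxy address
+
+# Queue the request - same body as an OpenAI /chat/completions call
+job = requests.post(f"{base_url}/queue/request", json={
+    "model": "gpt-3.5-turbo",
+    "messages": [{"role": "user", "content": "Hey, how's it going?"}],
+}).json()
+
+# Poll /queue/response/{id} until the job is finished
+while True:
+    polled = requests.get(f"{base_url}{job['url']}").json()
+    if polled.get("status") == "finished":
+        print(polled["result"])  # the chat completion response
+        break
+    time.sleep(0.5)
+```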
+
+
 ## Hosted Router + Request Queing api.litellm.ai
 Queue your LLM API requests to ensure you're under your rate limits
-- Step 1: Create a `/queue/reques` request
-- Step 2: Poll your request, to check if it's completed
+- Step 1: Make a POST request to `/queue/request` (this follows the same input format as an OpenAI `/chat/completions` call, and returns a job id).
+- Step 2: Make a GET request to `/queue/response/{id}` to check if the job has completed
 
 ## Step 1 Add a config to the proxy, generate a temp key
@@ -339,205 +378,4 @@ while True:
       print("got exception in polling", e)
       break
 
-```
-
-
-
-
-
-
-
\ No newline at end of file
+```
\ No newline at end of file
diff --git a/docs/my-website/sidebars.js b/docs/my-website/sidebars.js
index c471c0d6d0..d50f036753 100644
--- a/docs/my-website/sidebars.js
+++ b/docs/my-website/sidebars.js
@@ -38,6 +38,7 @@ const sidebars = {
         "completion/model_alias",
         "completion/batching",
         "completion/mock_requests",
+        "completion/reliable_completions",
       ],
     },
     {
diff --git a/litellm/proxy/proxy_cli.py b/litellm/proxy/proxy_cli.py
index 60b475ebac..629705749a 100644
--- a/litellm/proxy/proxy_cli.py
+++ b/litellm/proxy/proxy_cli.py
@@ -67,6 +67,7 @@ def is_port_in_use(port):
 @click.option('--headers', default=None, help='headers for the API call')
 @click.option('--save', is_flag=True, type=bool, help='Save the model-specific config')
 @click.option('--debug', default=False, is_flag=True, type=bool, help='To debug the input')
+@click.option('--use_queue', default=False, is_flag=True, type=bool, help='To use celery workers for async endpoints')
 @click.option('--temperature', default=None, type=float, help='Set temperature for the model')
 @click.option('--max_tokens', default=None, type=int, help='Set max tokens for the model')
 @click.option('--request_timeout', default=600, type=int, help='Set timeout in seconds for completion calls')
@@ -78,8 +79,10 @@ def is_port_in_use(port):
 @click.option('--telemetry', default=True, type=bool, help='Helps us know if people are using this feature. Turn this off by doing `--telemetry False`')
 @click.option('--logs', flag_value=False, type=int, help='Gets the "n" most recent logs. By default gets most recent log.')
 @click.option('--test', flag_value=True, help='proxy chat completions url to make a test request to')
+@click.option('--test_async', default=False, is_flag=True, help='Calls async endpoints /queue/requests and /queue/response')
+@click.option('--num_requests', default=10, type=int, help='Number of requests to hit async endpoint with')
 @click.option('--local', is_flag=True, default=False, help='for local debugging')
-def run_server(host, port, api_base, api_version, model, alias, add_key, headers, save, debug, temperature, max_tokens, request_timeout, drop_params, add_function_to_prompt, config, file, max_budget, telemetry, logs, test, local, num_workers):
+def run_server(host, port, api_base, api_version, model, alias, add_key, headers, save, debug, temperature, max_tokens, request_timeout, drop_params, add_function_to_prompt, config, file, max_budget, telemetry, logs, test, local, num_workers, test_async, num_requests, use_queue):
     global feature_telemetry
     args = locals()
     if local:
@@ -112,6 +115,72 @@ def run_server(host, port, api_base, api_version, model, alias, add_key, headers
             return
     if model and "ollama" in model: 
         run_ollama_serve()
+    if test_async is True:
+        import requests, concurrent.futures, time  # concurrent.futures is needed for the thread pool below
+        api_base = f"http://{host}:{port}"
+
+        def _make_openai_completion():
+            data = {
+                "model": "gpt-3.5-turbo",
+                "messages": [{"role": "user", "content": "Write a short poem about the moon"}]
+            }
+
+            start_time = time.time()  # used for the latency written to the log file below
+            response = requests.post("http://0.0.0.0:8000/queue/request", json=data)
+
+            response = response.json()
+
+            while True:
+                try:
+                    url = response["url"]
+                    polling_url = f"{api_base}{url}"
+                    polling_response = requests.get(polling_url)
+                    polling_response = polling_response.json()
+                    print("\n RESPONSE FROM POLLING JOB", polling_response)
+                    status = polling_response["status"]
+                    if status == "finished":
+                        llm_response = polling_response["result"]
+                        end_time = time.time()
+                        with open("response_log.txt", "a") as log_file:
+                            log_file.write(
+                                f"Response ID: {llm_response.get('id', 'NA')}\nLLM Response: {llm_response}\nTime: {end_time - start_time:.2f} seconds\n\n"
+                            )
+
+                        break
+                    print(f"POLLING JOB{polling_url}\nSTATUS: {status}, \n Response {polling_response}")
+                    time.sleep(0.5)
+                except Exception as e:
+                    print("got exception in polling", e)
+                    break
+
+        # Number of concurrent calls (you can adjust this)
+        concurrent_calls = num_requests
+
+        # List to store the futures of concurrent calls
+        futures = []
+
+        # Make concurrent calls
+        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_calls) as executor:
+            for _ in range(concurrent_calls):
+                futures.append(executor.submit(_make_openai_completion))
+
+        # Wait for all futures to complete
+        concurrent.futures.wait(futures)
+
+        # Summarize the results
+        successful_calls = 0
+        failed_calls = 0
+
+        for future in futures:
+            if future.done():
+                if future.result() is not None:
+                    successful_calls += 1
+                else:
+                    failed_calls += 1
+
+        print(f"Load test Summary:")
+        print(f"Total Requests: {concurrent_calls}")
+        print(f"Successful Calls: {successful_calls}")
+        print(f"Failed Calls: {failed_calls}")
+        return
     if test != False:
         click.echo('\nLiteLLM: Making a test ChatCompletions request to your proxy')
         import openai
@@ -152,7 +221,7 @@ def run_server(host, port, api_base, api_version, model, alias, add_key, headers
     else:
         if headers:
            headers = json.loads(headers)
-        save_worker_config(model=model, alias=alias, api_base=api_base, api_version=api_version, debug=debug, temperature=temperature, max_tokens=max_tokens, request_timeout=request_timeout, max_budget=max_budget, telemetry=telemetry, drop_params=drop_params, add_function_to_prompt=add_function_to_prompt, headers=headers, save=save, config=config)
+        save_worker_config(model=model, alias=alias, api_base=api_base, api_version=api_version, debug=debug, temperature=temperature, max_tokens=max_tokens, request_timeout=request_timeout, max_budget=max_budget, telemetry=telemetry, drop_params=drop_params, add_function_to_prompt=add_function_to_prompt, headers=headers, save=save, config=config, use_queue=use_queue)
     try:
         import uvicorn
     except:
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index eb939bc0cf..4485e75ced 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -139,9 +139,9 @@ worker_config = None
 master_key = None
 prisma_client = None
 ### REDIS QUEUE ###
-redis_job = None
-redis_connection = None
-request_queue = None # Redis Queue for handling requests
+async_result = None
+celery_app_conn = None
+celery_fn = None # Redis Queue for handling requests
 #### HELPER FUNCTIONS ####
 def print_verbose(print_statement):
     global user_debug
@@ -219,15 +221,17 @@ def prisma_setup(database_url: Optional[str]):
     except Exception as e:
         print("Error when initializing prisma, Ensure you run pip install prisma", e)
 
-def rq_setup(use_queue: bool):
-    global request_queue, redis_connection, redis_job
+def celery_setup(use_queue: bool):
+    global celery_fn, celery_app_conn, async_result
     print(f"value of use_queue: {use_queue}")
     if use_queue:
+        from litellm.proxy.queue.celery_worker import start_worker
         from litellm.proxy.queue.celery_app import celery_app, process_job
         from celery.result import AsyncResult
-        request_queue = process_job
-        redis_job = AsyncResult
-        redis_connection = celery_app
+        start_worker(os.getcwd())
+        celery_fn = process_job
+        async_result = AsyncResult
+        celery_app_conn = celery_app
 
 def run_ollama_serve():
     command = ['ollama', 'serve']
@@ -266,7 +268,7 @@ def load_router_config(router: Optional[litellm.Router], config_file_path: str):
         prisma_setup(database_url=database_url)
         ### START REDIS QUEUE ###
         use_queue = general_settings.get("use_queue", False)
-        rq_setup(use_queue=use_queue)
+        celery_setup(use_queue=use_queue)
 
     ## LITELLM MODULE SETTINGS (e.g. litellm.drop_params=True,..)
     litellm_settings = config.get('litellm_settings', None)
@@ -356,6 +358,7 @@ def initialize(
     headers,
     save,
     config,
+    use_queue
 ):
     global user_model, user_api_base, user_debug, user_max_tokens, user_request_timeout, user_temperature, user_telemetry, user_headers, experimental, llm_model_list, llm_router, server_settings
     generate_feedback_box()
@@ -396,6 +399,8 @@ def initialize(
         dynamic_config["general"]["max_budget"] = max_budget
     if debug==True:  # litellm-specific param
         litellm.set_verbose = True
+    if use_queue:
+        celery_setup(use_queue=use_queue)
     if experimental:
         pass
     if save:
@@ -588,8 +593,8 @@ async def generate_key_fn(request: Request):
     )
 
 @router.post("/queue/request", dependencies=[Depends(user_api_key_auth)])
-async def async_chat_completions(request: Request):
-    global request_queue, llm_model_list
+async def async_queue_request(request: Request):
+    global celery_fn, llm_model_list
     body = await request.body()
     body_str = body.decode()
     try:
@@ -603,19 +608,19 @@ async def async_chat_completions(request: Request):
         )
     data["llm_model_list"] = llm_model_list
     print(f"data: {data}")
-    job = request_queue.apply_async(kwargs=data)
+    job = celery_fn.apply_async(kwargs=data)
     return {"id": job.id, "url": f"/queue/response/{job.id}", "eta": 5, "status": "queued"}
     pass
 
 @router.get("/queue/response/{task_id}", dependencies=[Depends(user_api_key_auth)])
-async def async_chat_completions(request: Request, task_id: str):
-    global redis_connection, redis_job
+async def async_queue_response(request: Request, task_id: str):
+    global celery_app_conn, async_result
     try:
-        job = redis_job(task_id, app=redis_connection)
+        job = async_result(task_id, app=celery_app_conn)
         if job.ready():
             return job.result
         else:
-            return {'status': 'processing'}
+            return {'status': 'queued'}
     except Exception as e:
         return {"status": "finished", "result": str(e)}
diff --git a/litellm/proxy/queue/celery_app.py b/litellm/proxy/queue/celery_app.py
index f06ca15f95..466d2da89e 100644
--- a/litellm/proxy/queue/celery_app.py
+++ b/litellm/proxy/queue/celery_app.py
@@ -1,8 +1,26 @@
 from dotenv import load_dotenv
 load_dotenv()
-import json
-import redis
-from celery import Celery
+import json, subprocess
+import psutil # Import the psutil library
+import atexit
+try:
+    ### OPTIONAL DEPENDENCIES ### - pip install redis and celery only when a user opts into using the async endpoints which require both
+    from celery import Celery
+    import redis
+except:
+    import sys
+
+    subprocess.check_call(
+        [
+            sys.executable,
+            "-m",
+            "pip",
+            "install",
+            "redis",
+            "celery"
+        ]
+    )
+
 import time
 import sys, os
 sys.path.insert(
@@ -11,7 +29,7 @@ sys.path.insert(
 import litellm
 
 # Redis connection setup
-pool = redis.ConnectionPool(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), password=os.getenv("REDIS_PASSWORD"), db=0, max_connections=10)
+pool = redis.ConnectionPool(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), password=os.getenv("REDIS_PASSWORD"), db=0, max_connections=5)
 redis_client = redis.Redis(connection_pool=pool)
 
 # Celery setup
@@ -36,4 +54,19 @@ def process_job(*args, **kwargs):
     except Exception as e:
         print(e)
         raise e
-    
\ No newline at end of file
+
+# Ensure Celery workers are terminated when the script exits
+def cleanup():
+    try:
+        # Get a list of all running processes
+        for process in psutil.process_iter(attrs=['pid', 'name']):
+            # Check if the process is a Celery worker process
+            if process.info['name'] == 'celery':
+                print(f"Terminating Celery worker with PID {process.info['pid']}")
+                # Terminate the Celery worker process
worker process + psutil.Process(process.info['pid']).terminate() + except Exception as e: + print(f"Error during cleanup: {e}") + +# Register the cleanup function to run when the script exits +atexit.register(cleanup) \ No newline at end of file diff --git a/litellm/proxy/queue/celery_task.py b/litellm/proxy/queue/celery_task.py deleted file mode 100644 index ea2075db07..0000000000 --- a/litellm/proxy/queue/celery_task.py +++ /dev/null @@ -1,15 +0,0 @@ -from dotenv import load_dotenv -load_dotenv() - -import sys, os -sys.path.insert( - 0, os.path.abspath("../../..") -) # Adds the parent directory to the system path - for litellm local dev -import litellm -from litellm.proxy.queue.celery_app import celery_app - -# Celery task -@celery_app.task(name='process_job') -def process_job(*args, **kwargs): - llm_router: litellm.Router = litellm.Router(model_list=kwargs.pop("llm_model_list")) - return llm_router.completion(*args, **kwargs) \ No newline at end of file diff --git a/litellm/proxy/queue/celery_worker.py b/litellm/proxy/queue/celery_worker.py index 97179bd844..7206723f82 100644 --- a/litellm/proxy/queue/celery_worker.py +++ b/litellm/proxy/queue/celery_worker.py @@ -1,9 +1,12 @@ import os from multiprocessing import Process -def run_worker(): - os.system("celery worker -A your_project_name.celery_app --concurrency=10 --loglevel=info") +def run_worker(cwd): + os.chdir(cwd) + os.system("celery -A celery_app.celery_app worker --concurrency=120 --loglevel=info") -if __name__ == "__main__": - worker_process = Process(target=run_worker) - worker_process.start() \ No newline at end of file +def start_worker(cwd): + cwd += "/queue" + worker_process = Process(target=run_worker, args=(cwd,)) + worker_process.start() + \ No newline at end of file