Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-25 10:44:24 +00:00)
feat(proxy_server.py): enable infinite retries on rate limited requests

Parent: e5268fa6bc
Commit: 4791dda66f
7 changed files with 255 additions and 186 deletions
@@ -195,8 +195,10 @@ prisma_client: Optional[PrismaClient] = None
user_api_key_cache = DualCache()
user_custom_auth = None
use_background_health_checks = None
use_queue = False
health_check_interval = None
health_check_results = {}
queue: List = []
### INITIALIZE GLOBAL LOGGING OBJECT ###
proxy_logging_obj = ProxyLogging(user_api_key_cache=user_api_key_cache)
### REDIS QUEUE ###
@@ -324,17 +326,6 @@ def prisma_setup(database_url: Optional[str]):
    except Exception as e:
        print("Error when initializing prisma, Ensure you run pip install prisma", e)

def celery_setup(use_queue: bool):
    global celery_fn, celery_app_conn, async_result
    if use_queue:
        from litellm.proxy.queue.celery_worker import start_worker
        from litellm.proxy.queue.celery_app import celery_app, process_job
        from celery.result import AsyncResult
        start_worker(os.getcwd())
        celery_fn = process_job
        async_result = AsyncResult
        celery_app_conn = celery_app

def load_from_azure_key_vault(use_azure_key_vault: bool = False):
    if use_azure_key_vault is False:
        return
@@ -450,7 +441,7 @@ async def _run_background_health_check():
        await asyncio.sleep(health_check_interval)

def load_router_config(router: Optional[litellm.Router], config_file_path: str):
    global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, use_background_health_checks, health_check_interval
    global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, use_background_health_checks, health_check_interval, use_queue
    config = {}
    try:
        if os.path.exists(config_file_path):
@@ -566,7 +557,6 @@ def load_router_config(router: Optional[litellm.Router], config_file_path: str):
        cost_tracking()
        ### START REDIS QUEUE ###
        use_queue = general_settings.get("use_queue", False)
        celery_setup(use_queue=use_queue)
        ### MASTER KEY ###
        master_key = general_settings.get("master_key", None)
        if master_key and master_key.startswith("os.environ/"):
@@ -757,8 +747,6 @@ def initialize(
    if max_budget: # litellm-specific param
        litellm.max_budget = max_budget
        dynamic_config["general"]["max_budget"] = max_budget
    if use_queue:
        celery_setup(use_queue=use_queue)
    if experimental:
        pass
    user_telemetry = telemetry
@@ -815,9 +803,11 @@ def get_litellm_model_info(model: dict = {}):

@router.on_event("startup")
async def startup_event():
    global prisma_client, master_key, use_background_health_checks
    global prisma_client, master_key, use_background_health_checks, use_queue
    import json

    print(f"VALUE OF USE_QUEUE: {use_queue}")

    ### LOAD CONFIG ###
    worker_config = litellm.get_secret("WORKER_CONFIG")
    print_verbose(f"worker_config: {worker_config}")
@@ -841,6 +831,7 @@ async def startup_event():
        # add master key to db
        await generate_key_helper_fn(duration=None, models=[], aliases={}, config={}, spend=0, token=master_key)

    print("END OF STARTUP")

#### API ENDPOINTS ####
@router.get("/v1/models", dependencies=[Depends(user_api_key_auth)])
@@ -1361,47 +1352,108 @@ async def delete_model(model_info: ModelInfoDelete):
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")

#### EXPERIMENTAL QUEUING ####
@router.post("/queue/request", dependencies=[Depends(user_api_key_auth)])
async def async_queue_request(request: Request):
    global celery_fn, llm_model_list
    if celery_fn is not None:
        body = await request.body()
        body_str = body.decode()
        try:
            data = ast.literal_eval(body_str)
        except:
            data = json.loads(body_str)
async def _litellm_chat_completions_worker(data, user_api_key_dict):
    """
    worker to make litellm completions calls
    """
    while True:
        try:
            ### CALL HOOKS ### - modify incoming data before calling the model
            data = await proxy_logging_obj.pre_call_hook(user_api_key_dict=user_api_key_dict, data=data, call_type="completion")

            print(f"_litellm_chat_completions_worker started")
            ### ROUTE THE REQUEST ###
            router_model_names = [m["model_name"] for m in llm_model_list] if llm_model_list is not None else []
            if llm_router is not None and data["model"] in router_model_names: # model in router model list
                response = await llm_router.acompletion(**data)
            elif llm_router is not None and data["model"] in llm_router.deployment_names: # model in router deployments, calling a specific deployment on the router
                response = await llm_router.acompletion(**data, specific_deployment = True)
            elif llm_router is not None and llm_router.model_group_alias is not None and data["model"] in llm_router.model_group_alias: # model set in model_group_alias
                response = await llm_router.acompletion(**data)
            else: # router is not set
                response = await litellm.acompletion(**data)

            print(f"final response: {response}")
            return response
        except HTTPException as e:
            print(f"EXCEPTION RAISED IN _litellm_chat_completions_worker - {e.status_code}; {e.detail}")
            if e.status_code == 429 and "Max parallel request limit reached" in e.detail:
                print(f"Max parallel request limit reached!")
                timeout = litellm._calculate_retry_after(remaining_retries=3, max_retries=3, min_timeout=1)
                await asyncio.sleep(timeout)
            else:
                raise e

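The worker above is what makes the retries effectively infinite: it only sleeps and loops when the parallel-request limiter raises a 429, and re-raises everything else. A minimal, self-contained sketch of that retry-forever pattern (fake_completion, RateLimitError, and the fixed one-second backoff are hypothetical stand-ins, not litellm internals):

import asyncio
import random

class RateLimitError(Exception):
    # stand-in for the HTTPException(429) raised by the proxy's parallel-request limiter
    pass

async def fake_completion():
    # hypothetical downstream call that is rate limited most of the time
    if random.random() < 0.7:
        raise RateLimitError("Max parallel request limit reached")
    return {"choices": [{"message": {"content": "ok"}}]}

async def completions_worker():
    # same shape as _litellm_chat_completions_worker: loop forever,
    # backing off (never giving up) whenever the call is rate limited
    while True:
        try:
            return await fake_completion()
        except RateLimitError:
            backoff = 1  # the real worker asks litellm._calculate_retry_after for this value
            await asyncio.sleep(backoff)

print(asyncio.run(completions_worker()))
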
@router.post("/queue/chat/completions", tags=["experimental"], dependencies=[Depends(user_api_key_auth)])
|
||||
async def async_queue_request(request: Request, model: Optional[str] = None, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), background_tasks: BackgroundTasks = BackgroundTasks()):
|
||||
global general_settings, user_debug, proxy_logging_obj
|
||||
"""
|
||||
v2 attempt at a background worker to handle queuing.
|
||||
|
||||
Just supports /chat/completion calls currently.
|
||||
|
||||
Now using a FastAPI background task + /chat/completions compatible endpoint
|
||||
"""
|
||||
try:
|
||||
data = {}
|
||||
data = await request.json() # type: ignore
|
||||
|
||||
# Include original request and headers in the data
|
||||
data["proxy_server_request"] = {
|
||||
"url": str(request.url),
|
||||
"method": request.method,
|
||||
"headers": dict(request.headers),
|
||||
"body": copy.copy(data) # use copy instead of deepcopy
|
||||
}
|
||||
|
||||
print_verbose(f"receiving data: {data}")
|
||||
data["model"] = (
|
||||
general_settings.get("completion_model", None) # server default
|
||||
or user_model # model name passed via cli args
|
||||
or model # for azure deployments
|
||||
or data["model"] # default passed in http request
|
||||
)
|
||||
data["llm_model_list"] = llm_model_list
|
||||
print(f"data: {data}")
|
||||
job = celery_fn.apply_async(kwargs=data)
|
||||
return {"id": job.id, "url": f"/queue/response/{job.id}", "eta": 5, "status": "queued"}
|
||||
else:
|
||||
|
||||
# users can pass in 'user' param to /chat/completions. Don't override it
|
||||
if data.get("user", None) is None and user_api_key_dict.user_id is not None:
|
||||
# if users are using user_api_key_auth, set `user` in `data`
|
||||
data["user"] = user_api_key_dict.user_id
|
||||
|
||||
if "metadata" in data:
|
||||
print(f'received metadata: {data["metadata"]}')
|
||||
data["metadata"]["user_api_key"] = user_api_key_dict.api_key
|
||||
data["metadata"]["headers"] = dict(request.headers)
|
||||
else:
|
||||
data["metadata"] = {"user_api_key": user_api_key_dict.api_key}
|
||||
data["metadata"]["headers"] = dict(request.headers)
|
||||
|
||||
global user_temperature, user_request_timeout, user_max_tokens, user_api_base
|
||||
# override with user settings, these are params passed via cli
|
||||
if user_temperature:
|
||||
data["temperature"] = user_temperature
|
||||
if user_request_timeout:
|
||||
data["request_timeout"] = user_request_timeout
|
||||
if user_max_tokens:
|
||||
data["max_tokens"] = user_max_tokens
|
||||
if user_api_base:
|
||||
data["api_base"] = user_api_base
|
||||
|
||||
response = await asyncio.wait_for(_litellm_chat_completions_worker(data=data, user_api_key_dict=user_api_key_dict), timeout=litellm.request_timeout)
|
||||
|
||||
if 'stream' in data and data['stream'] == True: # use generate_responses to stream responses
|
||||
return StreamingResponse(async_data_generator(user_api_key_dict=user_api_key_dict, response=response), media_type='text/event-stream')
|
||||
|
||||
background_tasks.add_task(log_input_output, request, response) # background task for logging to OTEL
|
||||
return response
|
||||
except Exception as e:
|
||||
await proxy_logging_obj.post_call_failure_hook(user_api_key_dict=user_api_key_dict, original_exception=e)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail={"error": "Queue not initialized"},
|
||||
detail={"error": str(e)},
|
||||
)
|
||||
|
||||
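Because the new endpoint is /chat/completions compatible, it can be called with a plain OpenAI-style request body; the proxy holds the request open while the background worker retries. A hypothetical client sketch, assuming the proxy listens on http://0.0.0.0:8000 and sk-1234 is a valid proxy key (both are assumptions, not values from this commit):

import requests  # assumes the requests package is installed

response = requests.post(
    "http://0.0.0.0:8000/queue/chat/completions",  # assumed proxy address
    headers={"Authorization": "Bearer sk-1234"},   # hypothetical proxy API key
    json={
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "what llm are you?"}],
    },
    timeout=600,  # generous client-side timeout, since the server may retry for a while
)
print(response.json())
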
@router.get("/queue/response/{task_id}", dependencies=[Depends(user_api_key_auth)])
|
||||
async def async_queue_response(request: Request, task_id: str):
|
||||
global celery_app_conn, async_result
|
||||
try:
|
||||
if celery_app_conn is not None and async_result is not None:
|
||||
job = async_result(task_id, app=celery_app_conn)
|
||||
if job.ready():
|
||||
return {"status": "finished", "result": job.result}
|
||||
else:
|
||||
return {'status': 'queued'}
|
||||
else:
|
||||
raise Exception()
|
||||
except Exception as e:
|
||||
return {"status": "finished", "result": str(e)}
|
||||
|
||||
|
||||
|
||||
|
||||
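The /queue/response/{task_id} endpoint above belongs to the celery-backed flow: /queue/request returns a job id and the caller polls until the status is finished. A hypothetical polling sketch under the same proxy-address and key assumptions as before:

import time
import requests  # assumes the requests package is installed

base_url = "http://0.0.0.0:8000"               # assumed proxy address
headers = {"Authorization": "Bearer sk-1234"}  # hypothetical proxy API key
task_id = "replace-with-the-id-returned-by-/queue/request"

while True:
    resp = requests.get(f"{base_url}/queue/response/{task_id}", headers=headers).json()
    if resp["status"] == "finished":
        print(resp["result"])
        break
    time.sleep(5)  # matches the 5 second "eta" hint returned when the job is queued
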
@router.get("/ollama_logs", dependencies=[Depends(user_api_key_auth)])
|
||||
async def retrieve_server_log(request: Request):
|
||||
filepath = os.path.expanduser("~/.ollama/logs/server.log")
|
||||
|
|