From 4791dda66f420686b9a9e0196e3256b03cabe8f8 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Fri, 15 Dec 2023 20:03:41 -0800 Subject: [PATCH] feat(proxy_server.py): enable infinite retries on rate limited requests --- .gitignore | 1 + litellm/llms/ollama.py | 14 +- .../proxy/hooks/parallel_request_limiter.py | 2 + litellm/proxy/proxy_server.py | 150 ++++++---- litellm/tests/test_ollama_local.py | 261 +++++++++--------- litellm/utils.py | 9 +- pyproject.toml | 4 +- 7 files changed, 255 insertions(+), 186 deletions(-) diff --git a/.gitignore b/.gitignore index bad6dc917..b31366a33 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ litellm/tests/test_custom_logger.py litellm/tests/langfuse.log litellm/tests/dynamo*.log .vscode/settings.json +litellm/proxy/log.txt diff --git a/litellm/llms/ollama.py b/litellm/llms/ollama.py index 6d6d774a4..032c7a048 100644 --- a/litellm/llms/ollama.py +++ b/litellm/llms/ollama.py @@ -182,17 +182,11 @@ def ollama_completion_stream(url, data): traceback.print_exc() session.close() -async def iter_lines(reader): - buffer = b"" - async for chunk in reader.iter_any(): - buffer += chunk - while b'\n' in buffer: - line, buffer = buffer.split(b'\n', 1) - yield line async def ollama_async_streaming(url, data, model_response, encoding, logging_obj): try: - with httpx.stream( + client = httpx.AsyncClient() + async with client.stream( url=f"{url}", json=data, method="POST", @@ -201,8 +195,8 @@ async def ollama_async_streaming(url, data, model_response, encoding, logging_ob if response.status_code != 200: raise OllamaError(status_code=response.status_code, message=response.text) - streamwrapper = litellm.CustomStreamWrapper(completion_stream=response.iter_lines(), model=data['model'], custom_llm_provider="ollama",logging_obj=logging_obj) - for transformed_chunk in streamwrapper: + streamwrapper = litellm.CustomStreamWrapper(completion_stream=response.aiter_lines(), model=data['model'], custom_llm_provider="ollama",logging_obj=logging_obj) + async for transformed_chunk in streamwrapper: yield transformed_chunk except Exception as e: traceback.print_exc() diff --git a/litellm/proxy/hooks/parallel_request_limiter.py b/litellm/proxy/hooks/parallel_request_limiter.py index 6875af484..d42a5739a 100644 --- a/litellm/proxy/hooks/parallel_request_limiter.py +++ b/litellm/proxy/hooks/parallel_request_limiter.py @@ -16,6 +16,7 @@ class MaxParallelRequestsHandler(CustomLogger): async def async_pre_call_hook(self, user_api_key_dict: UserAPIKeyAuth, cache: DualCache, data: dict, call_type: str): + self.print_verbose(f"Inside Max Parallel Request Pre-Call Hook") api_key = user_api_key_dict.api_key max_parallel_requests = user_api_key_dict.max_parallel_requests @@ -61,6 +62,7 @@ class MaxParallelRequestsHandler(CustomLogger): async def async_log_failure_call(self, user_api_key_dict: UserAPIKeyAuth, original_exception: Exception): try: + self.print_verbose(f"Inside Max Parallel Request Failure Hook") api_key = user_api_key_dict.api_key if api_key is None: return diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index 46cd3225a..baee3b49c 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -195,8 +195,10 @@ prisma_client: Optional[PrismaClient] = None user_api_key_cache = DualCache() user_custom_auth = None use_background_health_checks = None +use_queue = False health_check_interval = None health_check_results = {} +queue: List = [] ### INITIALIZE GLOBAL LOGGING OBJECT ### proxy_logging_obj = ProxyLogging(user_api_key_cache=user_api_key_cache) ### REDIS QUEUE ### @@ -324,17 +326,6 @@ def prisma_setup(database_url: Optional[str]): except Exception as e: print("Error when initializing prisma, Ensure you run pip install prisma", e) -def celery_setup(use_queue: bool): - global celery_fn, celery_app_conn, async_result - if use_queue: - from litellm.proxy.queue.celery_worker import start_worker - from litellm.proxy.queue.celery_app import celery_app, process_job - from celery.result import AsyncResult - start_worker(os.getcwd()) - celery_fn = process_job - async_result = AsyncResult - celery_app_conn = celery_app - def load_from_azure_key_vault(use_azure_key_vault: bool = False): if use_azure_key_vault is False: return @@ -450,7 +441,7 @@ async def _run_background_health_check(): await asyncio.sleep(health_check_interval) def load_router_config(router: Optional[litellm.Router], config_file_path: str): - global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, use_background_health_checks, health_check_interval + global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, use_background_health_checks, health_check_interval, use_queue config = {} try: if os.path.exists(config_file_path): @@ -566,7 +557,6 @@ def load_router_config(router: Optional[litellm.Router], config_file_path: str): cost_tracking() ### START REDIS QUEUE ### use_queue = general_settings.get("use_queue", False) - celery_setup(use_queue=use_queue) ### MASTER KEY ### master_key = general_settings.get("master_key", None) if master_key and master_key.startswith("os.environ/"): @@ -757,8 +747,6 @@ def initialize( if max_budget: # litellm-specific param litellm.max_budget = max_budget dynamic_config["general"]["max_budget"] = max_budget - if use_queue: - celery_setup(use_queue=use_queue) if experimental: pass user_telemetry = telemetry @@ -815,9 +803,11 @@ def get_litellm_model_info(model: dict = {}): @router.on_event("startup") async def startup_event(): - global prisma_client, master_key, use_background_health_checks + global prisma_client, master_key, use_background_health_checks, use_queue import json + print(f"VALUE OF USE_QUEUE: {use_queue}") + ### LOAD CONFIG ### worker_config = litellm.get_secret("WORKER_CONFIG") print_verbose(f"worker_config: {worker_config}") @@ -841,6 +831,7 @@ async def startup_event(): # add master key to db await generate_key_helper_fn(duration=None, models=[], aliases={}, config={}, spend=0, token=master_key) + print("END OF STARTUP") #### API ENDPOINTS #### @router.get("/v1/models", dependencies=[Depends(user_api_key_auth)]) @@ -1361,47 +1352,108 @@ async def delete_model(model_info: ModelInfoDelete): raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") #### EXPERIMENTAL QUEUING #### -@router.post("/queue/request", dependencies=[Depends(user_api_key_auth)]) -async def async_queue_request(request: Request): - global celery_fn, llm_model_list - if celery_fn is not None: - body = await request.body() - body_str = body.decode() - try: - data = ast.literal_eval(body_str) - except: - data = json.loads(body_str) +async def _litellm_chat_completions_worker(data, user_api_key_dict): + """ + worker to make litellm completions calls + """ + while True: + try: + ### CALL HOOKS ### - modify incoming data before calling the model + data = await proxy_logging_obj.pre_call_hook(user_api_key_dict=user_api_key_dict, data=data, call_type="completion") + + print(f"_litellm_chat_completions_worker started") + ### ROUTE THE REQUEST ### + router_model_names = [m["model_name"] for m in llm_model_list] if llm_model_list is not None else [] + if llm_router is not None and data["model"] in router_model_names: # model in router model list + response = await llm_router.acompletion(**data) + elif llm_router is not None and data["model"] in llm_router.deployment_names: # model in router deployments, calling a specific deployment on the router + response = await llm_router.acompletion(**data, specific_deployment = True) + elif llm_router is not None and llm_router.model_group_alias is not None and data["model"] in llm_router.model_group_alias: # model set in model_group_alias + response = await llm_router.acompletion(**data) + else: # router is not set + response = await litellm.acompletion(**data) + + print(f"final response: {response}") + return response + except HTTPException as e: + print(f"EXCEPTION RAISED IN _litellm_chat_completions_worker - {e.status_code}; {e.detail}") + if e.status_code == 429 and "Max parallel request limit reached" in e.detail: + print(f"Max parallel request limit reached!") + timeout = litellm._calculate_retry_after(remaining_retries=3, max_retries=3, min_timeout=1) + await asyncio.sleep(timeout) + else: + raise e + + +@router.post("/queue/chat/completions", tags=["experimental"], dependencies=[Depends(user_api_key_auth)]) +async def async_queue_request(request: Request, model: Optional[str] = None, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), background_tasks: BackgroundTasks = BackgroundTasks()): + global general_settings, user_debug, proxy_logging_obj + """ + v2 attempt at a background worker to handle queuing. + + Just supports /chat/completion calls currently. + + Now using a FastAPI background task + /chat/completions compatible endpoint + """ + try: + data = {} + data = await request.json() # type: ignore + + # Include original request and headers in the data + data["proxy_server_request"] = { + "url": str(request.url), + "method": request.method, + "headers": dict(request.headers), + "body": copy.copy(data) # use copy instead of deepcopy + } + + print_verbose(f"receiving data: {data}") data["model"] = ( general_settings.get("completion_model", None) # server default or user_model # model name passed via cli args + or model # for azure deployments or data["model"] # default passed in http request ) - data["llm_model_list"] = llm_model_list - print(f"data: {data}") - job = celery_fn.apply_async(kwargs=data) - return {"id": job.id, "url": f"/queue/response/{job.id}", "eta": 5, "status": "queued"} - else: + + # users can pass in 'user' param to /chat/completions. Don't override it + if data.get("user", None) is None and user_api_key_dict.user_id is not None: + # if users are using user_api_key_auth, set `user` in `data` + data["user"] = user_api_key_dict.user_id + + if "metadata" in data: + print(f'received metadata: {data["metadata"]}') + data["metadata"]["user_api_key"] = user_api_key_dict.api_key + data["metadata"]["headers"] = dict(request.headers) + else: + data["metadata"] = {"user_api_key": user_api_key_dict.api_key} + data["metadata"]["headers"] = dict(request.headers) + + global user_temperature, user_request_timeout, user_max_tokens, user_api_base + # override with user settings, these are params passed via cli + if user_temperature: + data["temperature"] = user_temperature + if user_request_timeout: + data["request_timeout"] = user_request_timeout + if user_max_tokens: + data["max_tokens"] = user_max_tokens + if user_api_base: + data["api_base"] = user_api_base + + response = await asyncio.wait_for(_litellm_chat_completions_worker(data=data, user_api_key_dict=user_api_key_dict), timeout=litellm.request_timeout) + + if 'stream' in data and data['stream'] == True: # use generate_responses to stream responses + return StreamingResponse(async_data_generator(user_api_key_dict=user_api_key_dict, response=response), media_type='text/event-stream') + + background_tasks.add_task(log_input_output, request, response) # background task for logging to OTEL + return response + except Exception as e: + await proxy_logging_obj.post_call_failure_hook(user_api_key_dict=user_api_key_dict, original_exception=e) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail={"error": "Queue not initialized"}, + detail={"error": str(e)}, ) - -@router.get("/queue/response/{task_id}", dependencies=[Depends(user_api_key_auth)]) -async def async_queue_response(request: Request, task_id: str): - global celery_app_conn, async_result - try: - if celery_app_conn is not None and async_result is not None: - job = async_result(task_id, app=celery_app_conn) - if job.ready(): - return {"status": "finished", "result": job.result} - else: - return {'status': 'queued'} - else: - raise Exception() - except Exception as e: - return {"status": "finished", "result": str(e)} - - + + @router.get("/ollama_logs", dependencies=[Depends(user_api_key_auth)]) async def retrieve_server_log(request: Request): filepath = os.path.expanduser("~/.ollama/logs/server.log") diff --git a/litellm/tests/test_ollama_local.py b/litellm/tests/test_ollama_local.py index 05dd9c646..db0d78737 100644 --- a/litellm/tests/test_ollama_local.py +++ b/litellm/tests/test_ollama_local.py @@ -1,158 +1,171 @@ -# ##### THESE TESTS CAN ONLY RUN LOCALLY WITH THE OLLAMA SERVER RUNNING ###### -# # https://ollama.ai/ +##### THESE TESTS CAN ONLY RUN LOCALLY WITH THE OLLAMA SERVER RUNNING ###### +# https://ollama.ai/ -# import sys, os -# import traceback -# from dotenv import load_dotenv -# load_dotenv() -# import os -# sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path -# import pytest -# import litellm -# from litellm import embedding, completion -# import asyncio +import sys, os +import traceback +from dotenv import load_dotenv +load_dotenv() +import os +sys.path.insert(0, os.path.abspath('../..')) # Adds the parent directory to the system path +import pytest +import litellm +from litellm import embedding, completion +import asyncio -# user_message = "respond in 20 words. who are you?" -# messages = [{ "content": user_message,"role": "user"}] +user_message = "respond in 20 words. who are you?" +messages = [{ "content": user_message,"role": "user"}] -# def test_completion_ollama(): -# try: -# response = completion( -# model="ollama/llama2", -# messages=messages, -# max_tokens=200, -# request_timeout = 10, +async def test_async_ollama_streaming(): + try: + litellm.set_verbose = True + response = await litellm.acompletion(model="ollama/mistral-openorca", + messages=[{"role": "user", "content": "Hey, how's it going?"}], + stream=True) + async for chunk in response: + print(chunk) + except Exception as e: + print(e) -# ) -# print(response) -# except Exception as e: -# pytest.fail(f"Error occurred: {e}") +asyncio.run(test_async_ollama_streaming()) + +def test_completion_ollama(): + try: + response = completion( + model="ollama/llama2", + messages=messages, + max_tokens=200, + request_timeout = 10, + + ) + print(response) + except Exception as e: + pytest.fail(f"Error occurred: {e}") # test_completion_ollama() -# def test_completion_ollama_with_api_base(): -# try: -# response = completion( -# model="ollama/llama2", -# messages=messages, -# api_base="http://localhost:11434" -# ) -# print(response) -# except Exception as e: -# pytest.fail(f"Error occurred: {e}") +def test_completion_ollama_with_api_base(): + try: + response = completion( + model="ollama/llama2", + messages=messages, + api_base="http://localhost:11434" + ) + print(response) + except Exception as e: + pytest.fail(f"Error occurred: {e}") # test_completion_ollama_with_api_base() -# def test_completion_ollama_custom_prompt_template(): -# user_message = "what is litellm?" -# litellm.register_prompt_template( -# model="ollama/llama2", -# roles={ -# "system": {"pre_message": "System: "}, -# "user": {"pre_message": "User: "}, -# "assistant": {"pre_message": "Assistant: "} -# } -# ) -# messages = [{ "content": user_message,"role": "user"}] -# litellm.set_verbose = True -# try: -# response = completion( -# model="ollama/llama2", -# messages=messages, -# stream=True -# ) -# print(response) -# for chunk in response: -# print(chunk) -# # print(chunk['choices'][0]['delta']) +def test_completion_ollama_custom_prompt_template(): + user_message = "what is litellm?" + litellm.register_prompt_template( + model="ollama/llama2", + roles={ + "system": {"pre_message": "System: "}, + "user": {"pre_message": "User: "}, + "assistant": {"pre_message": "Assistant: "} + } + ) + messages = [{ "content": user_message,"role": "user"}] + litellm.set_verbose = True + try: + response = completion( + model="ollama/llama2", + messages=messages, + stream=True + ) + print(response) + for chunk in response: + print(chunk) + # print(chunk['choices'][0]['delta']) -# except Exception as e: -# traceback.print_exc() -# pytest.fail(f"Error occurred: {e}") + except Exception as e: + traceback.print_exc() + pytest.fail(f"Error occurred: {e}") # test_completion_ollama_custom_prompt_template() -# async def test_completion_ollama_async_stream(): -# user_message = "what is the weather" -# messages = [{ "content": user_message,"role": "user"}] -# try: -# response = await litellm.acompletion( -# model="ollama/llama2", -# messages=messages, -# api_base="http://localhost:11434", -# stream=True -# ) -# async for chunk in response: -# print(chunk['choices'][0]['delta']) +async def test_completion_ollama_async_stream(): + user_message = "what is the weather" + messages = [{ "content": user_message,"role": "user"}] + try: + response = await litellm.acompletion( + model="ollama/llama2", + messages=messages, + api_base="http://localhost:11434", + stream=True + ) + async for chunk in response: + print(chunk['choices'][0]['delta']) -# print("TEST ASYNC NON Stream") -# response = await litellm.acompletion( -# model="ollama/llama2", -# messages=messages, -# api_base="http://localhost:11434", -# ) -# print(response) -# except Exception as e: -# pytest.fail(f"Error occurred: {e}") + print("TEST ASYNC NON Stream") + response = await litellm.acompletion( + model="ollama/llama2", + messages=messages, + api_base="http://localhost:11434", + ) + print(response) + except Exception as e: + pytest.fail(f"Error occurred: {e}") # import asyncio # asyncio.run(test_completion_ollama_async_stream()) -# def prepare_messages_for_chat(text: str) -> list: -# messages = [ -# {"role": "user", "content": text}, -# ] -# return messages +def prepare_messages_for_chat(text: str) -> list: + messages = [ + {"role": "user", "content": text}, + ] + return messages -# async def ask_question(): -# params = { -# "messages": prepare_messages_for_chat("What is litellm? tell me 10 things about it who is sihaan.write an essay"), -# "api_base": "http://localhost:11434", -# "model": "ollama/llama2", -# "stream": True, -# } -# response = await litellm.acompletion(**params) -# return response +async def ask_question(): + params = { + "messages": prepare_messages_for_chat("What is litellm? tell me 10 things about it who is sihaan.write an essay"), + "api_base": "http://localhost:11434", + "model": "ollama/llama2", + "stream": True, + } + response = await litellm.acompletion(**params) + return response -# async def main(): -# response = await ask_question() -# async for chunk in response: -# print(chunk) +async def main(): + response = await ask_question() + async for chunk in response: + print(chunk) -# print("test async completion without streaming") -# response = await litellm.acompletion( -# model="ollama/llama2", -# messages=prepare_messages_for_chat("What is litellm? respond in 2 words"), -# ) -# print("response", response) + print("test async completion without streaming") + response = await litellm.acompletion( + model="ollama/llama2", + messages=prepare_messages_for_chat("What is litellm? respond in 2 words"), + ) + print("response", response) -# def test_completion_expect_error(): -# # this tests if we can exception map correctly for ollama -# print("making ollama request") -# # litellm.set_verbose=True -# user_message = "what is litellm?" -# messages = [{ "content": user_message,"role": "user"}] -# try: -# response = completion( -# model="ollama/invalid", -# messages=messages, -# stream=True -# ) -# print(response) -# for chunk in response: -# print(chunk) -# # print(chunk['choices'][0]['delta']) +def test_completion_expect_error(): + # this tests if we can exception map correctly for ollama + print("making ollama request") + # litellm.set_verbose=True + user_message = "what is litellm?" + messages = [{ "content": user_message,"role": "user"}] + try: + response = completion( + model="ollama/invalid", + messages=messages, + stream=True + ) + print(response) + for chunk in response: + print(chunk) + # print(chunk['choices'][0]['delta']) -# except Exception as e: -# pass -# pytest.fail(f"Error occurred: {e}") + except Exception as e: + pass + pytest.fail(f"Error occurred: {e}") # test_completion_expect_error() diff --git a/litellm/utils.py b/litellm/utils.py index 8ab0da414..bdd935119 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -1079,7 +1079,8 @@ class Logging: # print_verbose(f"final set of received chunks: {self.streaming_chunks}") try: complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None)) - except: + except Exception as e: + print_verbose(f"Error occurred building stream chunk: {traceback.format_exc()}") complete_streaming_response = None else: self.streaming_chunks.append(result) @@ -5953,14 +5954,20 @@ class CustomStreamWrapper: or self.custom_llm_provider == "custom_openai" or self.custom_llm_provider == "text-completion-openai" or self.custom_llm_provider == "huggingface" + or self.custom_llm_provider == "ollama" or self.custom_llm_provider == "vertex_ai"): + print_verbose(f"INSIDE ASYNC STREAMING!!!") + print_verbose(f"value of async completion stream: {self.completion_stream}") async for chunk in self.completion_stream: + print_verbose(f"value of async chunk: {chunk}") if chunk == "None" or chunk is None: raise Exception # chunk_creator() does logging/stream chunk building. We need to let it know its being called in_async_func, so we don't double add chunks. # __anext__ also calls async_success_handler, which does logging + print_verbose(f"PROCESSED ASYNC CHUNK PRE CHUNK CREATOR: {chunk}") processed_chunk = self.chunk_creator(chunk=chunk) + print_verbose(f"PROCESSED ASYNC CHUNK POST CHUNK CREATOR: {processed_chunk}") if processed_chunk is None: continue ## LOGGING diff --git a/pyproject.toml b/pyproject.toml index 9a900336b..b93e3eb90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "litellm" -version = "1.14.9" +version = "1.14.10" description = "Library to easily interface with LLM API providers" authors = ["BerriAI"] license = "MIT License" @@ -55,7 +55,7 @@ requires = ["poetry-core", "wheel"] build-backend = "poetry.core.masonry.api" [tool.commitizen] -version = "1.14.9" +version = "1.14.10" version_files = [ "pyproject.toml:^version" ]