diff --git a/docs/my-website/docs/scheduler.md b/docs/my-website/docs/scheduler.md
new file mode 100644
index 000000000..347406ade
--- /dev/null
+++ b/docs/my-website/docs/scheduler.md
@@ -0,0 +1,141 @@
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# [BETA] Request Prioritization
+
+:::info
+
+Beta feature. Use for testing only.
+
+[Help us improve this](https://github.com/BerriAI/litellm/issues)
+:::
+
+Prioritize LLM API requests during high-traffic periods.
+
+- Add the request to the priority queue
+- Poll the queue to check if the request can be made. Returns `True`:
+    * if there are healthy deployments
+    * OR if the request is at the top of the queue
+- Priority - the lower the number, the higher the priority (see the sketch below):
+    * e.g. `priority=0` > `priority=2000`
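+
+Under the hood, the scheduler keeps a min-heap of `(priority, request_id)` tuples, so the request with the lowest priority number is always popped first. A minimal sketch of that ordering (plain `heapq`, no LiteLLM required):
+
+```python
+import heapq
+
+queue = []  # requests are stored as (priority, request_id) tuples
+heapq.heappush(queue, (2000, "low-priority-request"))
+heapq.heappush(queue, (0, "high-priority-request"))
+
+print(heapq.heappop(queue))  # (0, 'high-priority-request') -> served first
+print(heapq.heappop(queue))  # (2000, 'low-priority-request') -> served next
+```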
1234" + }], + "priority": 0 👈 SET VALUE HERE +}' +``` + + + + +```python +import openai +client = openai.OpenAI( + api_key="anything", + base_url="http://0.0.0.0:4000" +) + +# request sent to model set on litellm proxy, `litellm --model` +response = client.chat.completions.create( + model="gpt-3.5-turbo", + messages = [ + { + "role": "user", + "content": "this is a test request, write a short poem" + } + ], + extra_body={ + "priority": 0 👈 SET VALUE HERE + } +) + +print(response) +``` + + + \ No newline at end of file diff --git a/docs/my-website/sidebars.js b/docs/my-website/sidebars.js index 29095d41f..d5c6f8bc2 100644 --- a/docs/my-website/sidebars.js +++ b/docs/my-website/sidebars.js @@ -164,6 +164,7 @@ const sidebars = { }, "proxy/custom_pricing", "routing", + "scheduler", "rules", "set_keys", "budget_manager", diff --git a/litellm/__init__.py b/litellm/__init__.py index 9fa801318..6097b8c94 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -805,3 +805,4 @@ from .proxy.proxy_cli import run_server from .router import Router from .assistants.main import * from .batches.main import * +from .scheduler import * diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index 3a0efd653..f5289b024 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -141,7 +141,7 @@ from litellm.proxy.auth.auth_checks import ( from litellm.llms.custom_httpx.httpx_handler import HTTPHandler from litellm.exceptions import RejectedRequestError from litellm.integrations.slack_alerting import SlackAlertingArgs, SlackAlerting -from litellm.proxy.queue.scheduler import Scheduler, FlowItem, DefaultPriorities +from litellm.scheduler import Scheduler, FlowItem, DefaultPriorities try: from litellm._version import version @@ -11305,7 +11305,7 @@ async def async_queue_request( flow_item = FlowItem( priority=data.pop("priority", DefaultPriorities.Medium.value), request_id=request_id, - model_group=data["model"], + model_name=data["model"], ) # [TODO] only allow premium users to set non default priorities @@ -11330,9 +11330,7 @@ async def async_queue_request( ) while curr_time < end_time: - make_request = await scheduler.poll( - id=request_id, model_group=data["model"] - ) + make_request = await scheduler.poll(id=request_id, model_name=data["model"]) if make_request: ## IF TRUE -> MAKE REQUEST break else: ## ELSE -> loop till default_timeout diff --git a/litellm/proxy/queue/scheduler.py b/litellm/scheduler.py similarity index 70% rename from litellm/proxy/queue/scheduler.py rename to litellm/scheduler.py index 79a282d96..3bbd3916e 100644 --- a/litellm/proxy/queue/scheduler.py +++ b/litellm/scheduler.py @@ -20,7 +20,7 @@ class DefaultPriorities(enum.Enum): class FlowItem(BaseModel): priority: int # Priority between 0 and 255 request_id: str - model_group: str + model_name: str class Scheduler: @@ -39,16 +39,26 @@ class Scheduler: async def add_request(self, request: FlowItem): # We use the priority directly, as lower values indicate higher priority # get the queue - queue = await self.get_queue(model_group=request.model_group) + queue = await self.get_queue(model_name=request.model_name) # update the queue heapq.heappush(queue, (request.priority, request.request_id)) # save the queue - await self.save_queue(queue=queue, model_group=request.model_group) + await self.save_queue(queue=queue, model_name=request.model_name) - async def poll(self, id: str, model_group: str) -> bool: - """Return if the id is at the top of the queue and if the token bucket allows 
processing""" - queue = await self.get_queue(model_group=model_group) + async def poll(self, id: str, model_name: str) -> bool: + """ + Return if request can be processed. + + Returns: + - True: + * If healthy deployments are available + * OR If request at the top of queue + - False: + * If no healthy deployments available + * AND request not at the top of queue + """ + queue = await self.get_queue(model_name=model_name) if not queue or not self.llm_router: raise Exception( "Incorrectly setup. Queue or Router is invalid. Queue={}, Router={}".format( @@ -60,26 +70,26 @@ class Scheduler: # Setup values # ------------ _healthy_deployments = await self.llm_router._async_get_healthy_deployments( - model=model_group + model=model_name ) print_verbose(f"len(_healthy_deployments): {len(_healthy_deployments)}") if len(_healthy_deployments) == 0: - return False + print_verbose(f"queue: {queue}, seeking id={id}") + # Check if the id is at the top of the heap + if queue[0][1] == id: + # Remove the item from the queue + heapq.heappop(queue) + print_verbose(f"Popped id: {id}") + return True + else: + return False - print_verbose(f"queue: {queue}, seeking id={id}") - # Check if the id is at the top of the heap - if queue[0][1] == id: - # Remove the item from the queue - heapq.heappop(queue) - print_verbose(f"Popped id: {id}") - return True + return True - return False - - async def peek(self, id: str, model_group: str) -> bool: + async def peek(self, id: str, model_name: str) -> bool: """Return if the id is at the top of the queue. Don't pop the value from heap.""" - queue = await self.get_queue(model_group=model_group) + queue = await self.get_queue(model_name=model_name) if not queue or not self.llm_router: raise Exception( "Incorrectly setup. Queue or Router is invalid. 
\ No newline at end of file
diff --git a/docs/my-website/sidebars.js b/docs/my-website/sidebars.js
index 29095d41f..d5c6f8bc2 100644
--- a/docs/my-website/sidebars.js
+++ b/docs/my-website/sidebars.js
@@ -164,6 +164,7 @@ const sidebars = {
       },
       "proxy/custom_pricing",
       "routing",
+      "scheduler",
      "rules",
       "set_keys",
       "budget_manager",
diff --git a/litellm/__init__.py b/litellm/__init__.py
index 9fa801318..6097b8c94 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -805,3 +805,4 @@ from .proxy.proxy_cli import run_server
 from .router import Router
 from .assistants.main import *
 from .batches.main import *
+from .scheduler import *
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index 3a0efd653..f5289b024 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -141,7 +141,7 @@ from litellm.proxy.auth.auth_checks import (
 from litellm.llms.custom_httpx.httpx_handler import HTTPHandler
 from litellm.exceptions import RejectedRequestError
 from litellm.integrations.slack_alerting import SlackAlertingArgs, SlackAlerting
-from litellm.proxy.queue.scheduler import Scheduler, FlowItem, DefaultPriorities
+from litellm.scheduler import Scheduler, FlowItem, DefaultPriorities
 
 try:
     from litellm._version import version
@@ -11305,7 +11305,7 @@ async def async_queue_request(
     flow_item = FlowItem(
         priority=data.pop("priority", DefaultPriorities.Medium.value),
         request_id=request_id,
-        model_group=data["model"],
+        model_name=data["model"],
     )
 
     # [TODO] only allow premium users to set non default priorities
@@ -11330,9 +11330,7 @@
         )
 
         while curr_time < end_time:
-            make_request = await scheduler.poll(
-                id=request_id, model_group=data["model"]
-            )
+            make_request = await scheduler.poll(id=request_id, model_name=data["model"])
             if make_request:  ## IF TRUE -> MAKE REQUEST
                 break
             else:  ## ELSE -> loop till default_timeout
diff --git a/litellm/proxy/queue/scheduler.py b/litellm/scheduler.py
similarity index 70%
rename from litellm/proxy/queue/scheduler.py
rename to litellm/scheduler.py
index 79a282d96..3bbd3916e 100644
--- a/litellm/proxy/queue/scheduler.py
+++ b/litellm/scheduler.py
@@ -20,7 +20,7 @@ class DefaultPriorities(enum.Enum):
 class FlowItem(BaseModel):
     priority: int  # Priority between 0 and 255
     request_id: str
-    model_group: str
+    model_name: str
 
 
 class Scheduler:
@@ -39,16 +39,26 @@ class Scheduler:
     async def add_request(self, request: FlowItem):
         # We use the priority directly, as lower values indicate higher priority
         # get the queue
-        queue = await self.get_queue(model_group=request.model_group)
+        queue = await self.get_queue(model_name=request.model_name)
         # update the queue
         heapq.heappush(queue, (request.priority, request.request_id))
 
         # save the queue
-        await self.save_queue(queue=queue, model_group=request.model_group)
+        await self.save_queue(queue=queue, model_name=request.model_name)
 
-    async def poll(self, id: str, model_group: str) -> bool:
-        """Return if the id is at the top of the queue and if the token bucket allows processing"""
-        queue = await self.get_queue(model_group=model_group)
+    async def poll(self, id: str, model_name: str) -> bool:
+        """
+        Return if request can be processed.
+
+        Returns:
+        - True:
+            * If healthy deployments are available
+            * OR If request at the top of queue
+        - False:
+            * If no healthy deployments available
+            * AND request not at the top of queue
+        """
+        queue = await self.get_queue(model_name=model_name)
         if not queue or not self.llm_router:
             raise Exception(
                 "Incorrectly setup. Queue or Router is invalid. Queue={}, Router={}".format(
@@ -60,26 +70,26 @@ class Scheduler:
         # Setup values
         # ------------
         _healthy_deployments = await self.llm_router._async_get_healthy_deployments(
-            model=model_group
+            model=model_name
         )
 
         print_verbose(f"len(_healthy_deployments): {len(_healthy_deployments)}")
         if len(_healthy_deployments) == 0:
-            return False
+            print_verbose(f"queue: {queue}, seeking id={id}")
+            # Check if the id is at the top of the heap
+            if queue[0][1] == id:
+                # Remove the item from the queue
+                heapq.heappop(queue)
+                print_verbose(f"Popped id: {id}")
+                return True
+            else:
+                return False
 
-        print_verbose(f"queue: {queue}, seeking id={id}")
-        # Check if the id is at the top of the heap
-        if queue[0][1] == id:
-            # Remove the item from the queue
-            heapq.heappop(queue)
-            print_verbose(f"Popped id: {id}")
-            return True
+        return True
 
-        return False
-
-    async def peek(self, id: str, model_group: str) -> bool:
+    async def peek(self, id: str, model_name: str) -> bool:
         """Return if the id is at the top of the queue. Don't pop the value from heap."""
-        queue = await self.get_queue(model_group=model_group)
+        queue = await self.get_queue(model_name=model_name)
         if not queue or not self.llm_router:
             raise Exception(
                 "Incorrectly setup. Queue or Router is invalid. Queue={}, Router={}".format(
@@ -91,7 +101,7 @@
         # Setup values
         # ------------
         _healthy_deployments = await self.llm_router._async_get_healthy_deployments(
-            model=model_group
+            model=model_name
         )
         if len(_healthy_deployments) == 0:
             return False
@@ -106,12 +116,12 @@
         """Get the status of items in the queue"""
         return self.queue
 
-    async def get_queue(self, model_group: str) -> list:
+    async def get_queue(self, model_name: str) -> list:
         """
         Return a queue for that specific model group
         """
         if self.cache is not None:
-            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_group)
+            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_name)
             response = await self.cache.async_get_cache(key=_cache_key)
             if response is None or not isinstance(response, list):
                 return []
@@ -119,11 +129,11 @@
             return response
         return self.queue
 
-    async def save_queue(self, queue: list, model_group: str) -> None:
+    async def save_queue(self, queue: list, model_name: str) -> None:
         """
         Save the updated queue of the model group
         """
         if self.cache is not None:
-            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_group)
+            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_name)
             await self.cache.async_set_cache(key=_cache_key, value=queue)
         return None
scheduler.peek(id="11", model_name="gpt-3.5-turbo") == False else: - assert await scheduler.peek(id="11", model_group="gpt-3.5-turbo") == True - assert await scheduler.peek(id="10", model_group="gpt-3.5-turbo") == False + assert await scheduler.peek(id="11", model_name="gpt-3.5-turbo") == True + assert await scheduler.peek(id="10", model_name="gpt-3.5-turbo") == False @pytest.mark.parametrize("p0, p1", [(0, 0), (0, 1), (1, 0)]) @@ -92,10 +92,12 @@ async def test_scheduler_prioritized_requests_mock_response(p0, p1): "litellm_params": { "model": "gpt-3.5-turbo", "mock_response": "Hello world this is Macintosh!", + "rpm": 1, }, }, ], timeout=2, + routing_strategy="usage-based-routing-v2", ) scheduler.update_variables(llm_router=router) @@ -114,7 +116,7 @@ async def test_scheduler_prioritized_requests_mock_response(p0, p1): while curr_time < end_time: make_request = await scheduler.poll( - id=flow_item.request_id, model_group=flow_item.model_group + id=flow_item.request_id, model_name=flow_item.model_name ) if make_request: ## IF TRUE -> MAKE REQUEST break @@ -123,10 +125,13 @@ async def test_scheduler_prioritized_requests_mock_response(p0, p1): curr_time = time.time() if make_request: - _response = await router.acompletion( - model=flow_item.model_group, - messages=[{"role": "user", "content": "Hey!"}], - ) + try: + _response = await router.acompletion( + model=flow_item.model_name, + messages=[{"role": "user", "content": "Hey!"}], + ) + except Exception as e: + return flow_item.priority, flow_item.request_id, "Error occurred" return flow_item.priority, flow_item.request_id, time.time() @@ -135,13 +140,13 @@ async def test_scheduler_prioritized_requests_mock_response(p0, p1): tasks = [] item = FlowItem( - priority=p0, request_id=str(uuid.uuid4()), model_group="gpt-3.5-turbo" + priority=p0, request_id=str(uuid.uuid4()), model_name="gpt-3.5-turbo" ) await scheduler.add_request(request=item) tasks.append(_make_prioritized_call(flow_item=item)) item = FlowItem( - priority=p1, request_id=str(uuid.uuid4()), model_group="gpt-3.5-turbo" + priority=p1, request_id=str(uuid.uuid4()), model_name="gpt-3.5-turbo" ) await scheduler.add_request(request=item) tasks.append(_make_prioritized_call(flow_item=item)) @@ -157,8 +162,4 @@ async def test_scheduler_prioritized_requests_mock_response(p0, p1): assert ( completed_responses[0][0] == 0 ) # assert higher priority request got done first - assert ( - completed_responses[0][2] < completed_responses[1][2] - ), "1st response time={}, 2nd response time={}".format( - completed_responses[0][1], completed_responses[1][1] - ) # assert higher priority request got done first + assert isinstance(completed_responses[1][2], str) # 2nd request errored out