From 3bc6b5d11927d166e6590e2be293fe12e91be68b Mon Sep 17 00:00:00 2001
From: sumanth
Date: Fri, 3 May 2024 10:50:45 +0530
Subject: [PATCH 1/2] usage-based-routing-ttl-on-cache

---
 litellm/router_strategy/lowest_tpm_rpm.py    |  8 ++--
 litellm/router_strategy/lowest_tpm_rpm_v2.py |  8 ++--
 litellm/tests/test_router_caching.py         | 49 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/litellm/router_strategy/lowest_tpm_rpm.py b/litellm/router_strategy/lowest_tpm_rpm.py
index 0a7773a84b..a58eb9e02e 100644
--- a/litellm/router_strategy/lowest_tpm_rpm.py
+++ b/litellm/router_strategy/lowest_tpm_rpm.py
@@ -59,13 +59,13 @@ class LowestTPMLoggingHandler(CustomLogger):
                 request_count_dict = self.router_cache.get_cache(key=tpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + total_tokens
 
-                self.router_cache.set_cache(key=tpm_key, value=request_count_dict)
+                self.router_cache.set_cache(key=tpm_key, value=request_count_dict, ttl= 60)
 
                 ## RPM
                 request_count_dict = self.router_cache.get_cache(key=rpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + 1
 
-                self.router_cache.set_cache(key=rpm_key, value=request_count_dict)
+                self.router_cache.set_cache(key=rpm_key, value=request_count_dict, ttl= 60)
 
                 ### TESTING ###
                 if self.test_flag:
@@ -110,13 +110,13 @@ class LowestTPMLoggingHandler(CustomLogger):
                 request_count_dict = self.router_cache.get_cache(key=tpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + total_tokens
 
-                self.router_cache.set_cache(key=tpm_key, value=request_count_dict)
+                self.router_cache.set_cache(key=tpm_key, value=request_count_dict, ttl= 60)
 
                 ## RPM
                 request_count_dict = self.router_cache.get_cache(key=rpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + 1
 
-                self.router_cache.set_cache(key=rpm_key, value=request_count_dict)
+                self.router_cache.set_cache(key=rpm_key, value=request_count_dict, ttl= 60)
 
                 ### TESTING ###
                 if self.test_flag:
diff --git a/litellm/router_strategy/lowest_tpm_rpm_v2.py b/litellm/router_strategy/lowest_tpm_rpm_v2.py
index f7a55d9709..59004c22b9 100644
--- a/litellm/router_strategy/lowest_tpm_rpm_v2.py
+++ b/litellm/router_strategy/lowest_tpm_rpm_v2.py
@@ -91,7 +91,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
                 )
             else:  # if local result below limit, check redis ## prevent unnecessary redis checks
-                result = self.router_cache.increment_cache(key=rpm_key, value=1)
+                result = self.router_cache.increment_cache(key=rpm_key, value=1, ttl = 60)
                 if result is not None and result > deployment_rpm:
                     raise litellm.RateLimitError(
                         message="Deployment over defined rpm limit={}. current usage={}".format(
@@ -170,7 +170,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
             else:  # if local result below limit, check redis ## prevent unnecessary redis checks
                 result = await self.router_cache.async_increment_cache(
-                    key=rpm_key, value=1
+                    key=rpm_key, value=1, ttl = 60
                 )
                 if result is not None and result > deployment_rpm:
                     raise litellm.RateLimitError(
@@ -231,7 +231,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
 
             # update cache
             ## TPM
-            self.router_cache.increment_cache(key=tpm_key, value=total_tokens)
+            self.router_cache.increment_cache(key=tpm_key, value=total_tokens, ttl = 60)
             ### TESTING ###
             if self.test_flag:
                 self.logged_success += 1
@@ -275,7 +275,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
 
             ## TPM
             await self.router_cache.async_increment_cache(
-                key=tpm_key, value=total_tokens
+                key=tpm_key, value=total_tokens, ttl = 60
             )
 
             ### TESTING ###
diff --git a/litellm/tests/test_router_caching.py b/litellm/tests/test_router_caching.py
index ebace161c9..ed1ddac6ee 100644
--- a/litellm/tests/test_router_caching.py
+++ b/litellm/tests/test_router_caching.py
@@ -134,6 +134,55 @@ async def test_acompletion_caching_on_router():
         traceback.print_exc()
         pytest.fail(f"Error occurred: {e}")
 
+@pytest.mark.asyncio
+async def test_completion_caching_on_router():
+    # tests completion + caching on router
+    try:
+        litellm.set_verbose = True
+        model_list = [
+            {
+                "model_name": "gpt-3.5-turbo",
+                "litellm_params": {
+                    "model": "gpt-3.5-turbo",
+                    "api_key": os.getenv("OPENAI_API_KEY"),
+                },
+                "tpm": 1000,
+                "rpm": 1,
+            },
+        ]
+
+        messages = [
+            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
+        ]
+        router = Router(
+            model_list=model_list,
+            redis_host=os.environ["REDIS_HOST"],
+            redis_password=os.environ["REDIS_PASSWORD"],
+            redis_port=os.environ["REDIS_PORT"],
+            cache_responses=True,
+            timeout=30,
+            routing_strategy="usage-based-routing",
+        )
+        response1 = await router.completion(
+            model="gpt-3.5-turbo", messages=messages, temperature=1
+        )
+        print(f"response1: {response1}")
+        await asyncio.sleep(60)
+        response2 = await router.completion(
+            model="gpt-3.5-turbo", messages=messages, temperature=1
+        )
+        print(f"response2: {response2}")
+        assert len(response1.choices[0].message.content) > 0
+        assert len(response2.choices[0].message.content) > 0
+
+        router.reset()
+    except litellm.Timeout as e:
+        end_time = time.time()
+        print(f"timeout error occurred: {end_time - start_time}")
+        pass
+    except Exception as e:
+        traceback.print_exc()
+        pytest.fail(f"Error occurred: {e}")
 
 @pytest.mark.asyncio
 async def test_acompletion_caching_with_ttl_on_router():
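
[PATCH 1/2] pins every TPM/RPM tracking key to a fixed 60-second TTL, so per-deployment usage counters expire on their own instead of accumulating as stale entries in Redis or local memory. Below is a minimal sketch of that expiry behaviour, assuming an in-process stand-in with the same set_cache/get_cache shape as litellm's DualCache (the SimpleTTLCache class and the key string are hypothetical, for illustration only):

    import time

    class SimpleTTLCache:
        """Hypothetical stand-in for DualCache: stores (value, expiry) pairs."""

        def __init__(self):
            self._store = {}

        def set_cache(self, key, value, ttl=None):
            # ttl=None means the key never expires (the pre-patch behaviour)
            expires_at = time.time() + ttl if ttl is not None else None
            self._store[key] = (value, expires_at)

        def get_cache(self, key):
            entry = self._store.get(key)
            if entry is None:
                return None
            value, expires_at = entry
            if expires_at is not None and time.time() >= expires_at:
                del self._store[key]  # window elapsed: drop the stale counter
                return None
            return value

    cache = SimpleTTLCache()
    cache.set_cache(key="gpt-3.5-turbo:tpm", value={"deployment-id": 512}, ttl=60)
    print(cache.get_cache(key="gpt-3.5-turbo:tpm"))  # {'deployment-id': 512}
    # 60+ seconds later the same get_cache returns None, so the next
    # logged request starts a fresh usage window from zero.

Because the 60 seconds is baked in at every call site here, the TTL cannot be tuned per router; the second patch below lifts it into a configurable RoutingArgs model.
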
From 71e0294485ea274102d2678d9231b553878e830b Mon Sep 17 00:00:00 2001
From: sumanth
Date: Tue, 14 May 2024 10:05:19 +0530
Subject: [PATCH 2/2] addressed comments

---
 litellm/router.py                            |  8 ++++--
 litellm/router_strategy/lowest_tpm_rpm.py    | 27 +++++++++++++++-----
 litellm/router_strategy/lowest_tpm_rpm_v2.py | 27 +++++++++++++++-----
 litellm/tests/test_router_caching.py         |  3 ++-
 4 files changed, 50 insertions(+), 15 deletions(-)

diff --git a/litellm/router.py b/litellm/router.py
index e524937aea..3ac8484f89 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -340,13 +340,17 @@ class Router:
                 litellm.callbacks.append(self.leastbusy_logger)  # type: ignore
         elif routing_strategy == "usage-based-routing":
             self.lowesttpm_logger = LowestTPMLoggingHandler(
-                router_cache=self.cache, model_list=self.model_list
+                router_cache=self.cache,
+                model_list=self.model_list,
+                routing_args=routing_strategy_args
             )
             if isinstance(litellm.callbacks, list):
                 litellm.callbacks.append(self.lowesttpm_logger)  # type: ignore
         elif routing_strategy == "usage-based-routing-v2":
             self.lowesttpm_logger_v2 = LowestTPMLoggingHandler_v2(
-                router_cache=self.cache, model_list=self.model_list
+                router_cache=self.cache,
+                model_list=self.model_list,
+                routing_args=routing_strategy_args
             )
             if isinstance(litellm.callbacks, list):
                 litellm.callbacks.append(self.lowesttpm_logger_v2)  # type: ignore
diff --git a/litellm/router_strategy/lowest_tpm_rpm.py b/litellm/router_strategy/lowest_tpm_rpm.py
index 6e78bbe414..15460051b8 100644
--- a/litellm/router_strategy/lowest_tpm_rpm.py
+++ b/litellm/router_strategy/lowest_tpm_rpm.py
@@ -1,6 +1,6 @@
 #### What this does ####
 #   identifies lowest tpm deployment
-
+from pydantic import BaseModel
 import dotenv, os, requests, random
 from typing import Optional, Union, List, Dict
 from datetime import datetime
@@ -11,16 +11,31 @@
 from litellm.integrations.custom_logger import CustomLogger
 from litellm._logging import verbose_router_logger
 from litellm.utils import print_verbose
 
+class LiteLLMBase(BaseModel):
+    """
+    Implements default functions, all pydantic objects should have.
+    """
+
+    def json(self, **kwargs):
+        try:
+            return self.model_dump()  # noqa
+        except:
+            # if using pydantic v1
+            return self.dict()
+
+class RoutingArgs(LiteLLMBase):
+    ttl: int = 1 * 60  # 1min (RPM/TPM expire key)
+
 class LowestTPMLoggingHandler(CustomLogger):
     test_flag: bool = False
     logged_success: int = 0
     logged_failure: int = 0
     default_cache_time_seconds: int = 1 * 60 * 60  # 1 hour
 
-    def __init__(self, router_cache: DualCache, model_list: list):
+    def __init__(self, router_cache: DualCache, model_list: list, routing_args: dict = {}):
         self.router_cache = router_cache
         self.model_list = model_list
+        self.routing_args = RoutingArgs(**routing_args)
 
     def log_success_event(self, kwargs, response_obj, start_time, end_time):
         try:
@@ -57,13 +72,13 @@ class LowestTPMLoggingHandler(CustomLogger):
                 request_count_dict = self.router_cache.get_cache(key=tpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + total_tokens
 
-                self.router_cache.set_cache(key=tpm_key, value=request_count_dict, ttl= 60)
+                self.router_cache.set_cache(key=tpm_key, value=request_count_dict, ttl=self.routing_args.ttl)
 
                 ## RPM
                 request_count_dict = self.router_cache.get_cache(key=rpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + 1
 
-                self.router_cache.set_cache(key=rpm_key, value=request_count_dict, ttl= 60)
+                self.router_cache.set_cache(key=rpm_key, value=request_count_dict, ttl=self.routing_args.ttl)
 
                 ### TESTING ###
                 if self.test_flag:
@@ -108,13 +123,13 @@ class LowestTPMLoggingHandler(CustomLogger):
                 request_count_dict = self.router_cache.get_cache(key=tpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + total_tokens
 
-                self.router_cache.set_cache(key=tpm_key, value=request_count_dict, ttl= 60)
+                self.router_cache.set_cache(key=tpm_key, value=request_count_dict, ttl=self.routing_args.ttl)
 
                 ## RPM
                 request_count_dict = self.router_cache.get_cache(key=rpm_key) or {}
                 request_count_dict[id] = request_count_dict.get(id, 0) + 1
 
-                self.router_cache.set_cache(key=rpm_key, value=request_count_dict, ttl= 60)
+                self.router_cache.set_cache(key=rpm_key, value=request_count_dict, ttl=self.routing_args.ttl)
 
                 ### TESTING ###
                 if self.test_flag:
diff --git a/litellm/router_strategy/lowest_tpm_rpm_v2.py b/litellm/router_strategy/lowest_tpm_rpm_v2.py
index 401e3eae58..40e75031ad 100644
--- a/litellm/router_strategy/lowest_tpm_rpm_v2.py
+++ b/litellm/router_strategy/lowest_tpm_rpm_v2.py
@@ -1,6 +1,6 @@
 #### What this does ####
 #   identifies lowest tpm deployment
-
+from pydantic import BaseModel
 import dotenv, os, requests, random
 from typing import Optional, Union, List, Dict
 import datetime as datetime_og
@@ -14,6 +14,20 @@
 from litellm._logging import verbose_router_logger
 from litellm.utils import print_verbose, get_utc_datetime
 from litellm.types.router import RouterErrors
 
+class LiteLLMBase(BaseModel):
+    """
+    Implements default functions, all pydantic objects should have.
+    """
+
+    def json(self, **kwargs):
+        try:
+            return self.model_dump()  # noqa
+        except:
+            # if using pydantic v1
+            return self.dict()
+
+class RoutingArgs(LiteLLMBase):
+    ttl: int = 1 * 60  # 1min (RPM/TPM expire key)
 
 class LowestTPMLoggingHandler_v2(CustomLogger):
     """
@@ -33,9 +47,10 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
     logged_failure: int = 0
     default_cache_time_seconds: int = 1 * 60 * 60  # 1 hour
 
-    def __init__(self, router_cache: DualCache, model_list: list):
+    def __init__(self, router_cache: DualCache, model_list: list, routing_args: dict = {}):
         self.router_cache = router_cache
         self.model_list = model_list
+        self.routing_args = RoutingArgs(**routing_args)
 
     def pre_call_check(self, deployment: Dict) -> Optional[Dict]:
         """
@@ -89,7 +104,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
                 )
             else:  # if local result below limit, check redis ## prevent unnecessary redis checks
-                result = self.router_cache.increment_cache(key=rpm_key, value=1, ttl = 60)
+                result = self.router_cache.increment_cache(key=rpm_key, value=1, ttl=self.routing_args.ttl)
                 if result is not None and result > deployment_rpm:
                     raise litellm.RateLimitError(
                         message="Deployment over defined rpm limit={}. current usage={}".format(
@@ -168,7 +183,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
             else:  # if local result below limit, check redis ## prevent unnecessary redis checks
                 result = await self.router_cache.async_increment_cache(
-                    key=rpm_key, value=1, ttl = 60
+                    key=rpm_key, value=1, ttl=self.routing_args.ttl
                 )
                 if result is not None and result > deployment_rpm:
                     raise litellm.RateLimitError(
@@ -229,7 +244,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
 
             # update cache
             ## TPM
-            self.router_cache.increment_cache(key=tpm_key, value=total_tokens, ttl = 60)
+            self.router_cache.increment_cache(key=tpm_key, value=total_tokens, ttl=self.routing_args.ttl)
             ### TESTING ###
             if self.test_flag:
                 self.logged_success += 1
@@ -273,7 +288,7 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
 
             ## TPM
             await self.router_cache.async_increment_cache(
-                key=tpm_key, value=total_tokens, ttl = 60
+                key=tpm_key, value=total_tokens, ttl=self.routing_args.ttl
             )
 
             ### TESTING ###
diff --git a/litellm/tests/test_router_caching.py b/litellm/tests/test_router_caching.py
index ed1ddac6ee..a7ea322b52 100644
--- a/litellm/tests/test_router_caching.py
+++ b/litellm/tests/test_router_caching.py
@@ -161,13 +161,14 @@ async def test_completion_caching_on_router():
             redis_port=os.environ["REDIS_PORT"],
             cache_responses=True,
             timeout=30,
+            routing_strategy_args={"ttl": 10},
             routing_strategy="usage-based-routing",
         )
         response1 = await router.completion(
             model="gpt-3.5-turbo", messages=messages, temperature=1
         )
         print(f"response1: {response1}")
-        await asyncio.sleep(60)
+        await asyncio.sleep(10)
         response2 = await router.completion(
             model="gpt-3.5-turbo", messages=messages, temperature=1
         )
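
[PATCH 2/2] replaces the hard-coded 60 with a pydantic RoutingArgs model (default ttl: int = 1 * 60) that each handler builds from a routing_args dict, which Router forwards from its routing_strategy_args parameter. A short sketch of how a caller would override the default, mirroring the updated test above (the model entry and environment variables are placeholders):

    import os
    from litellm import Router

    router = Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
                "tpm": 1000,
                "rpm": 1,
            },
        ],
        redis_host=os.environ["REDIS_HOST"],
        redis_password=os.environ["REDIS_PASSWORD"],
        redis_port=os.environ["REDIS_PORT"],
        routing_strategy="usage-based-routing",
        # Validated as RoutingArgs(**routing_strategy_args); omit the
        # argument entirely to keep the 1-minute default.
        routing_strategy_args={"ttl": 10},  # expire TPM/RPM counters after 10s
    )

Routing the TTL through a validated model rather than a bare literal keeps pydantic v1 and v2 both working (see the json() fallback in RoutingArgs' base class) and leaves room for additional knobs later without changing every strategy's __init__ signature.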