diff --git a/litellm/main.py b/litellm/main.py index 2bb3671979..d469dd8fe8 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -243,7 +243,7 @@ def completion( messages: List = [], functions: List = [], function_call: str = "", # optional params - timeout: Union[float, int] = 600.0, + timeout: Optional[Union[float, int]] = None, temperature: Optional[float] = None, top_p: Optional[float] = None, n: Optional[int] = None, @@ -338,6 +338,8 @@ def completion( if mock_response: return mock_completion(model, messages, stream=stream, mock_response=mock_response) + if timeout is None: + timeout = 600 # set timeout for 10 minutes by default timeout = float(timeout) try: if base_url: diff --git a/litellm/router.py b/litellm/router.py index 1acc3ffd50..d645e082fa 100644 --- a/litellm/router.py +++ b/litellm/router.py @@ -299,4 +299,121 @@ class Router: for k, v in self.default_litellm_params.items(): if k not in data: # prioritize model-specific params > default router params data[k] = v - return await litellm.aembedding(**{**data, "input": input, "caching": self.cache_responses, **kwargs}) \ No newline at end of file + return await litellm.aembedding(**{**data, "input": input, "caching": self.cache_responses, **kwargs}) + + def deployment_callback( + self, + kwargs, # kwargs to completion + completion_response, # response from completion + start_time, end_time # start/end time + ): + """ + Function LiteLLM submits a callback to after a successful + completion. Purpose of this is ti update TPM/RPM usage per model + """ + model_name = kwargs.get('model', None) # i.e. gpt35turbo + custom_llm_provider = kwargs.get("litellm_params", {}).get('custom_llm_provider', None) # i.e. azure + if custom_llm_provider: + model_name = f"{custom_llm_provider}/{model_name}" + total_tokens = completion_response['usage']['total_tokens'] + self._set_deployment_usage(model_name, total_tokens) + + def get_available_deployment(self, + model: str, + messages: Optional[List[Dict[str, str]]] = None, + input: Optional[Union[str, List]] = None): + """ + Returns a deployment with the lowest TPM/RPM usage. + """ + # get list of potential deployments + potential_deployments = [] + for item in self.model_list: + if item["model_name"] == model: + potential_deployments.append(item) + + # set first model as current model to calculate token count + deployment = potential_deployments[0] + + # get encoding + token_count = 0 + if messages is not None: + token_count = litellm.token_counter(model=deployment["model_name"], messages=messages) + elif input is not None: + if isinstance(input, List): + input_text = "".join(text for text in input) + else: + input_text = input + token_count = litellm.token_counter(model=deployment["model_name"], text=input_text) + + # ----------------------- + # Find lowest used model + # ---------------------- + lowest_tpm = float("inf") + deployment = None + + # Go through all the models to get tpm, rpm + for item in potential_deployments: + item_tpm, item_rpm = self._get_deployment_usage(deployment_name=item["litellm_params"]["model"]) + + if item_tpm == 0: + return item + elif item_tpm + token_count > item["tpm"] or item_rpm + 1 >= item["rpm"]: + continue + elif item_tpm < lowest_tpm: + lowest_tpm = item_tpm + deployment = item + + # if none, raise exception + if deployment is None: + raise ValueError("No models available.") + + # return model + return deployment + + def _get_deployment_usage( + self, + deployment_name: str + ): + # ------------ + # Setup values + # ------------ + current_minute = datetime.now().strftime("%H-%M") + tpm_key = f'{deployment_name}:tpm:{current_minute}' + rpm_key = f'{deployment_name}:rpm:{current_minute}' + + # ------------ + # Return usage + # ------------ + tpm = self.cache.get_cache(cache_key=tpm_key) or 0 + rpm = self.cache.get_cache(cache_key=rpm_key) or 0 + + return int(tpm), int(rpm) + + def increment(self, key: str, increment_value: int): + # get value + cached_value = self.cache.get_cache(cache_key=key) + # update value + try: + cached_value = cached_value + increment_value + except: + cached_value = increment_value + # save updated value + self.cache.add_cache(result=cached_value, cache_key=key, ttl=self.default_cache_time_seconds) + + def _set_deployment_usage( + self, + model_name: str, + total_tokens: int + ): + # ------------ + # Setup values + # ------------ + current_minute = datetime.now().strftime("%H-%M") + tpm_key = f'{model_name}:tpm:{current_minute}' + rpm_key = f'{model_name}:rpm:{current_minute}' + + # ------------ + # Update usage + # ------------ + self.increment(tpm_key, total_tokens) + self.increment(rpm_key, 1) \ No newline at end of file diff --git a/litellm/tests/test_async_fn.py b/litellm/tests/test_async_fn.py index e470865263..bd3976ec2a 100644 --- a/litellm/tests/test_async_fn.py +++ b/litellm/tests/test_async_fn.py @@ -25,14 +25,16 @@ def test_sync_response(): # test_sync_response() def test_sync_response_anyscale(): - litellm.set_verbose = True + litellm.set_verbose = False user_message = "Hello, how are you?" messages = [{"content": user_message, "role": "user"}] try: - response = completion(model="anyscale/mistralai/Mistral-7B-Instruct-v0.1", messages=messages, timeout=5) + response = completion(model="anyscale/mistralai/Mistral-7B-Instruct-v0.1", messages=messages) except Exception as e: pytest.fail(f"An exception occurred: {e}") - +test_sync_response() +print(f"STARTING ANYSCALE RESPONSE") +test_sync_response_anyscale() # test_sync_response_anyscale() def test_async_response_openai(): diff --git a/litellm/tests/test_langfuse.py b/litellm/tests/test_langfuse.py index 115c54c9d1..b861888d60 100644 --- a/litellm/tests/test_langfuse.py +++ b/litellm/tests/test_langfuse.py @@ -11,18 +11,25 @@ litellm.num_retries = 3 litellm.success_callback = ["langfuse"] # litellm.set_verbose = True import time +import pytest def test_langfuse_logging_async(): - litellm.set_verbose = True - async def _test_langfuse(): - return await litellm.acompletion( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content":"This is a test"}], - max_tokens=1000, - temperature=0.7, - ) - response = asyncio.run(_test_langfuse()) - print(f"response: {response}") + try: + litellm.set_verbose = True + async def _test_langfuse(): + return await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content":"This is a test"}], + max_tokens=1000, + temperature=0.7, + timeout=5 + ) + response = asyncio.run(_test_langfuse()) + print(f"response: {response}") + except litellm.Timeout as e: + pass + except Exception as e: + pytest.fail(f"An exception occurred - {e}") # test_langfuse_logging_async() @@ -37,6 +44,8 @@ def test_langfuse_logging(): temperature=0.2 ) print(response) + except litellm.Timeout as e: + pass except Exception as e: print(e) @@ -59,6 +68,8 @@ def test_langfuse_logging_stream(): for chunk in response: pass # print(chunk) + except litellm.Timeout as e: + pass except Exception as e: print(e) @@ -79,6 +90,8 @@ def test_langfuse_logging_custom_generation_name(): } ) print(response) + except litellm.Timeout as e: + pass except Exception as e: print(e) @@ -112,6 +125,8 @@ def test_langfuse_logging_function_calling(): functions=function1, ) print(response) + except litellm.Timeout as e: + pass except Exception as e: print(e)