diff --git a/litellm/integrations/langfuse.py b/litellm/integrations/langfuse.py
index eae8b8e22..794524684 100644
--- a/litellm/integrations/langfuse.py
+++ b/litellm/integrations/langfuse.py
@@ -311,22 +311,13 @@ class LangFuseLogger:
         try:
             tags = []
-            try:
-                metadata = copy.deepcopy(
-                    metadata
-                )  # Avoid modifying the original metadata
-            except:
-                new_metadata = {}
-                for key, value in metadata.items():
-                    if (
-                        isinstance(value, list)
-                        or isinstance(value, dict)
-                        or isinstance(value, str)
-                        or isinstance(value, int)
-                        or isinstance(value, float)
-                    ):
-                        new_metadata[key] = copy.deepcopy(value)
-                metadata = new_metadata
+            # deepcopy can raise on non-copyable values in metadata (e.g.
+            # logging objects), so copy only simple JSON-like values
+            new_metadata = {}
+            for key, value in metadata.items():
+                if isinstance(value, (list, dict, str, int, float)):
+                    new_metadata[key] = copy.deepcopy(value)
+            metadata = new_metadata
 
             supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3")
             supports_prompt = Version(langfuse.version.__version__) >= Version("2.7.3")
diff --git a/litellm/proxy/_new_secret_config.yaml b/litellm/proxy/_new_secret_config.yaml
index 78d7dc70c..16436c0ef 100644
--- a/litellm/proxy/_new_secret_config.yaml
+++ b/litellm/proxy/_new_secret_config.yaml
@@ -18,3 +18,7 @@ model_list:
 
 router_settings:
   enable_pre_call_checks: True
+
+
+litellm_settings:
+  failure_callback: ["langfuse"]
\ No newline at end of file
diff --git a/litellm/router.py b/litellm/router.py
index 6163da487..30bdbcba2 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -4474,17 +4474,13 @@ class Router:
                 raise ValueError(
                     f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. Try again in {self.cooldown_time} seconds."
                 )
-            elif _context_window_error == True:
+            elif _context_window_error is True:
                 raise litellm.ContextWindowExceededError(
                     message="litellm._pre_call_checks: Context Window exceeded for given call. No models have context window large enough for this call.\n{}".format(
                         _potential_error_str
                     ),
                     model=model,
                     llm_provider="",
-                    response=httpx.Response(
-                        status_code=400,
-                        request=httpx.Request("GET", "https://example.com"),
-                    ),
                 )
         if len(invalid_model_indices) > 0:
             for idx in reversed(invalid_model_indices):
@@ -4596,127 +4592,155 @@ class Router:
                 specific_deployment=specific_deployment,
                 request_kwargs=request_kwargs,
             )
-
-        model, healthy_deployments = self._common_checks_available_deployment(
-            model=model,
-            messages=messages,
-            input=input,
-            specific_deployment=specific_deployment,
-        )  # type: ignore
-
-        if isinstance(healthy_deployments, dict):
-            return healthy_deployments
-
-        # filter out the deployments currently cooling down
-        deployments_to_remove = []
-        # cooldown_deployments is a list of model_id's cooling down, cooldown_deployments = ["16700539-b3cd-42f4-b426-6a12a1bb706a", "16700539-b3cd-42f4-b426-7899"]
-        cooldown_deployments = await self._async_get_cooldown_deployments()
-        verbose_router_logger.debug(
-            f"async cooldown deployments: {cooldown_deployments}"
-        )
-        # Find deployments in model_list whose model_id is cooling down
-        for deployment in healthy_deployments:
-            deployment_id = deployment["model_info"]["id"]
-            if deployment_id in cooldown_deployments:
-                deployments_to_remove.append(deployment)
-        # remove unhealthy deployments from healthy deployments
-        for deployment in deployments_to_remove:
-            healthy_deployments.remove(deployment)
-
-        # filter pre-call checks
-        _allowed_model_region = (
-            request_kwargs.get("allowed_model_region")
-            if request_kwargs is not None
-            else None
-        )
-
-        if self.enable_pre_call_checks and messages is not None:
-            healthy_deployments = self._pre_call_checks(
-                model=model,
-                healthy_deployments=healthy_deployments,
-                messages=messages,
-                request_kwargs=request_kwargs,
-            )
-
-        if len(healthy_deployments) == 0:
-            if _allowed_model_region is None:
-                _allowed_model_region = "n/a"
-            raise ValueError(
-                f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. pre-call-checks={self.enable_pre_call_checks}, allowed_model_region={_allowed_model_region}"
-            )
-
-        if (
-            self.routing_strategy == "usage-based-routing-v2"
-            and self.lowesttpm_logger_v2 is not None
-        ):
-            deployment = await self.lowesttpm_logger_v2.async_get_available_deployments(
-                model_group=model,
-                healthy_deployments=healthy_deployments,  # type: ignore
-                messages=messages,
-                input=input,
-            )
-        if (
-            self.routing_strategy == "cost-based-routing"
-            and self.lowestcost_logger is not None
-        ):
-            deployment = await self.lowestcost_logger.async_get_available_deployments(
-                model_group=model,
-                healthy_deployments=healthy_deployments,  # type: ignore
-                messages=messages,
-                input=input,
-            )
-        elif self.routing_strategy == "simple-shuffle":
-            # if users pass rpm or tpm, we do a random weighted pick - based on rpm/tpm
-            ############## Check if we can do a RPM/TPM based weighted pick #################
-            rpm = healthy_deployments[0].get("litellm_params").get("rpm", None)
-            if rpm is not None:
-                # use weight-random pick if rpms provided
-                rpms = [m["litellm_params"].get("rpm", 0) for m in healthy_deployments]
-                verbose_router_logger.debug(f"\nrpms {rpms}")
-                total_rpm = sum(rpms)
-                weights = [rpm / total_rpm for rpm in rpms]
-                verbose_router_logger.debug(f"\n weights {weights}")
-                # Perform weighted random pick
-                selected_index = random.choices(range(len(rpms)), weights=weights)[0]
-                verbose_router_logger.debug(f"\n selected index, {selected_index}")
-                deployment = healthy_deployments[selected_index]
-                verbose_router_logger.info(
-                    f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment) or deployment[0]} for model: {model}"
-                )
-                return deployment or deployment[0]
-            ############## Check if we can do a RPM/TPM based weighted pick #################
-            tpm = healthy_deployments[0].get("litellm_params").get("tpm", None)
-            if tpm is not None:
-                # use weight-random pick if rpms provided
-                tpms = [m["litellm_params"].get("tpm", 0) for m in healthy_deployments]
-                verbose_router_logger.debug(f"\ntpms {tpms}")
-                total_tpm = sum(tpms)
-                weights = [tpm / total_tpm for tpm in tpms]
-                verbose_router_logger.debug(f"\n weights {weights}")
-                # Perform weighted random pick
-                selected_index = random.choices(range(len(tpms)), weights=weights)[0]
-                verbose_router_logger.debug(f"\n selected index, {selected_index}")
-                deployment = healthy_deployments[selected_index]
-                verbose_router_logger.info(
-                    f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment) or deployment[0]} for model: {model}"
-                )
-                return deployment or deployment[0]
-
-            ############## No RPM/TPM passed, we do a random pick #################
-            item = random.choice(healthy_deployments)
-            return item or item[0]
-        if deployment is None:
-            verbose_router_logger.info(
-                f"get_available_deployment for model: {model}, No deployment available"
-            )
-            raise ValueError(
-                f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}"
-            )
-        verbose_router_logger.info(
-            f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment)} for model: {model}"
-        )
-        return deployment
+        try:
+            model, healthy_deployments = self._common_checks_available_deployment(
+                model=model,
+                messages=messages,
+                input=input,
+                specific_deployment=specific_deployment,
+            )  # type: ignore
+
+            if isinstance(healthy_deployments, dict):
+                return healthy_deployments
+
+            # filter out the deployments currently cooling down
+            deployments_to_remove = []
+            # cooldown_deployments is a list of model_id's cooling down, cooldown_deployments = ["16700539-b3cd-42f4-b426-6a12a1bb706a", "16700539-b3cd-42f4-b426-7899"]
+            cooldown_deployments = await self._async_get_cooldown_deployments()
+            verbose_router_logger.debug(
+                f"async cooldown deployments: {cooldown_deployments}"
+            )
+            # Find deployments in model_list whose model_id is cooling down
+            for deployment in healthy_deployments:
+                deployment_id = deployment["model_info"]["id"]
+                if deployment_id in cooldown_deployments:
+                    deployments_to_remove.append(deployment)
+            # remove unhealthy deployments from healthy deployments
+            for deployment in deployments_to_remove:
+                healthy_deployments.remove(deployment)
+
+            # filter pre-call checks
+            _allowed_model_region = (
+                request_kwargs.get("allowed_model_region")
+                if request_kwargs is not None
+                else None
+            )
+
+            if self.enable_pre_call_checks and messages is not None:
+                healthy_deployments = self._pre_call_checks(
+                    model=model,
+                    healthy_deployments=healthy_deployments,
+                    messages=messages,
+                    request_kwargs=request_kwargs,
+                )
+
+            if len(healthy_deployments) == 0:
+                if _allowed_model_region is None:
+                    _allowed_model_region = "n/a"
+                raise ValueError(
+                    f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. pre-call-checks={self.enable_pre_call_checks}, allowed_model_region={_allowed_model_region}"
+                )
+
+            if (
+                self.routing_strategy == "usage-based-routing-v2"
+                and self.lowesttpm_logger_v2 is not None
+            ):
+                deployment = (
+                    await self.lowesttpm_logger_v2.async_get_available_deployments(
+                        model_group=model,
+                        healthy_deployments=healthy_deployments,  # type: ignore
+                        messages=messages,
+                        input=input,
+                    )
+                )
+            if (
+                self.routing_strategy == "cost-based-routing"
+                and self.lowestcost_logger is not None
+            ):
+                deployment = (
+                    await self.lowestcost_logger.async_get_available_deployments(
+                        model_group=model,
+                        healthy_deployments=healthy_deployments,  # type: ignore
+                        messages=messages,
+                        input=input,
+                    )
+                )
+            elif self.routing_strategy == "simple-shuffle":
+                # if users pass rpm or tpm, we do a random weighted pick - based on rpm/tpm
+                ############## Check if we can do a RPM/TPM based weighted pick #################
+                rpm = healthy_deployments[0].get("litellm_params").get("rpm", None)
+                if rpm is not None:
+                    # use weight-random pick if rpms provided
+                    rpms = [
+                        m["litellm_params"].get("rpm", 0) for m in healthy_deployments
+                    ]
+                    verbose_router_logger.debug(f"\nrpms {rpms}")
+                    total_rpm = sum(rpms)
+                    weights = [rpm / total_rpm for rpm in rpms]
+                    verbose_router_logger.debug(f"\n weights {weights}")
+                    # Perform weighted random pick
+                    selected_index = random.choices(range(len(rpms)), weights=weights)[
+                        0
+                    ]
+                    verbose_router_logger.debug(f"\n selected index, {selected_index}")
+                    deployment = healthy_deployments[selected_index]
+                    verbose_router_logger.info(
+                        f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment) or deployment[0]} for model: {model}"
+                    )
+                    return deployment or deployment[0]
+                ############## Check if we can do a RPM/TPM based weighted pick #################
+                tpm = healthy_deployments[0].get("litellm_params").get("tpm", None)
+                if tpm is not None:
+                    # use weight-random pick if tpms provided
+                    tpms = [
+                        m["litellm_params"].get("tpm", 0) for m in healthy_deployments
+                    ]
+                    verbose_router_logger.debug(f"\ntpms {tpms}")
+                    total_tpm = sum(tpms)
+                    weights = [tpm / total_tpm for tpm in tpms]
+                    verbose_router_logger.debug(f"\n weights {weights}")
+                    # Perform weighted random pick
+                    selected_index = random.choices(range(len(tpms)), weights=weights)[
+                        0
+                    ]
+                    verbose_router_logger.debug(f"\n selected index, {selected_index}")
+                    deployment = healthy_deployments[selected_index]
+                    verbose_router_logger.info(
+                        f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment) or deployment[0]} for model: {model}"
+                    )
+                    return deployment or deployment[0]
+
+                ############## No RPM/TPM passed, we do a random pick #################
+                item = random.choice(healthy_deployments)
+                return item or item[0]
+            if deployment is None:
+                verbose_router_logger.info(
+                    f"get_available_deployment for model: {model}, No deployment available"
+                )
+                raise ValueError(
+                    f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}"
+                )
+            verbose_router_logger.info(
+                f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment)} for model: {model}"
+            )
+            return deployment
+        except Exception as e:
+            traceback_exception = traceback.format_exc()
+            # if router rejects call -> log to langfuse/otel/etc.
+            if request_kwargs is not None:
+                logging_obj = request_kwargs.get("litellm_logging_obj", None)
+                if logging_obj is not None:
+                    ## LOGGING
+                    threading.Thread(
+                        target=logging_obj.failure_handler,
+                        args=(e, traceback_exception),
+                    ).start()  # log response
+                    # Handle any exceptions that might occur during streaming
+                    asyncio.create_task(
+                        logging_obj.async_failure_handler(e, traceback_exception)  # type: ignore
+                    )
+            raise e
 
     def get_available_deployment(
         self,
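
A note on the new except block in async_get_available_deployment: the synchronous failure_handler is dispatched on its own thread so it cannot block the event loop, while async_failure_handler is scheduled as a fire-and-forget task on the running loop, and the original exception is re-raised either way. Below is a standalone sketch of that dispatch pattern using a stub logging object; only the handler names mirror the diff, everything else is illustrative.

import asyncio
import threading
import traceback


class StubLoggingObj:
    # stand-in for litellm's logging object; handler names mirror the diff
    def failure_handler(self, exc, tb):
        print("sync handler saw:", exc)

    async def async_failure_handler(self, exc, tb):
        print("async handler saw:", exc)


async def pick_deployment(logging_obj):
    try:
        raise ValueError("No deployments available for the requested model")
    except Exception as e:
        tb = traceback.format_exc()
        # sync callback runs off-thread so it cannot block the event loop
        threading.Thread(
            target=logging_obj.failure_handler, args=(e, tb)
        ).start()
        # async callback is scheduled fire-and-forget on the running loop
        asyncio.create_task(logging_obj.async_failure_handler(e, tb))
        raise


async def main():
    try:
        await pick_deployment(StubLoggingObj())
    except ValueError:
        await asyncio.sleep(0.1)  # give the scheduled async task time to run


asyncio.run(main())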
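
To exercise this path end to end, the litellm_settings block added to _new_secret_config.yaml has a direct Python equivalent. A minimal sketch, assuming Langfuse credentials are supplied via the standard LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY environment variables; the model list is illustrative.

import litellm
from litellm import Router

# same effect as the yaml litellm_settings block above
litellm.failure_callback = ["langfuse"]

router = Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {"model": "gpt-3.5-turbo"},
        }
    ],
    enable_pre_call_checks=True,  # mirrors router_settings in the proxy config
)

With this in place, a request the router rejects (no healthy deployments, or a failed context-window pre-call check) is reported through the failure callbacks rather than disappearing silently.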
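
The langfuse.py change replaces a blanket copy.deepcopy(metadata), which raises when the dict holds non-copyable objects, with selective copying of simple values. The same pattern in isolation; the threading.Lock below is just a stand-in for any non-copyable value.

import copy
import threading

metadata = {"trace_id": "abc-123", "tags": ["prod"], "lock": threading.Lock()}

# copy only JSON-like values; anything else (locks, clients, handles) is dropped
safe_metadata = {
    key: copy.deepcopy(value)
    for key, value in metadata.items()
    if isinstance(value, (list, dict, str, int, float))
}

assert safe_metadata == {"trace_id": "abc-123", "tags": ["prod"]}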