fix(router.py): log rejected router requests to langfuse

Fixes issue where rejected requests weren't being logged
This commit is contained in:
Krrish Dholakia 2024-06-24 17:52:01 -07:00
parent 341c7857c1
commit a4bea47a2d
4 changed files with 167 additions and 138 deletions

1
.gitignore vendored
View file

@ -61,3 +61,4 @@ litellm/proxy/_experimental/out/model_hub/index.html
litellm/proxy/_experimental/out/onboarding/index.html litellm/proxy/_experimental/out/onboarding/index.html
litellm/tests/log.txt litellm/tests/log.txt
litellm/tests/langfuse.log litellm/tests/langfuse.log
litellm/tests/langfuse.log

View file

@ -36,9 +36,9 @@ class LangFuseLogger:
self.langfuse_debug = os.getenv("LANGFUSE_DEBUG") self.langfuse_debug = os.getenv("LANGFUSE_DEBUG")
parameters = { parameters = {
"public_key": self.public_key, "public_key": "pk-lf-a65841e9-5192-4397-a679-cfff029fd5b0",
"secret_key": self.secret_key, "secret_key": "sk-lf-d58c2891-3717-4f98-89dd-df44826215fd",
"host": self.langfuse_host, "host": "https://us.cloud.langfuse.com",
"release": self.langfuse_release, "release": self.langfuse_release,
"debug": self.langfuse_debug, "debug": self.langfuse_debug,
"flush_interval": flush_interval, # flush interval in seconds "flush_interval": flush_interval, # flush interval in seconds
@ -311,11 +311,11 @@ class LangFuseLogger:
try: try:
tags = [] tags = []
try: # try:
metadata = copy.deepcopy( # metadata = copy.deepcopy(
metadata # metadata
) # Avoid modifying the original metadata # ) # Avoid modifying the original metadata
except: # except:
new_metadata = {} new_metadata = {}
for key, value in metadata.items(): for key, value in metadata.items():
if ( if (

View file

@ -18,3 +18,7 @@ model_list:
router_settings: router_settings:
enable_pre_call_checks: True enable_pre_call_checks: True
litellm_settings:
failure_callback: ["langfuse"]

View file

@ -4474,17 +4474,13 @@ class Router:
raise ValueError( raise ValueError(
f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. Try again in {self.cooldown_time} seconds." f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. Try again in {self.cooldown_time} seconds."
) )
elif _context_window_error == True: elif _context_window_error is True:
raise litellm.ContextWindowExceededError( raise litellm.ContextWindowExceededError(
message="litellm._pre_call_checks: Context Window exceeded for given call. No models have context window large enough for this call.\n{}".format( message="litellm._pre_call_checks: Context Window exceeded for given call. No models have context window large enough for this call.\n{}".format(
_potential_error_str _potential_error_str
), ),
model=model, model=model,
llm_provider="", llm_provider="",
response=httpx.Response(
status_code=400,
request=httpx.Request("GET", "https://example.com"),
),
) )
if len(invalid_model_indices) > 0: if len(invalid_model_indices) > 0:
for idx in reversed(invalid_model_indices): for idx in reversed(invalid_model_indices):
@ -4596,7 +4592,7 @@ class Router:
specific_deployment=specific_deployment, specific_deployment=specific_deployment,
request_kwargs=request_kwargs, request_kwargs=request_kwargs,
) )
try:
model, healthy_deployments = self._common_checks_available_deployment( model, healthy_deployments = self._common_checks_available_deployment(
model=model, model=model,
messages=messages, messages=messages,
@ -4649,35 +4645,43 @@ class Router:
self.routing_strategy == "usage-based-routing-v2" self.routing_strategy == "usage-based-routing-v2"
and self.lowesttpm_logger_v2 is not None and self.lowesttpm_logger_v2 is not None
): ):
deployment = await self.lowesttpm_logger_v2.async_get_available_deployments( deployment = (
await self.lowesttpm_logger_v2.async_get_available_deployments(
model_group=model, model_group=model,
healthy_deployments=healthy_deployments, # type: ignore healthy_deployments=healthy_deployments, # type: ignore
messages=messages, messages=messages,
input=input, input=input,
) )
)
if ( if (
self.routing_strategy == "cost-based-routing" self.routing_strategy == "cost-based-routing"
and self.lowestcost_logger is not None and self.lowestcost_logger is not None
): ):
deployment = await self.lowestcost_logger.async_get_available_deployments( deployment = (
await self.lowestcost_logger.async_get_available_deployments(
model_group=model, model_group=model,
healthy_deployments=healthy_deployments, # type: ignore healthy_deployments=healthy_deployments, # type: ignore
messages=messages, messages=messages,
input=input, input=input,
) )
)
elif self.routing_strategy == "simple-shuffle": elif self.routing_strategy == "simple-shuffle":
# if users pass rpm or tpm, we do a random weighted pick - based on rpm/tpm # if users pass rpm or tpm, we do a random weighted pick - based on rpm/tpm
############## Check if we can do a RPM/TPM based weighted pick ################# ############## Check if we can do a RPM/TPM based weighted pick #################
rpm = healthy_deployments[0].get("litellm_params").get("rpm", None) rpm = healthy_deployments[0].get("litellm_params").get("rpm", None)
if rpm is not None: if rpm is not None:
# use weight-random pick if rpms provided # use weight-random pick if rpms provided
rpms = [m["litellm_params"].get("rpm", 0) for m in healthy_deployments] rpms = [
m["litellm_params"].get("rpm", 0) for m in healthy_deployments
]
verbose_router_logger.debug(f"\nrpms {rpms}") verbose_router_logger.debug(f"\nrpms {rpms}")
total_rpm = sum(rpms) total_rpm = sum(rpms)
weights = [rpm / total_rpm for rpm in rpms] weights = [rpm / total_rpm for rpm in rpms]
verbose_router_logger.debug(f"\n weights {weights}") verbose_router_logger.debug(f"\n weights {weights}")
# Perform weighted random pick # Perform weighted random pick
selected_index = random.choices(range(len(rpms)), weights=weights)[0] selected_index = random.choices(range(len(rpms)), weights=weights)[
0
]
verbose_router_logger.debug(f"\n selected index, {selected_index}") verbose_router_logger.debug(f"\n selected index, {selected_index}")
deployment = healthy_deployments[selected_index] deployment = healthy_deployments[selected_index]
verbose_router_logger.info( verbose_router_logger.info(
@ -4688,13 +4692,17 @@ class Router:
tpm = healthy_deployments[0].get("litellm_params").get("tpm", None) tpm = healthy_deployments[0].get("litellm_params").get("tpm", None)
if tpm is not None: if tpm is not None:
# use weight-random pick if rpms provided # use weight-random pick if rpms provided
tpms = [m["litellm_params"].get("tpm", 0) for m in healthy_deployments] tpms = [
m["litellm_params"].get("tpm", 0) for m in healthy_deployments
]
verbose_router_logger.debug(f"\ntpms {tpms}") verbose_router_logger.debug(f"\ntpms {tpms}")
total_tpm = sum(tpms) total_tpm = sum(tpms)
weights = [tpm / total_tpm for tpm in tpms] weights = [tpm / total_tpm for tpm in tpms]
verbose_router_logger.debug(f"\n weights {weights}") verbose_router_logger.debug(f"\n weights {weights}")
# Perform weighted random pick # Perform weighted random pick
selected_index = random.choices(range(len(tpms)), weights=weights)[0] selected_index = random.choices(range(len(tpms)), weights=weights)[
0
]
verbose_router_logger.debug(f"\n selected index, {selected_index}") verbose_router_logger.debug(f"\n selected index, {selected_index}")
deployment = healthy_deployments[selected_index] deployment = healthy_deployments[selected_index]
verbose_router_logger.info( verbose_router_logger.info(
@ -4717,6 +4725,22 @@ class Router:
) )
return deployment return deployment
except Exception as e:
traceback_exception = traceback.format_exc()
# if router rejects call -> log to langfuse/otel/etc.
if request_kwargs is not None:
logging_obj = request_kwargs.get("litellm_logging_obj", None)
if logging_obj is not None:
## LOGGING
threading.Thread(
target=logging_obj.failure_handler,
args=(e, traceback_exception),
).start() # log response
# Handle any exceptions that might occur during streaming
asyncio.create_task(
logging_obj.async_failure_handler(e, traceback_exception) # type: ignore
)
raise e
def get_available_deployment( def get_available_deployment(
self, self,