Merge pull request #1356 from BerriAI/litellm_improve_proxy_logs

[Feat] Improve Proxy Logging
This commit is contained in:
Ishaan Jaff 2024-01-08 14:41:01 +05:30 committed by GitHub
commit a70626d6e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 323 additions and 174 deletions

View file

@ -82,6 +82,15 @@ Cli arguments, --host, --port, --num_workers
litellm --debug
```
#### --detailed_debug
- **Default:** `False`
- **Type:** `bool` (Flag)
- Enable debugging mode for the input.
- **Usage:**
```shell
litellm --detailed_debug
``
#### --temperature
- **Default:** `None`
- **Type:** `float`

View file

@ -947,6 +947,13 @@ Run the proxy with `--debug` to easily view debug logs
litellm --model gpt-3.5-turbo --debug
```
### Detailed Debug Logs
Run the proxy with `--detailed_debug` to view detailed debug logs
```shell
litellm --model gpt-3.5-turbo --detailed_debug
```
When making requests you should see the POST request sent by LiteLLM to the LLM on the Terminal output
```shell
POST Request Sent from LiteLLM:
@ -1281,6 +1288,14 @@ LiteLLM proxy adds **0.00325 seconds** latency as compared to using the Raw Open
```shell
litellm --debug
```
#### --detailed_debug
- **Default:** `False`
- **Type:** `bool` (Flag)
- Enable debugging mode for the input.
- **Usage:**
```shell
litellm --detailed_debug
```
#### --temperature
- **Default:** `None`

View file

@ -1,5 +1,17 @@
import logging
set_verbose = False
# Create a handler for the logger (you may need to adapt this based on your needs)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
# Create a formatter and set it for the handler
formatter = logging.Formatter("\033[92m%(name)s - %(levelname)s\033[0m: %(message)s")
handler.setFormatter(formatter)
def print_verbose(print_statement):
try:
@ -7,3 +19,11 @@ def print_verbose(print_statement):
print(print_statement) # noqa
except:
pass
verbose_proxy_logger = logging.getLogger("LiteLLM Proxy")
verbose_router_logger = logging.getLogger("LiteLLM Router")
verbose_logger = logging.getLogger("LiteLLM")
# Add the handler to the logger
verbose_router_logger.addHandler(handler)

View file

@ -88,6 +88,13 @@ def is_port_in_use(port):
@click.option(
"--debug", default=False, is_flag=True, type=bool, help="To debug the input"
)
@click.option(
"--detailed_debug",
default=False,
is_flag=True,
type=bool,
help="To view detailed debug logs",
)
@click.option(
"--use_queue",
default=False,
@ -139,12 +146,6 @@ def is_port_in_use(port):
type=bool,
help="Print LiteLLM version",
)
@click.option(
"--logs",
flag_value=False,
type=int,
help='Gets the "n" most recent logs. By default gets most recent log.',
)
@click.option(
"--health",
flag_value=True,
@ -179,6 +180,7 @@ def run_server(
headers,
save,
debug,
detailed_debug,
temperature,
max_tokens,
request_timeout,
@ -187,7 +189,6 @@ def run_server(
config,
max_budget,
telemetry,
logs,
test,
local,
num_workers,
@ -212,32 +213,6 @@ def run_server(
# this is just a local/relative import error, user git cloned litellm
from proxy_server import app, save_worker_config, usage_telemetry
feature_telemetry = usage_telemetry
if logs is not None:
if logs == 0: # default to 1
logs = 1
try:
with open("api_log.json") as f:
data = json.load(f)
# convert keys to datetime objects
log_times = {
datetime.strptime(k, "%Y%m%d%H%M%S%f"): v for k, v in data.items()
}
# sort by timestamp
sorted_times = sorted(
log_times.items(), key=operator.itemgetter(0), reverse=True
)
# get n recent logs
recent_logs = {
k.strftime("%Y%m%d%H%M%S%f"): v for k, v in sorted_times[:logs]
}
print(json.dumps(recent_logs, indent=4)) # noqa
except:
raise Exception("LiteLLM: No logs saved!")
return
if version == True:
pkg_version = importlib.metadata.version("litellm")
click.echo(f"\nLiteLLM: Current Version = {pkg_version}\n")
@ -377,6 +352,7 @@ def run_server(
api_base=api_base,
api_version=api_version,
debug=debug,
detailed_debug=detailed_debug,
temperature=temperature,
max_tokens=max_tokens,
request_timeout=request_timeout,

View file

@ -1,5 +1,5 @@
model_list:
- model_name: Azure OpenAI GPT-4 Canada-East (External)
- model_name: azure-gpt-3.5
litellm_params:
model: azure/chatgpt-v-2
api_base: os.environ/AZURE_API_BASE
@ -49,6 +49,7 @@ model_list:
mode: completion
litellm_settings:
fallbacks: [{"openai-gpt-3.5": ["azure-gpt-3.5"]}]
# cache: True
# setting callback class
# callbacks: custom_callbacks.proxy_handler_instance # sets litellm.callbacks = [proxy_handler_instance]

View file

@ -19,6 +19,7 @@ try:
import backoff
import yaml
import orjson
import logging
except ImportError as e:
raise ImportError(f"Missing dependency {e}. Run `pip install 'litellm[proxy]'`")
@ -77,6 +78,7 @@ import pydantic
from litellm.proxy._types import *
from litellm.caching import DualCache
from litellm.proxy.health_check import perform_health_check
from litellm._logging import verbose_router_logger, verbose_proxy_logger
litellm.suppress_debug_info = True
from fastapi import (
@ -153,16 +155,7 @@ proxy_logging_obj = ProxyLogging(user_api_key_cache=user_api_key_cache)
async_result = None
celery_app_conn = None
celery_fn = None # Redis Queue for handling requests
#### HELPER FUNCTIONS ####
def print_verbose(print_statement):
try:
global user_debug
if user_debug:
print(print_statement) # noqa
except:
pass
### logger ###
def usage_telemetry(
@ -245,24 +238,26 @@ async def user_api_key_auth(
## check for cache hit (In-Memory Cache)
valid_token = user_api_key_cache.get_cache(key=api_key)
print_verbose(f"valid_token from cache: {valid_token}")
verbose_proxy_logger.debug(f"valid_token from cache: {valid_token}")
if valid_token is None:
## check db
print_verbose(f"api key: {api_key}")
verbose_proxy_logger.debug(f"api key: {api_key}")
valid_token = await prisma_client.get_data(
token=api_key, expires=datetime.utcnow().replace(tzinfo=timezone.utc)
)
print_verbose(f"valid token from prisma: {valid_token}")
verbose_proxy_logger.debug(f"valid token from prisma: {valid_token}")
user_api_key_cache.set_cache(key=api_key, value=valid_token, ttl=60)
elif valid_token is not None:
print_verbose(f"API Key Cache Hit!")
verbose_proxy_logger.debug(f"API Key Cache Hit!")
if valid_token:
litellm.model_alias_map = valid_token.aliases
config = valid_token.config
if config != {}:
model_list = config.get("model_list", [])
llm_model_list = model_list
print_verbose(f"\n new llm router model list {llm_model_list}")
verbose_proxy_logger.debug(
f"\n new llm router model list {llm_model_list}"
)
if (
len(valid_token.models) == 0
): # assume an empty model list means all models are allowed to be called
@ -296,7 +291,7 @@ async def user_api_key_auth(
else:
raise Exception(f"Invalid token")
except Exception as e:
# print_verbose(f"An exception occurred - {traceback.format_exc()}")
# verbose_proxy_logger.debug(f"An exception occurred - {traceback.format_exc()}")
traceback.print_exc()
if isinstance(e, HTTPException):
raise e
@ -316,7 +311,7 @@ def prisma_setup(database_url: Optional[str]):
database_url=database_url, proxy_logging_obj=proxy_logging_obj
)
except Exception as e:
print_verbose(
verbose_proxy_logger.debug(
f"Error when initializing prisma, Ensure you run pip install prisma {str(e)}"
)
@ -358,7 +353,7 @@ def load_from_azure_key_vault(use_azure_key_vault: bool = False):
f"Missing KVUri or client_id or client_secret or tenant_id from environment"
)
except Exception as e:
print_verbose(
verbose_proxy_logger.debug(
"Error when loading keys from Azure Key Vault. Ensure you run `pip install azure-identity azure-keyvault-secrets`"
)
@ -367,7 +362,7 @@ def cost_tracking():
global prisma_client
if prisma_client is not None:
if isinstance(litellm.success_callback, list):
print_verbose("setting litellm success callback to track cost")
verbose_proxy_logger.debug("setting litellm success callback to track cost")
if (track_cost_callback) not in litellm.success_callback: # type: ignore
litellm.success_callback.append(track_cost_callback) # type: ignore
@ -381,7 +376,7 @@ async def track_cost_callback(
global prisma_client
try:
# check if it has collected an entire stream response
print_verbose(
verbose_proxy_logger.debug(
f"kwargs stream: {kwargs.get('stream', None)} + complete streaming response: {kwargs.get('complete_streaming_response', None)}"
)
if "complete_streaming_response" in kwargs:
@ -390,7 +385,7 @@ async def track_cost_callback(
response_cost = litellm.completion_cost(
completion_response=completion_response
)
print_verbose(f"streaming response_cost {response_cost}")
verbose_proxy_logger.debug(f"streaming response_cost {response_cost}")
user_api_key = kwargs["litellm_params"]["metadata"].get(
"user_api_key", None
)
@ -416,12 +411,14 @@ async def track_cost_callback(
token=user_api_key, response_cost=response_cost, user_id=user_id
)
except Exception as e:
print_verbose(f"error in tracking cost callback - {str(e)}")
verbose_proxy_logger.debug(f"error in tracking cost callback - {str(e)}")
async def update_prisma_database(token, response_cost, user_id=None):
try:
print_verbose(f"Enters prisma db call, token: {token}; user_id: {user_id}")
verbose_proxy_logger.debug(
f"Enters prisma db call, token: {token}; user_id: {user_id}"
)
### UPDATE USER SPEND ###
async def _update_user_db():
@ -436,7 +433,7 @@ async def update_prisma_database(token, response_cost, user_id=None):
# Calculate the new cost by adding the existing cost and response_cost
new_spend = existing_spend + response_cost
print_verbose(f"new cost: {new_spend}")
verbose_proxy_logger.debug(f"new cost: {new_spend}")
# Update the cost column for the given user id
await prisma_client.update_data(user_id=user_id, data={"spend": new_spend})
@ -444,7 +441,7 @@ async def update_prisma_database(token, response_cost, user_id=None):
async def _update_key_db():
# Fetch the existing cost for the given token
existing_spend_obj = await prisma_client.get_data(token=token)
print_verbose(f"existing spend: {existing_spend_obj}")
verbose_proxy_logger.debug(f"existing spend: {existing_spend_obj}")
if existing_spend_obj is None:
existing_spend = 0
else:
@ -452,7 +449,7 @@ async def update_prisma_database(token, response_cost, user_id=None):
# Calculate the new cost by adding the existing cost and response_cost
new_spend = existing_spend + response_cost
print_verbose(f"new cost: {new_spend}")
verbose_proxy_logger.debug(f"new cost: {new_spend}")
# Update the cost column for the given token
await prisma_client.update_data(token=token, data={"spend": new_spend})
@ -461,7 +458,9 @@ async def update_prisma_database(token, response_cost, user_id=None):
tasks.append(_update_key_db())
await asyncio.gather(*tasks)
except Exception as e:
print_verbose(f"Error updating Prisma database: {traceback.format_exc()}")
verbose_proxy_logger.debug(
f"Error updating Prisma database: {traceback.format_exc()}"
)
pass
@ -472,7 +471,7 @@ def run_ollama_serve():
with open(os.devnull, "w") as devnull:
process = subprocess.Popen(command, stdout=devnull, stderr=devnull)
except Exception as e:
print_verbose(
verbose_proxy_logger.debug(
f"""
LiteLLM Warning: proxy started with `ollama` model\n`ollama serve` failed with Exception{e}. \nEnsure you run `ollama serve`
"""
@ -612,7 +611,7 @@ class ProxyConfig:
printed_yaml = copy.deepcopy(config)
printed_yaml.pop("environment_variables", None)
print_verbose(
verbose_proxy_logger.debug(
f"Loaded config YAML (api_key and environment_variables are not shown):\n{json.dumps(printed_yaml, indent=2)}"
)
@ -643,7 +642,7 @@ class ProxyConfig:
cache_type = cache_params.get("type", "redis")
print_verbose(f"passed cache type={cache_type}")
verbose_proxy_logger.debug(f"passed cache type={cache_type}")
if cache_type == "redis":
cache_host = litellm.get_secret("REDIS_HOST", None)
@ -687,14 +686,16 @@ class ProxyConfig:
litellm.callbacks = [
get_instance_fn(value=value, config_file_path=config_file_path)
]
print_verbose(
verbose_proxy_logger.debug(
f"{blue_color_code} Initialized Callbacks - {litellm.callbacks} {reset_color_code}"
)
elif key == "post_call_rules":
litellm.post_call_rules = [
get_instance_fn(value=value, config_file_path=config_file_path)
]
print_verbose(f"litellm.post_call_rules: {litellm.post_call_rules}")
verbose_proxy_logger.debug(
f"litellm.post_call_rules: {litellm.post_call_rules}"
)
elif key == "success_callback":
litellm.success_callback = []
@ -708,7 +709,7 @@ class ProxyConfig:
# these are litellm callbacks - "langfuse", "sentry", "wandb"
else:
litellm.success_callback.append(callback)
print_verbose(
verbose_proxy_logger.debug(
f"{blue_color_code} Initialized Success Callbacks - {litellm.success_callback} {reset_color_code}"
)
elif key == "failure_callback":
@ -724,7 +725,7 @@ class ProxyConfig:
# these are litellm callbacks - "langfuse", "sentry", "wandb"
else:
litellm.failure_callback.append(callback)
print_verbose(
verbose_proxy_logger.debug(
f"{blue_color_code} Initialized Success Callbacks - {litellm.failure_callback} {reset_color_code}"
)
elif key == "cache_params":
@ -764,9 +765,9 @@ class ProxyConfig:
### CONNECT TO DATABASE ###
database_url = general_settings.get("database_url", None)
if database_url and database_url.startswith("os.environ/"):
print_verbose(f"GOING INTO LITELLM.GET_SECRET!")
verbose_proxy_logger.debug(f"GOING INTO LITELLM.GET_SECRET!")
database_url = litellm.get_secret(database_url)
print_verbose(f"RETRIEVED DB URL: {database_url}")
verbose_proxy_logger.debug(f"RETRIEVED DB URL: {database_url}")
prisma_setup(database_url=database_url)
## COST TRACKING ##
cost_tracking()
@ -902,7 +903,7 @@ async def generate_key_helper_fn(
"max_budget": max_budget,
"user_email": user_email,
}
print_verbose("PrismaClient: Before Insert Data")
verbose_proxy_logger.debug("PrismaClient: Before Insert Data")
new_verification_token = await prisma_client.insert_data(
data=verification_token_data
)
@ -943,6 +944,7 @@ async def initialize(
api_base=None,
api_version=None,
debug=False,
detailed_debug=False,
temperature=None,
max_tokens=None,
request_timeout=600,
@ -955,12 +957,22 @@ async def initialize(
use_queue=False,
config=None,
):
global user_model, user_api_base, user_debug, user_max_tokens, user_request_timeout, user_temperature, user_telemetry, user_headers, experimental, llm_model_list, llm_router, general_settings, master_key, user_custom_auth, prisma_client
global user_model, user_api_base, user_debug, user_detailed_debug, user_user_max_tokens, user_request_timeout, user_temperature, user_telemetry, user_headers, experimental, llm_model_list, llm_router, general_settings, master_key, user_custom_auth, prisma_client
generate_feedback_box()
user_model = model
user_debug = debug
if debug == True: # this needs to be first, so users can see Router init debugg
from litellm._logging import verbose_router_logger, verbose_proxy_logger
import logging
verbose_router_logger.setLevel(level=logging.INFO) # set router logs to info
if detailed_debug == True:
from litellm._logging import verbose_router_logger, verbose_proxy_logger
import logging
verbose_router_logger.setLevel(level=logging.DEBUG) # set router logs to info
litellm.set_verbose = True
dynamic_config = {"general": {}, user_model: {}}
if config:
(
@ -1031,9 +1043,9 @@ async def initialize(
# for streaming
def data_generator(response):
print_verbose("inside generator")
verbose_proxy_logger.debug("inside generator")
for chunk in response:
print_verbose(f"returned chunk: {chunk}")
verbose_proxy_logger.debug(f"returned chunk: {chunk}")
try:
yield f"data: {json.dumps(chunk.dict())}\n\n"
except:
@ -1041,11 +1053,11 @@ def data_generator(response):
async def async_data_generator(response, user_api_key_dict):
print_verbose("inside generator")
verbose_proxy_logger.debug("inside generator")
try:
start_time = time.time()
async for chunk in response:
print_verbose(f"returned chunk: {chunk}")
verbose_proxy_logger.debug(f"returned chunk: {chunk}")
try:
yield f"data: {json.dumps(chunk.dict())}\n\n"
except Exception as e:
@ -1110,7 +1122,7 @@ async def startup_event():
### LOAD CONFIG ###
worker_config = litellm.get_secret("WORKER_CONFIG")
print_verbose(f"worker_config: {worker_config}")
verbose_proxy_logger.debug(f"worker_config: {worker_config}")
# check if it's a valid file path
if os.path.isfile(worker_config):
await initialize(**worker_config)
@ -1125,7 +1137,7 @@ async def startup_event():
_run_background_health_check()
) # start the background health check coroutine.
print_verbose(f"prisma client - {prisma_client}")
verbose_proxy_logger.debug(f"prisma client - {prisma_client}")
if prisma_client:
await prisma_client.connect()
@ -1152,7 +1164,7 @@ def model_list():
all_models = list(set(all_models + [m["model_name"] for m in llm_model_list]))
if user_model is not None:
all_models += [user_model]
print_verbose(f"all_models: {all_models}")
verbose_proxy_logger.debug(f"all_models: {all_models}")
### CHECK OLLAMA MODELS ###
try:
response = requests.get("http://0.0.0.0:11434/api/tags")
@ -1271,7 +1283,7 @@ async def completion(
else:
model_id = ""
print_verbose(f"final response: {response}")
verbose_proxy_logger.debug(f"final response: {response}")
if (
"stream" in data and data["stream"] == True
): # use generate_responses to stream responses
@ -1296,8 +1308,8 @@ async def completion(
fastapi_response.headers["x-litellm-model-id"] = model_id
return response
except Exception as e:
print_verbose(f"EXCEPTION RAISED IN PROXY MAIN.PY")
print_verbose(
verbose_proxy_logger.debug(f"EXCEPTION RAISED IN PROXY MAIN.PY")
verbose_proxy_logger.debug(
f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
)
traceback.print_exc()
@ -1352,13 +1364,13 @@ async def chat_completion(
## Cache Controls
headers = request.headers
print_verbose(f"Request Headers: {headers}")
verbose_proxy_logger.debug(f"Request Headers: {headers}")
cache_control_header = headers.get("Cache-Control", None)
if cache_control_header:
cache_dict = parse_cache_control(cache_control_header)
data["ttl"] = cache_dict.get("s-maxage")
print_verbose(f"receiving data: {data}")
verbose_proxy_logger.debug(f"receiving data: {data}")
data["model"] = (
general_settings.get("completion_model", None) # server default
or user_model # model name passed via cli args
@ -1372,7 +1384,7 @@ async def chat_completion(
data["user"] = user_api_key_dict.user_id
if "metadata" in data:
print_verbose(f'received metadata: {data["metadata"]}')
verbose_proxy_logger.debug(f'received metadata: {data["metadata"]}')
data["metadata"]["user_api_key"] = user_api_key_dict.api_key
data["metadata"]["user_api_key_user_id"] = user_api_key_dict.user_id
data["metadata"]["headers"] = dict(request.headers)
@ -1463,7 +1475,7 @@ async def chat_completion(
await proxy_logging_obj.post_call_failure_hook(
user_api_key_dict=user_api_key_dict, original_exception=e
)
print_verbose(
verbose_proxy_logger.debug(
f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
)
router_model_names = (
@ -1472,17 +1484,17 @@ async def chat_completion(
else []
)
if llm_router is not None and data.get("model", "") in router_model_names:
print_verbose("Results from router")
print_verbose("\nRouter stats")
print_verbose("\nTotal Calls made")
verbose_proxy_logger.debug("Results from router")
verbose_proxy_logger.debug("\nRouter stats")
verbose_proxy_logger.debug("\nTotal Calls made")
for key, value in llm_router.total_calls.items():
print_verbose(f"{key}: {value}")
print_verbose("\nSuccess Calls made")
verbose_proxy_logger.debug(f"{key}: {value}")
verbose_proxy_logger.debug("\nSuccess Calls made")
for key, value in llm_router.success_calls.items():
print_verbose(f"{key}: {value}")
print_verbose("\nFail Calls made")
verbose_proxy_logger.debug(f"{key}: {value}")
verbose_proxy_logger.debug("\nFail Calls made")
for key, value in llm_router.fail_calls.items():
print_verbose(f"{key}: {value}")
verbose_proxy_logger.debug(f"{key}: {value}")
if user_debug:
traceback.print_exc()
@ -1785,7 +1797,7 @@ async def generate_key_fn(
- expires: (datetime) Datetime object for when key expires.
- user_id: (str) Unique user id - used for tracking spend across multiple keys for same user id.
"""
print_verbose("entered /key/generate")
verbose_proxy_logger.debug("entered /key/generate")
data_json = data.json() # type: ignore
response = await generate_key_helper_fn(**data_json)
return GenerateKeyResponse(
@ -2001,9 +2013,9 @@ async def add_new_model(model_params: ModelParams):
# Load existing config
config = await proxy_config.get_config()
print_verbose(f"User config path: {user_config_file_path}")
verbose_proxy_logger.debug(f"User config path: {user_config_file_path}")
print_verbose(f"Loaded config: {config}")
verbose_proxy_logger.debug(f"Loaded config: {config}")
# Add the new model to the config
model_info = model_params.model_info.json()
model_info = {k: v for k, v in model_info.items() if v is not None}
@ -2015,7 +2027,7 @@ async def add_new_model(model_params: ModelParams):
}
)
print_verbose(f"updated model list: {config['model_list']}")
verbose_proxy_logger.debug(f"updated model list: {config['model_list']}")
# Save new config
await proxy_config.save_config(new_config=config)
@ -2065,7 +2077,7 @@ async def model_info_v1(request: Request):
# don't return the api key
model["litellm_params"].pop("api_key", None)
print_verbose(f"all_models: {all_models}")
verbose_proxy_logger.debug(f"all_models: {all_models}")
return {"data": all_models}
@ -2130,7 +2142,7 @@ async def _litellm_chat_completions_worker(data, user_api_key_dict):
user_api_key_dict=user_api_key_dict, data=data, call_type="completion"
)
print_verbose(f"_litellm_chat_completions_worker started")
verbose_proxy_logger.debug(f"_litellm_chat_completions_worker started")
### ROUTE THE REQUEST ###
router_model_names = (
[m["model_name"] for m in llm_model_list]
@ -2156,17 +2168,17 @@ async def _litellm_chat_completions_worker(data, user_api_key_dict):
else: # router is not set
response = await litellm.acompletion(**data)
print_verbose(f"final response: {response}")
verbose_proxy_logger.debug(f"final response: {response}")
return response
except HTTPException as e:
print_verbose(
verbose_proxy_logger.debug(
f"EXCEPTION RAISED IN _litellm_chat_completions_worker - {e.status_code}; {e.detail}"
)
if (
e.status_code == 429
and "Max parallel request limit reached" in e.detail
):
print_verbose(f"Max parallel request limit reached!")
verbose_proxy_logger.debug(f"Max parallel request limit reached!")
timeout = litellm._calculate_retry_after(
remaining_retries=3, max_retries=3, min_timeout=1
)
@ -2206,7 +2218,7 @@ async def async_queue_request(
"body": copy.copy(data), # use copy instead of deepcopy
}
print_verbose(f"receiving data: {data}")
verbose_proxy_logger.debug(f"receiving data: {data}")
data["model"] = (
general_settings.get("completion_model", None) # server default
or user_model # model name passed via cli args
@ -2220,7 +2232,7 @@ async def async_queue_request(
data["user"] = user_api_key_dict.user_id
if "metadata" in data:
print_verbose(f'received metadata: {data["metadata"]}')
verbose_proxy_logger.debug(f'received metadata: {data["metadata"]}')
data["metadata"]["user_api_key"] = user_api_key_dict.api_key
data["metadata"]["headers"] = dict(request.headers)
data["metadata"]["user_api_key_user_id"] = user_api_key_dict.user_id
@ -2294,7 +2306,7 @@ async def update_config(config_info: ConfigYAML):
config = await proxy_config.get_config()
backup_config = copy.deepcopy(config)
print_verbose(f"Loaded config: {config}")
verbose_proxy_logger.debug(f"Loaded config: {config}")
# update the general settings
if config_info.general_settings is not None:
@ -2478,7 +2490,7 @@ async def get_routes():
async def shutdown_event():
global prisma_client, master_key, user_custom_auth
if prisma_client:
print_verbose("Disconnecting from Prisma")
verbose_proxy_logger.debug("Disconnecting from Prisma")
await prisma_client.disconnect()
## RESET CUSTOM VARIABLES ##

View file

@ -26,6 +26,8 @@ from litellm.llms.custom_httpx.azure_dall_e_2 import (
)
from litellm.utils import ModelResponse, CustomStreamWrapper
import copy
from litellm._logging import verbose_router_logger
import logging
class Router:
@ -223,7 +225,7 @@ class Router:
litellm.failure_callback.append(self.deployment_callback_on_failure)
else:
litellm.failure_callback = [self.deployment_callback_on_failure]
self.print_verbose(
verbose_router_logger.debug(
f"Intialized router with Routing strategy: {self.routing_strategy}\n"
)
@ -316,7 +318,7 @@ class Router:
async def _acompletion(self, model: str, messages: List[Dict[str, str]], **kwargs):
model_name = None
try:
self.print_verbose(
verbose_router_logger.debug(
f"Inside _acompletion()- model: {model}; kwargs: {kwargs}"
)
deployment = self.get_available_deployment(
@ -363,8 +365,14 @@ class Router:
}
)
self.success_calls[model_name] += 1
verbose_router_logger.info(
f"litellm.acompletion(model={model_name})\033[32m 200 OK\033[0m"
)
return response
except Exception as e:
verbose_router_logger.info(
f"litellm.acompletion(model={model_name})\033[31m Exception {str(e)}\033[0m"
)
if model_name is not None:
self.fail_calls[model_name] += 1
raise e
@ -385,7 +393,7 @@ class Router:
def _image_generation(self, prompt: str, model: str, **kwargs):
try:
self.print_verbose(
verbose_router_logger.debug(
f"Inside _image_generation()- model: {model}; kwargs: {kwargs}"
)
deployment = self.get_available_deployment(
@ -432,8 +440,14 @@ class Router:
}
)
self.success_calls[model_name] += 1
verbose_router_logger.info(
f"litellm.image_generation(model={model_name})\033[32m 200 OK\033[0m"
)
return response
except Exception as e:
verbose_router_logger.info(
f"litellm.image_generation(model={model_name})\033[31m Exception {str(e)}\033[0m"
)
if model_name is not None:
self.fail_calls[model_name] += 1
raise e
@ -454,7 +468,7 @@ class Router:
async def _aimage_generation(self, prompt: str, model: str, **kwargs):
try:
self.print_verbose(
verbose_router_logger.debug(
f"Inside _image_generation()- model: {model}; kwargs: {kwargs}"
)
deployment = self.get_available_deployment(
@ -501,8 +515,14 @@ class Router:
}
)
self.success_calls[model_name] += 1
verbose_router_logger.info(
f"litellm.aimage_generation(model={model_name})\033[32m 200 OK\033[0m"
)
return response
except Exception as e:
verbose_router_logger.info(
f"litellm.aimage_generation(model={model_name})\033[31m Exception {str(e)}\033[0m"
)
if model_name is not None:
self.fail_calls[model_name] += 1
raise e
@ -576,7 +596,7 @@ class Router:
async def _atext_completion(self, model: str, prompt: str, **kwargs):
try:
self.print_verbose(
verbose_router_logger.debug(
f"Inside _atext_completion()- model: {model}; kwargs: {kwargs}"
)
deployment = self.get_available_deployment(
@ -623,8 +643,14 @@ class Router:
}
)
self.success_calls[model_name] += 1
verbose_router_logger.info(
f"litellm.atext_completion(model={model_name})\033[32m 200 OK\033[0m"
)
return response
except Exception as e:
verbose_router_logger.info(
f"litellm.atext_completion(model={model_name})\033[31m Exception {str(e)}\033[0m"
)
if model_name is not None:
self.fail_calls[model_name] += 1
raise e
@ -696,7 +722,7 @@ class Router:
async def _aembedding(self, input: Union[str, List], model: str, **kwargs):
try:
self.print_verbose(
verbose_router_logger.debug(
f"Inside _aembedding()- model: {model}; kwargs: {kwargs}"
)
deployment = self.get_available_deployment(
@ -743,8 +769,14 @@ class Router:
}
)
self.success_calls[model_name] += 1
verbose_router_logger.info(
f"litellm.aembedding(model={model_name})\033[32m 200 OK\033[0m"
)
return response
except Exception as e:
verbose_router_logger.info(
f"litellm.aembedding(model={model_name})\033[31m Exception {str(e)}\033[0m"
)
if model_name is not None:
self.fail_calls[model_name] += 1
raise e
@ -761,19 +793,18 @@ class Router:
)
try:
response = await self.async_function_with_retries(*args, **kwargs)
self.print_verbose(f"Async Response: {response}")
verbose_router_logger.debug(f"Async Response: {response}")
return response
except Exception as e:
self.print_verbose(
f"An exception occurs: {e}\n\n Traceback{traceback.format_exc()}"
)
verbose_router_logger.debug(f"Traceback{traceback.format_exc()}")
original_exception = e
fallback_model_group = None
try:
if (
hasattr(e, "status_code") and e.status_code == 400
): # don't retry a malformed request
raise e
self.print_verbose(f"Trying to fallback b/w models")
verbose_router_logger.debug(f"Trying to fallback b/w models")
if (
isinstance(e, litellm.ContextWindowExceededError)
and context_window_fallbacks is not None
@ -802,11 +833,16 @@ class Router:
except Exception as e:
pass
elif fallbacks is not None:
self.print_verbose(f"inside model fallbacks: {fallbacks}")
verbose_router_logger.debug(f"inside model fallbacks: {fallbacks}")
for item in fallbacks:
if list(item.keys())[0] == model_group:
fallback_model_group = item[model_group]
break
if fallback_model_group is None:
verbose_router_logger.info(
f"No fallback model group found for original model_group={model_group}. Fallbacks={fallbacks}"
)
raise original_exception
for mg in fallback_model_group:
"""
Iterate through the model groups and try calling that deployment
@ -814,6 +850,9 @@ class Router:
try:
## LOGGING
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
verbose_router_logger.info(
f"Falling back to model_group = {mg}"
)
kwargs["model"] = mg
kwargs["metadata"]["model_group"] = mg
response = await self.async_function_with_retries(
@ -823,12 +862,12 @@ class Router:
except Exception as e:
raise e
except Exception as e:
self.print_verbose(f"An exception occurred - {str(e)}")
verbose_router_logger.debug(f"An exception occurred - {str(e)}")
traceback.print_exc()
raise original_exception
async def async_function_with_retries(self, *args, **kwargs):
self.print_verbose(
verbose_router_logger.debug(
f"Inside async function with retries: args - {args}; kwargs - {kwargs}"
)
original_function = kwargs.pop("original_function")
@ -836,7 +875,7 @@ class Router:
context_window_fallbacks = kwargs.pop(
"context_window_fallbacks", self.context_window_fallbacks
)
self.print_verbose(
verbose_router_logger.debug(
f"async function w/ retries: original_function - {original_function}"
)
num_retries = kwargs.pop("num_retries")
@ -891,7 +930,7 @@ class Router:
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
for current_attempt in range(num_retries):
self.print_verbose(
verbose_router_logger.debug(
f"retrying request. Current attempt - {current_attempt}; num retries: {num_retries}"
)
try:
@ -952,9 +991,9 @@ class Router:
return response
except Exception as e:
original_exception = e
self.print_verbose(f"An exception occurs {original_exception}")
verbose_router_logger.debug(f"An exception occurs {original_exception}")
try:
self.print_verbose(
verbose_router_logger.debug(
f"Trying to fallback b/w models. Initial model group: {model_group}"
)
if (
@ -986,7 +1025,7 @@ class Router:
except Exception as e:
pass
elif fallbacks is not None:
self.print_verbose(f"inside model fallbacks: {fallbacks}")
verbose_router_logger.debug(f"inside model fallbacks: {fallbacks}")
fallback_model_group = None
for item in fallbacks:
if list(item.keys())[0] == model_group:
@ -1016,7 +1055,7 @@ class Router:
"""
Try calling the model 3 times. Shuffle between available deployments.
"""
self.print_verbose(
verbose_router_logger.debug(
f"Inside function with retries: args - {args}; kwargs - {kwargs}"
)
original_function = kwargs.pop("original_function")
@ -1031,7 +1070,9 @@ class Router:
return response
except Exception as e:
original_exception = e
self.print_verbose(f"num retries in function with retries: {num_retries}")
verbose_router_logger.debug(
f"num retries in function with retries: {num_retries}"
)
### CHECK IF RATE LIMIT / CONTEXT WINDOW ERROR
if (
isinstance(original_exception, litellm.ContextWindowExceededError)
@ -1046,7 +1087,7 @@ class Router:
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
### RETRY
for current_attempt in range(num_retries):
self.print_verbose(
verbose_router_logger.debug(
f"retrying request. Current attempt - {current_attempt}; retries left: {num_retries}"
)
try:
@ -1129,10 +1170,10 @@ class Router:
deployment_exceptions = self.model_exception_map.get(deployment, [])
deployment_exceptions.append(exception_str)
self.model_exception_map[deployment] = deployment_exceptions
self.print_verbose("\nEXCEPTION FOR DEPLOYMENTS\n")
self.print_verbose(self.model_exception_map)
verbose_router_logger.debug("\nEXCEPTION FOR DEPLOYMENTS\n")
verbose_router_logger.debug(self.model_exception_map)
for model in self.model_exception_map:
self.print_verbose(
verbose_router_logger.debug(
f"Model {model} had {len(self.model_exception_map[model])} exception"
)
if custom_llm_provider:
@ -1184,7 +1225,7 @@ class Router:
# cooldown deployment
current_fails = self.failed_calls.get_cache(key=deployment) or 0
updated_fails = current_fails + 1
self.print_verbose(
verbose_router_logger.debug(
f"Attempting to add {deployment} to cooldown list. updated_fails: {updated_fails}; self.allowed_fails: {self.allowed_fails}"
)
if updated_fails > self.allowed_fails:
@ -1192,7 +1233,7 @@ class Router:
cooldown_key = f"{current_minute}:cooldown_models" # group cooldown models by minute to reduce number of redis calls
cached_value = self.cache.get_cache(key=cooldown_key)
self.print_verbose(f"adding {deployment} to cooldown models")
verbose_router_logger.debug(f"adding {deployment} to cooldown models")
# update value
try:
if deployment in cached_value:
@ -1221,7 +1262,7 @@ class Router:
# ----------------------
cooldown_models = self.cache.get_cache(key=cooldown_key) or []
self.print_verbose(f"retrieve cooldown models: {cooldown_models}")
verbose_router_logger.debug(f"retrieve cooldown models: {cooldown_models}")
return cooldown_models
def set_client(self, model: dict):
@ -1371,7 +1412,7 @@ class Router:
local_only=True,
) # cache for 1 hr
else:
self.print_verbose(
verbose_router_logger.debug(
f"Initializing Azure OpenAI Client for {model_name}, Api Base: {str(api_base)}, Api Key:{api_key}"
)
@ -1463,7 +1504,7 @@ class Router:
) # cache for 1 hr
else:
self.print_verbose(
verbose_router_logger.debug(
f"Initializing OpenAI Client for {model_name}, Api Base:{str(api_base)}, Api Key:{api_key}"
)
cache_key = f"{model_id}_async_client"
@ -1569,7 +1610,7 @@ class Router:
self.set_client(model=model)
self.print_verbose(f"\nInitialized Model List {self.model_list}")
verbose_router_logger.debug(f"\nInitialized Model List {self.model_list}")
self.model_names = [m["model_name"] for m in model_list]
def get_model_names(self):
@ -1631,13 +1672,6 @@ class Router:
client = self.cache.get_cache(key=cache_key)
return client
def print_verbose(self, print_statement):
try:
if self.set_verbose or litellm.set_verbose:
print(f"LiteLLM.Router: {print_statement}") # noqa
except:
pass
def get_available_deployment(
self,
model: str,
@ -1665,7 +1699,7 @@ class Router:
# check if aliases set on litellm model alias map
if model in self.model_group_alias:
self.print_verbose(
verbose_router_logger.debug(
f"Using a model alias. Got Request for {model}, sending requests to {self.model_group_alias.get(model)}"
)
model = self.model_group_alias[model]
@ -1679,13 +1713,15 @@ class Router:
m for m in self.model_list if m["litellm_params"]["model"] == model
]
self.print_verbose(f"initial list of deployments: {healthy_deployments}")
verbose_router_logger.debug(
f"initial list of deployments: {healthy_deployments}"
)
# filter out the deployments currently cooling down
deployments_to_remove = []
# cooldown_deployments is a list of model_id's cooling down, cooldown_deployments = ["16700539-b3cd-42f4-b426-6a12a1bb706a", "16700539-b3cd-42f4-b426-7899"]
cooldown_deployments = self._get_cooldown_deployments()
self.print_verbose(f"cooldown deployments: {cooldown_deployments}")
verbose_router_logger.debug(f"cooldown deployments: {cooldown_deployments}")
# Find deployments in model_list whose model_id is cooling down
for deployment in healthy_deployments:
deployment_id = deployment["model_info"]["id"]
@ -1695,7 +1731,7 @@ class Router:
for deployment in deployments_to_remove:
healthy_deployments.remove(deployment)
self.print_verbose(
verbose_router_logger.debug(
f"healthy deployments: length {len(healthy_deployments)} {healthy_deployments}"
)
if len(healthy_deployments) == 0:
@ -1715,13 +1751,13 @@ class Router:
if rpm is not None:
# use weight-random pick if rpms provided
rpms = [m["litellm_params"].get("rpm", 0) for m in healthy_deployments]
self.print_verbose(f"\nrpms {rpms}")
verbose_router_logger.debug(f"\nrpms {rpms}")
total_rpm = sum(rpms)
weights = [rpm / total_rpm for rpm in rpms]
self.print_verbose(f"\n weights {weights}")
verbose_router_logger.debug(f"\n weights {weights}")
# Perform weighted random pick
selected_index = random.choices(range(len(rpms)), weights=weights)[0]
self.print_verbose(f"\n selected index, {selected_index}")
verbose_router_logger.debug(f"\n selected index, {selected_index}")
deployment = healthy_deployments[selected_index]
return deployment or deployment[0]
############## Check if we can do a RPM/TPM based weighted pick #################
@ -1729,13 +1765,13 @@ class Router:
if tpm is not None:
# use weight-random pick if rpms provided
tpms = [m["litellm_params"].get("tpm", 0) for m in healthy_deployments]
self.print_verbose(f"\ntpms {tpms}")
verbose_router_logger.debug(f"\ntpms {tpms}")
total_tpm = sum(tpms)
weights = [tpm / total_tpm for tpm in tpms]
self.print_verbose(f"\n weights {weights}")
verbose_router_logger.debug(f"\n weights {weights}")
# Perform weighted random pick
selected_index = random.choices(range(len(tpms)), weights=weights)[0]
self.print_verbose(f"\n selected index, {selected_index}")
verbose_router_logger.debug(f"\n selected index, {selected_index}")
deployment = healthy_deployments[selected_index]
return deployment or deployment[0]

View file

@ -1310,12 +1310,13 @@ def test_completion_together_ai_mixtral():
def test_completion_together_ai_yi_chat():
litellm.set_verbose = True
model_name = "together_ai/zero-one-ai/Yi-34B-Chat"
try:
messages = [
{"role": "user", "content": "What llm are you?"},
]
response = completion(model=model_name, messages=messages)
response = completion(model=model_name, messages=messages, max_tokens=5)
# Add any assertions here to check the response
print(response)
cost = completion_cost(completion_response=response)

View file

@ -0,0 +1,79 @@
import sys, os, time
import traceback, asyncio
import pytest
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import litellm, asyncio, logging
from litellm import Router
# this tests debug logs from litellm router and litellm proxy server
from litellm._logging import verbose_router_logger
verbose_router_logger.setLevel(level=logging.INFO)
# this tests debug logs from litellm router and litellm proxy server
def test_async_fallbacks(caplog):
# THIS IS A PROD TEST - DO NOT DELETE THIS. Used for testing if litellm proxy verbose logs are human readable
litellm.set_verbose = False
model_list = [
{
"model_name": "azure/gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
"tpm": 240000,
"rpm": 1800,
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "bad-key",
},
"tpm": 1000000,
"rpm": 9000,
},
]
router = Router(
model_list=model_list,
fallbacks=[{"gpt-3.5-turbo": ["azure/gpt-3.5-turbo"]}],
)
user_message = "Hello, how are you?"
messages = [{"content": user_message, "role": "user"}]
async def _make_request():
try:
response = await router.acompletion(
model="gpt-3.5-turbo", messages=messages, max_tokens=1
)
router.reset()
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred: {e}")
finally:
router.reset()
asyncio.run(_make_request())
captured_logs = [rec.message for rec in caplog.records]
print("\n Captured caplog records - ", captured_logs)
# Define the expected log messages
# - error request, falling back notice, success notice
expected_logs = [
"litellm.acompletion(model=gpt-3.5-turbo)\x1b[31m Exception OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: bad-key. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}\x1b[0m",
"Falling back to model_group = azure/gpt-3.5-turbo",
"litellm.acompletion(model=azure/chatgpt-v-2)\x1b[32m 200 OK\x1b[0m",
]
# Assert that the captured logs match the expected log messages
assert captured_logs == expected_logs