refactor(utils.py): refactor Logging to its own class. Cut down utils.py to <10k lines.

Easier debugging

Reference: https://github.com/BerriAI/litellm/issues/4206
Krrish Dholakia 2024-06-15 10:57:20 -07:00
parent 290bcc09e0
commit 4f91205530
20 changed files with 4517 additions and 2963 deletions

View file

@@ -24,10 +24,10 @@ repos:
language: system
types: [python]
files: ^litellm/
# - id: check-file-length
# name: Check file length
# entry: python check_file_length.py
# args: ["10000"] # set your desired maximum number of lines
# language: python
# files: litellm/.*\.py
# exclude: ^litellm/tests/
- id: check-file-length
name: Check file length
entry: python check_file_length.py
args: ["10000"] # set your desired maximum number of lines
language: python
files: litellm/.*\.py
exclude: ^litellm/tests/
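
The hook flips from commented-out to active: every litellm/*.py file outside tests must now stay under 10,000 lines, which is what the "<10k lines" in the commit title enforces. The check_file_length.py script itself is not part of this diff; below is a minimal sketch of a checker with that interface (the argument handling is an assumption based on pre-commit passing args before filenames, not the repo's actual script):

# check_file_length.py - hypothetical sketch of the pre-commit hook target.
# Assumed invocation: python check_file_length.py <max_lines> <file1> <file2> ...
import sys


def main() -> int:
    max_lines = int(sys.argv[1])
    failed = False
    for path in sys.argv[2:]:
        with open(path, encoding="utf-8") as f:
            num_lines = sum(1 for _ in f)
        if num_lines > max_lines:
            print(f"{path}: {num_lines} lines (limit {max_lines})")
            failed = True
    return 1 if failed else 0


if __name__ == "__main__":
    sys.exit(main())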

View file

@@ -723,12 +723,10 @@ from .utils import (
token_counter,
create_pretrained_tokenizer,
create_tokenizer,
cost_per_token,
supports_function_calling,
supports_parallel_function_calling,
supports_vision,
get_litellm_params,
Logging,
acreate,
get_model_list,
get_max_tokens,
@@ -748,9 +746,10 @@ from .utils import (
get_first_chars_messages,
ModelResponse,
ImageResponse,
ImageObject,
get_provider_fields,
)
from .types.utils import ImageObject
from .llms.huggingface_restapi import HuggingfaceConfig
from .llms.anthropic import AnthropicConfig
from .llms.databricks import DatabricksConfig, DatabricksEmbeddingConfig
@@ -827,4 +826,5 @@ from .router import Router
from .assistants.main import *
from .batches.main import *
from .scheduler import *
from .cost_calculator import response_cost_calculator
from .cost_calculator import response_cost_calculator, cost_per_token
from litellm.litellm_core_utils.litellm_logging import Logging
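
The net effect of these hunks is that three symbols move out of utils.py to canonical homes. The new import paths, as wired up above:

# Import paths after this refactor, per litellm/__init__.py above.
from litellm.cost_calculator import cost_per_token, response_cost_calculator
from litellm.litellm_core_utils.litellm_logging import Logging
from litellm.types.utils import ImageObject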

View file

@@ -3,10 +3,17 @@ from logging import Formatter
import traceback
set_verbose = False
if set_verbose is True:
logging.warning(
"`litellm.set_verbose` is deprecated. Please set `os.environ['LITELLM_LOG'] = 'DEBUG'` for debug logs."
)
json_logs = bool(os.getenv("JSON_LOGS", False))
# Create a handler for the logger (you may need to adapt this based on your needs)
log_level = os.getenv("LITELLM_LOG", "ERROR")
numeric_level: int = getattr(logging, log_level.upper())
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setLevel(numeric_level)
class JsonFormatter(Formatter):
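
Net effect: the handler level now follows the LITELLM_LOG environment variable (default ERROR) instead of being hard-coded to DEBUG. In isolation, the resolution looks like this (a minimal sketch; getattr returns the numeric level constant, e.g. logging.DEBUG == 10):

import logging
import os

# Resolve the handler level from the environment, defaulting to ERROR,
# mirroring the _logging.py change above. getattr returns an int and
# raises AttributeError for names logging doesn't define.
log_level = os.getenv("LITELLM_LOG", "ERROR")
numeric_level: int = getattr(logging, log_level.upper())

handler = logging.StreamHandler()
handler.setLevel(numeric_level)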

View file

@@ -1,6 +1,6 @@
# What is this?
## File for 'response_cost' calculation in Logging
from typing import Optional, Union, Literal, List
from typing import Optional, Union, Literal, List, Tuple
import litellm._logging
from litellm.utils import (
ModelResponse,
@@ -9,7 +9,6 @@ from litellm.utils import (
TranscriptionResponse,
TextCompletionResponse,
CallTypes,
cost_per_token,
print_verbose,
CostPerToken,
token_counter,
@@ -18,6 +17,224 @@ import litellm
from litellm import verbose_logger
def _cost_per_token_custom_pricing_helper(
prompt_tokens=0,
completion_tokens=0,
response_time_ms=None,
### CUSTOM PRICING ###
custom_cost_per_token: Optional[CostPerToken] = None,
custom_cost_per_second: Optional[float] = None,
) -> Optional[Tuple[float, float]]:
"""Internal helper function for calculating cost, if custom pricing given"""
if custom_cost_per_token is None and custom_cost_per_second is None:
return None
if custom_cost_per_token is not None:
input_cost = custom_cost_per_token["input_cost_per_token"] * prompt_tokens
output_cost = custom_cost_per_token["output_cost_per_token"] * completion_tokens
return input_cost, output_cost
elif custom_cost_per_second is not None:
output_cost = custom_cost_per_second * response_time_ms / 1000 # type: ignore
return 0, output_cost
return None
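# Quick illustration of the helper above (invented numbers, not real prices).
# Per-token custom pricing takes precedence when provided:
print(_cost_per_token_custom_pricing_helper(
    prompt_tokens=10,
    completion_tokens=20,
    custom_cost_per_token={"input_cost_per_token": 1e-06,
                           "output_cost_per_token": 2e-06},
))  # -> roughly (1e-05, 4e-05)
# Per-second pricing bills the whole call as output cost:
print(_cost_per_token_custom_pricing_helper(
    response_time_ms=1500, custom_cost_per_second=0.01,
))  # -> (0, 0.015)
# Neither configured -> None, and cost_per_token below falls through
# to the litellm.model_cost lookup.
print(_cost_per_token_custom_pricing_helper())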
def cost_per_token(
model: str = "",
prompt_tokens=0,
completion_tokens=0,
response_time_ms=None,
custom_llm_provider=None,
region_name=None,
### CUSTOM PRICING ###
custom_cost_per_token: Optional[CostPerToken] = None,
custom_cost_per_second: Optional[float] = None,
) -> Tuple[float, float]:
"""
Calculates the cost per token for a given model, prompt tokens, and completion tokens.
Parameters:
model (str): The name of the model to use. Default is ""
prompt_tokens (int): The number of tokens in the prompt.
completion_tokens (int): The number of tokens in the completion.
response_time (float): The amount of time, in milliseconds, it took the call to complete.
custom_llm_provider (str): The llm provider to whom the call was made (see init.py for full list)
custom_cost_per_token: Optional[CostPerToken]: the cost per input + output token for the llm api call.
custom_cost_per_second: Optional[float]: the cost per second for the llm api call.
Returns:
tuple: A tuple containing the cost in USD dollars for prompt tokens and completion tokens, respectively.
"""
if model is None:
raise Exception("Invalid arg. Model cannot be none.")
## CUSTOM PRICING ##
response_cost = _cost_per_token_custom_pricing_helper(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
response_time_ms=response_time_ms,
custom_cost_per_second=custom_cost_per_second,
custom_cost_per_token=custom_cost_per_token,
)
if response_cost is not None:
return response_cost[0], response_cost[1]
# given
prompt_tokens_cost_usd_dollar: float = 0
completion_tokens_cost_usd_dollar: float = 0
model_cost_ref = litellm.model_cost
model_with_provider = model
if custom_llm_provider is not None:
model_with_provider = custom_llm_provider + "/" + model
if region_name is not None:
model_with_provider_and_region = (
f"{custom_llm_provider}/{region_name}/{model}"
)
if (
model_with_provider_and_region in model_cost_ref
): # use region based pricing, if it's available
model_with_provider = model_with_provider_and_region
model_without_prefix = model
model_parts = model.split("/")
if len(model_parts) > 1:
model_without_prefix = model_parts[1]
else:
model_without_prefix = model
"""
Code block that formats model to lookup in litellm.model_cost
Option1. model = "bedrock/ap-northeast-1/anthropic.claude-instant-v1". This is the most accurate since it is region based. Should always be option 1
Option2. model = "openai/gpt-4" - model = provider/model
Option3. model = "anthropic.claude-3" - model = model
"""
if (
model_with_provider in model_cost_ref
): # Option 2. use model with provider, model = "openai/gpt-4"
model = model_with_provider
elif model in model_cost_ref: # Option 1. use model passed, model="gpt-4"
model = model
elif (
model_without_prefix in model_cost_ref
): # Option 3. if user passed model="bedrock/anthropic.claude-3", use model="anthropic.claude-3"
model = model_without_prefix
# see this https://learn.microsoft.com/en-us/azure/ai-services/openai/concepts/models
print_verbose(f"Looking up model={model} in model_cost_map")
if model in model_cost_ref:
print_verbose(f"Success: model={model} in model_cost_map")
print_verbose(
f"prompt_tokens={prompt_tokens}; completion_tokens={completion_tokens}"
)
if (
model_cost_ref[model].get("input_cost_per_token", None) is not None
and model_cost_ref[model].get("output_cost_per_token", None) is not None
):
## COST PER TOKEN ##
prompt_tokens_cost_usd_dollar = (
model_cost_ref[model]["input_cost_per_token"] * prompt_tokens
)
completion_tokens_cost_usd_dollar = (
model_cost_ref[model]["output_cost_per_token"] * completion_tokens
)
elif (
model_cost_ref[model].get("output_cost_per_second", None) is not None
and response_time_ms is not None
):
print_verbose(
f"For model={model} - output_cost_per_second: {model_cost_ref[model].get('output_cost_per_second')}; response time: {response_time_ms}"
)
## COST PER SECOND ##
prompt_tokens_cost_usd_dollar = 0
completion_tokens_cost_usd_dollar = (
model_cost_ref[model]["output_cost_per_second"]
* response_time_ms
/ 1000
)
elif (
model_cost_ref[model].get("input_cost_per_second", None) is not None
and response_time_ms is not None
):
print_verbose(
f"For model={model} - input_cost_per_second: {model_cost_ref[model].get('input_cost_per_second')}; response time: {response_time_ms}"
)
## COST PER SECOND ##
prompt_tokens_cost_usd_dollar = (
model_cost_ref[model]["input_cost_per_second"] * response_time_ms / 1000
)
completion_tokens_cost_usd_dollar = 0.0
print_verbose(
f"Returned custom cost for model={model} - prompt_tokens_cost_usd_dollar: {prompt_tokens_cost_usd_dollar}, completion_tokens_cost_usd_dollar: {completion_tokens_cost_usd_dollar}"
)
return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
elif "ft:gpt-3.5-turbo" in model:
print_verbose(f"Cost Tracking: {model} is an OpenAI FinteTuned LLM")
# fuzzy match ft:gpt-3.5-turbo:abcd-id-cool-litellm
prompt_tokens_cost_usd_dollar = (
model_cost_ref["ft:gpt-3.5-turbo"]["input_cost_per_token"] * prompt_tokens
)
completion_tokens_cost_usd_dollar = (
model_cost_ref["ft:gpt-3.5-turbo"]["output_cost_per_token"]
* completion_tokens
)
return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
elif "ft:davinci-002" in model:
print_verbose(f"Cost Tracking: {model} is an OpenAI FinteTuned LLM")
# fuzzy match ft:davinci-002:abcd-id-cool-litellm
prompt_tokens_cost_usd_dollar = (
model_cost_ref["ft:davinci-002"]["input_cost_per_token"] * prompt_tokens
)
completion_tokens_cost_usd_dollar = (
model_cost_ref["ft:davinci-002"]["output_cost_per_token"]
* completion_tokens
)
return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
elif "ft:babbage-002" in model:
print_verbose(f"Cost Tracking: {model} is an OpenAI FinteTuned LLM")
# fuzzy match ft:babbage-002:abcd-id-cool-litellm
prompt_tokens_cost_usd_dollar = (
model_cost_ref["ft:babbage-002"]["input_cost_per_token"] * prompt_tokens
)
completion_tokens_cost_usd_dollar = (
model_cost_ref["ft:babbage-002"]["output_cost_per_token"]
* completion_tokens
)
return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
elif model in litellm.azure_llms:
verbose_logger.debug(f"Cost Tracking: {model} is an Azure LLM")
model = litellm.azure_llms[model]
verbose_logger.debug(
f"applying cost={model_cost_ref[model]['input_cost_per_token']} for prompt_tokens={prompt_tokens}"
)
prompt_tokens_cost_usd_dollar = (
model_cost_ref[model]["input_cost_per_token"] * prompt_tokens
)
verbose_logger.debug(
f"applying cost={model_cost_ref[model]['output_cost_per_token']} for completion_tokens={completion_tokens}"
)
completion_tokens_cost_usd_dollar = (
model_cost_ref[model]["output_cost_per_token"] * completion_tokens
)
return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
elif model in litellm.azure_embedding_models:
verbose_logger.debug(f"Cost Tracking: {model} is an Azure Embedding Model")
model = litellm.azure_embedding_models[model]
prompt_tokens_cost_usd_dollar = (
model_cost_ref[model]["input_cost_per_token"] * prompt_tokens
)
completion_tokens_cost_usd_dollar = (
model_cost_ref[model]["output_cost_per_token"] * completion_tokens
)
return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
else:
# if model is not in model_prices_and_context_window.json, raise an exception - let users know
error_str = f"Model not in model_prices_and_context_window.json. You passed model={model}. Register pricing for model - https://docs.litellm.ai/docs/proxy/custom_pricing\n"
raise litellm.exceptions.NotFoundError( # type: ignore
message=error_str,
model=model,
llm_provider="",
)
# Extract the number of billion parameters from the model name
# only used for together_computer LLMs
def get_model_params_and_category(model_name) -> str:
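
A hedged usage sketch of the moved function (the returned dollar amounts depend entirely on whatever litellm.model_cost ships at the time, so the printed values are illustrative, not stable):

from litellm.cost_calculator import cost_per_token

# Lookup order, per the block above: region-qualified
# "provider/region/model" first, then "provider/model", then the bare
# model name, with ft:* and Azure aliases handled as special cases.
prompt_usd, completion_usd = cost_per_token(
    model="gpt-4",
    prompt_tokens=100,
    completion_tokens=50,
)
print(prompt_usd, completion_usd)  # USD costs from litellm.model_cost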

View file

@@ -0,0 +1,41 @@
# What is this?
## Helper utilities for the model response objects
def map_finish_reason(
finish_reason: str,
): # openai supports 5 stop sequences - 'stop', 'length', 'function_call', 'content_filter', 'null'
# anthropic mapping
if finish_reason == "stop_sequence":
return "stop"
# cohere mapping - https://docs.cohere.com/reference/generate
elif finish_reason == "COMPLETE":
return "stop"
elif finish_reason == "MAX_TOKENS": # cohere + vertex ai
return "length"
elif finish_reason == "ERROR_TOXIC":
return "content_filter"
elif (
finish_reason == "ERROR"
): # openai currently doesn't support an 'error' finish reason
return "stop"
# huggingface mapping https://huggingface.github.io/text-generation-inference/#/Text%20Generation%20Inference/generate_stream
elif finish_reason == "eos_token" or finish_reason == "stop_sequence":
return "stop"
elif (
finish_reason == "FINISH_REASON_UNSPECIFIED" or finish_reason == "STOP"
): # vertex ai - got from running `print(dir(response_obj.candidates[0].finish_reason))`: ['FINISH_REASON_UNSPECIFIED', 'MAX_TOKENS', 'OTHER', 'RECITATION', 'SAFETY', 'STOP',]
return "stop"
elif finish_reason == "SAFETY": # vertex ai
return "content_filter"
elif finish_reason == "STOP": # vertex ai
return "stop"
elif finish_reason == "end_turn" or finish_reason == "stop_sequence": # anthropic
return "stop"
elif finish_reason == "max_tokens": # anthropic
return "length"
elif finish_reason == "tool_use": # anthropic
return "tool_calls"
elif finish_reason == "content_filtered":
return "content_filter"
return finish_reason
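
A few representative mappings, read straight off the branches above (note that this commit imports the helper from litellm_core_utils.core_helpers in some files and litellm_core_utils.model_response_helpers in others; core_helpers is assumed here):

from litellm.litellm_core_utils.core_helpers import map_finish_reason

map_finish_reason("COMPLETE")    # cohere          -> "stop"
map_finish_reason("MAX_TOKENS")  # cohere / vertex -> "length"
map_finish_reason("tool_use")    # anthropic       -> "tool_calls"
map_finish_reason("SAFETY")      # vertex ai       -> "content_filter"
map_finish_reason("length")      # openai-native values pass through unchanged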

File diff suppressed because it is too large

View file

@@ -12,7 +12,9 @@ from typing import TYPE_CHECKING, Any
import litellm
if TYPE_CHECKING:
from litellm.utils import Logging as _LiteLLMLoggingObject
from litellm.litellm_core_utils.litellm_logging import (
Logging as _LiteLLMLoggingObject,
)
LiteLLMLoggingObject = _LiteLLMLoggingObject
else:
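
This is the standard TYPE_CHECKING guard: the import only runs under static type checkers, so the annotation-only dependency on the Logging class cannot create a circular import at runtime; only the import path changes here. The else branch is truncated in this hunk; given the Any in the hunk header's import line, it presumably degrades the alias like so (a sketch, not the file's verbatim contents):

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Evaluated only by type checkers, never at runtime.
    from litellm.litellm_core_utils.litellm_logging import (
        Logging as _LiteLLMLoggingObject,
    )

    LiteLLMLoggingObject = _LiteLLMLoggingObject
else:
    # Assumed runtime fallback: annotations still resolve, checks are skipped.
    LiteLLMLoggingObject = Any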

View file

@@ -5,7 +5,9 @@ import requests, copy # type: ignore
import time
from functools import partial
from typing import Callable, Optional, List, Union
from litellm.utils import ModelResponse, Usage, map_finish_reason, CustomStreamWrapper
import litellm.litellm_core_utils
from litellm.utils import ModelResponse, Usage, CustomStreamWrapper
from litellm.litellm_core_utils.core_helpers import map_finish_reason
import litellm
from .prompt_templates.factory import prompt_factory, custom_prompt
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
@@ -201,7 +203,7 @@ class AnthropicChatCompletion(BaseLLM):
response: Union[requests.Response, httpx.Response],
model_response: ModelResponse,
stream: bool,
logging_obj: litellm.utils.Logging,
logging_obj: litellm.litellm_core_utils.litellm_logging.Logging,
optional_params: dict,
api_key: str,
data: Union[dict, str],
@@ -316,7 +318,7 @@ class AnthropicChatCompletion(BaseLLM):
response: Union[requests.Response, httpx.Response],
model_response: ModelResponse,
stream: bool,
logging_obj: litellm.utils.Logging,
logging_obj: litellm.litellm_core_utils.litellm_logging.Logging,
optional_params: dict,
api_key: str,
data: Union[dict, str],

View file

@@ -2,7 +2,7 @@
import litellm
import httpx, requests
from typing import Optional, Union
from litellm.utils import Logging
from litellm.litellm_core_utils.litellm_logging import Logging
class BaseLLM:

View file

@@ -5,12 +5,10 @@ import time, uuid
from typing import Callable, Optional, Any, Union, List
import litellm
from litellm.utils import (
ModelResponse,
get_secret,
Usage,
ImageResponse,
map_finish_reason,
)
from litellm.litellm_core_utils.model_response_helpers import map_finish_reason
from litellm.types.utils import ImageResponse, ModelResponse, Usage
from .prompt_templates.factory import (
prompt_factory,
custom_prompt,
@@ -633,7 +631,11 @@ def init_bedrock_client(
config = boto3.session.Config()
### CHECK STS ###
if aws_web_identity_token is not None and aws_role_name is not None and aws_session_name is not None:
if (
aws_web_identity_token is not None
and aws_role_name is not None
and aws_session_name is not None
):
oidc_token = get_secret(aws_web_identity_token)
if oidc_token is None:
@@ -642,9 +644,7 @@ def init_bedrock_client(
status_code=401,
)
sts_client = boto3.client(
"sts"
)
sts_client = boto3.client("sts")
# https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts/client/assume_role_with_web_identity.html

View file

@@ -22,13 +22,12 @@ from typing import (
from litellm.utils import (
ModelResponse,
Usage,
map_finish_reason,
CustomStreamWrapper,
Message,
Choices,
get_secret,
Logging,
)
from litellm.litellm_core_utils.core_helpers import map_finish_reason
from litellm.litellm_core_utils.litellm_logging import Logging
from litellm.types.utils import Message, Choices
import litellm, uuid
from .prompt_templates.factory import (
prompt_factory,
@@ -57,6 +56,7 @@ from litellm.caching import DualCache
iam_cache = DualCache()
class AmazonCohereChatConfig:
"""
Reference - https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-cohere-command-r-plus.html
@@ -327,13 +327,19 @@ class BedrockLLM(BaseLLM):
) = params_to_check
### CHECK STS ###
if aws_web_identity_token is not None and aws_role_name is not None and aws_session_name is not None:
iam_creds_cache_key = json.dumps({
"aws_web_identity_token": aws_web_identity_token,
"aws_role_name": aws_role_name,
"aws_session_name": aws_session_name,
"aws_region_name": aws_region_name,
})
if (
aws_web_identity_token is not None
and aws_role_name is not None
and aws_session_name is not None
):
iam_creds_cache_key = json.dumps(
{
"aws_web_identity_token": aws_web_identity_token,
"aws_role_name": aws_role_name,
"aws_session_name": aws_session_name,
"aws_region_name": aws_region_name,
}
)
iam_creds_dict = iam_cache.get_cache(iam_creds_cache_key)
if iam_creds_dict is None:
@@ -348,7 +354,7 @@ class BedrockLLM(BaseLLM):
sts_client = boto3.client(
"sts",
region_name=aws_region_name,
endpoint_url=f"https://sts.{aws_region_name}.amazonaws.com"
endpoint_url=f"https://sts.{aws_region_name}.amazonaws.com",
)
# https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html
@@ -362,12 +368,18 @@ class BedrockLLM(BaseLLM):
iam_creds_dict = {
"aws_access_key_id": sts_response["Credentials"]["AccessKeyId"],
"aws_secret_access_key": sts_response["Credentials"]["SecretAccessKey"],
"aws_secret_access_key": sts_response["Credentials"][
"SecretAccessKey"
],
"aws_session_token": sts_response["Credentials"]["SessionToken"],
"region_name": aws_region_name,
}
iam_cache.set_cache(key=iam_creds_cache_key, value=json.dumps(iam_creds_dict), ttl=3600 - 60)
iam_cache.set_cache(
key=iam_creds_cache_key,
value=json.dumps(iam_creds_dict),
ttl=3600 - 60,
)
session = boto3.Session(**iam_creds_dict)
@@ -1433,13 +1445,19 @@ class BedrockConverseLLM(BaseLLM):
) = params_to_check
### CHECK STS ###
if aws_web_identity_token is not None and aws_role_name is not None and aws_session_name is not None:
iam_creds_cache_key = json.dumps({
"aws_web_identity_token": aws_web_identity_token,
"aws_role_name": aws_role_name,
"aws_session_name": aws_session_name,
"aws_region_name": aws_region_name,
})
if (
aws_web_identity_token is not None
and aws_role_name is not None
and aws_session_name is not None
):
iam_creds_cache_key = json.dumps(
{
"aws_web_identity_token": aws_web_identity_token,
"aws_role_name": aws_role_name,
"aws_session_name": aws_session_name,
"aws_region_name": aws_region_name,
}
)
iam_creds_dict = iam_cache.get_cache(iam_creds_cache_key)
if iam_creds_dict is None:
@@ -1454,7 +1472,7 @@ class BedrockConverseLLM(BaseLLM):
sts_client = boto3.client(
"sts",
region_name=aws_region_name,
endpoint_url=f"https://sts.{aws_region_name}.amazonaws.com"
endpoint_url=f"https://sts.{aws_region_name}.amazonaws.com",
)
# https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html
@@ -1468,12 +1486,18 @@ class BedrockConverseLLM(BaseLLM):
iam_creds_dict = {
"aws_access_key_id": sts_response["Credentials"]["AccessKeyId"],
"aws_secret_access_key": sts_response["Credentials"]["SecretAccessKey"],
"aws_secret_access_key": sts_response["Credentials"][
"SecretAccessKey"
],
"aws_session_token": sts_response["Credentials"]["SessionToken"],
"region_name": aws_region_name,
}
iam_cache.set_cache(key=iam_creds_cache_key, value=json.dumps(iam_creds_dict), ttl=3600 - 60)
iam_cache.set_cache(
key=iam_creds_cache_key,
value=json.dumps(iam_creds_dict),
ttl=3600 - 60,
)
session = boto3.Session(**iam_creds_dict)
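
The pattern in both Bedrock classes is the same: key the cache on the full set of web-identity parameters, and expire the cached credentials a minute before the session lifetime that ttl=3600 - 60 implies (the one-hour STS session duration is an assumption read off that TTL), so callers never receive credentials at the edge of expiry. A condensed, runnable sketch of the idea:

import json
import time


# Hypothetical in-memory stand-in for litellm's DualCache, only to make
# the caching pattern from the hunks above runnable in isolation.
class _TTLCache:
    def __init__(self):
        self._store = {}

    def get_cache(self, key):
        value, expires_at = self._store.get(key, (None, 0.0))
        return value if time.time() < expires_at else None

    def set_cache(self, key, value, ttl):
        self._store[key] = (value, time.time() + ttl)


iam_cache = _TTLCache()


def cached_assume_role(params: dict, assume_role) -> dict:
    # Key on every parameter that affects the assumed role, as the diff does.
    cache_key = json.dumps(params, sort_keys=True)
    cached = iam_cache.get_cache(cache_key)
    if cached is not None:
        return json.loads(cached)
    creds = assume_role(**params)  # the boto3 STS call in the real code
    # Expire one minute early, assuming a one-hour STS session lifetime,
    # so callers never get credentials on the verge of expiring.
    iam_cache.set_cache(key=cache_key, value=json.dumps(creds), ttl=3600 - 60)
    return creds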

View file

@@ -10,10 +10,10 @@ from typing import Callable, Optional, List, Union, Tuple, Literal
from litellm.utils import (
ModelResponse,
Usage,
map_finish_reason,
CustomStreamWrapper,
EmbeddingResponse,
)
from litellm.litellm_core_utils.core_helpers import map_finish_reason
import litellm
from .prompt_templates.factory import prompt_factory, custom_prompt
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@@ -289,7 +289,7 @@ class DatabricksChatCompletion(BaseLLM):
response: Union[requests.Response, httpx.Response],
model_response: ModelResponse,
stream: bool,
logging_obj: litellm.utils.Logging,
logging_obj: litellm.litellm_core_utils.litellm_logging.Logging,
optional_params: dict,
api_key: str,
data: Union[dict, str],

View file

@@ -12,11 +12,11 @@ from typing import Callable, Optional, List, Literal, Union
from litellm.utils import (
ModelResponse,
Usage,
map_finish_reason,
CustomStreamWrapper,
Message,
Choices,
)
from litellm.litellm_core_utils.core_helpers import map_finish_reason
import litellm
from .prompt_templates.factory import prompt_factory, custom_prompt
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
@@ -198,7 +198,7 @@ class PredibaseChatCompletion(BaseLLM):
response: Union[requests.Response, httpx.Response],
model_response: ModelResponse,
stream: bool,
logging_obj: litellm.utils.Logging,
logging_obj: litellm.litellm_core_utils.litellm_logging.Logging,
optional_params: dict,
api_key: str,
data: Union[dict, str],

View file

@@ -4,7 +4,6 @@ from enum import Enum
import requests, copy # type: ignore
import time
from typing import Callable, Optional, List
from litellm.utils import ModelResponse, Usage, map_finish_reason, CustomStreamWrapper
import litellm
from .prompt_templates.factory import prompt_factory, custom_prompt
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler

View file

@@ -5,7 +5,8 @@ import requests # type: ignore
import time
from typing import Callable, Optional, Union, List, Literal, Any
from pydantic import BaseModel
from litellm.utils import ModelResponse, Usage, CustomStreamWrapper, map_finish_reason
from litellm.utils import ModelResponse, Usage, CustomStreamWrapper
from litellm.litellm_core_utils.model_response_helpers import map_finish_reason
import litellm, uuid
import httpx, inspect # type: ignore
from litellm.types.llms.vertex_ai import *

View file

@@ -6,7 +6,8 @@ from enum import Enum
import requests, copy # type: ignore
import time, uuid
from typing import Callable, Optional, List
from litellm.utils import ModelResponse, Usage, map_finish_reason, CustomStreamWrapper
from litellm.utils import ModelResponse, Usage, CustomStreamWrapper
from litellm.litellm_core_utils.model_response_helpers import map_finish_reason
import litellm
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
from .prompt_templates.factory import (

View file

@@ -8,7 +8,10 @@ from enum import Enum
import requests # type: ignore
import time
from typing import Callable, Optional, Union, List, Any, Tuple
from litellm.utils import ModelResponse, Usage, CustomStreamWrapper, map_finish_reason
import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging
from litellm.utils import ModelResponse, Usage, CustomStreamWrapper
from litellm.litellm_core_utils.core_helpers import map_finish_reason
import litellm, uuid
import httpx, inspect # type: ignore
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@@ -320,7 +323,7 @@ class VertexLLM(BaseLLM):
model: str,
response: httpx.Response,
model_response: ModelResponse,
logging_obj: litellm.utils.Logging,
logging_obj: litellm.litellm_core_utils.litellm_logging.Logging,
optional_params: dict,
api_key: str,
data: Union[dict, str],

View file

@@ -12,6 +12,8 @@ import litellm
import backoff
import traceback
from pydantic import BaseModel
import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging
from litellm.proxy._types import (
UserAPIKeyAuth,
DynamoDBArgs,
@@ -331,7 +333,9 @@ class ProxyLogging:
return data
except Exception as e:
if "litellm_logging_obj" in data:
logging_obj: litellm.utils.Logging = data["litellm_logging_obj"]
logging_obj: litellm.litellm_core_utils.litellm_logging.Logging = data[
"litellm_logging_obj"
]
## ASYNC FAILURE HANDLER ##
error_message = ""

View file

@@ -3,6 +3,15 @@ from typing_extensions import TypedDict
from enum import Enum
from typing_extensions import override, Required, Dict
from .llms.openai import ChatCompletionUsageBlock, ChatCompletionToolCallChunk
from ..litellm_core_utils.model_response_helpers import map_finish_reason
from openai._models import BaseModel as OpenAIObject
from pydantic import ConfigDict
import uuid
import json
def _generate_id(): # private helper function
return "chatcmpl-" + str(uuid.uuid4())
class LiteLLMCommonStrings(Enum):
@@ -48,3 +57,904 @@ class GenericStreamingChunk(TypedDict):
finish_reason: Required[str]
usage: Optional[ChatCompletionUsageBlock]
index: int
from enum import Enum
class CallTypes(Enum):
embedding = "embedding"
aembedding = "aembedding"
completion = "completion"
acompletion = "acompletion"
atext_completion = "atext_completion"
text_completion = "text_completion"
image_generation = "image_generation"
aimage_generation = "aimage_generation"
moderation = "moderation"
amoderation = "amoderation"
atranscription = "atranscription"
transcription = "transcription"
aspeech = "aspeech"
speech = "speech"
class TopLogprob(OpenAIObject):
token: str
"""The token."""
bytes: Optional[List[int]] = None
"""A list of integers representing the UTF-8 bytes representation of the token.
Useful in instances where characters are represented by multiple tokens and
their byte representations must be combined to generate the correct text
representation. Can be `null` if there is no bytes representation for the token.
"""
logprob: float
"""The log probability of this token, if it is within the top 20 most likely
tokens.
Otherwise, the value `-9999.0` is used to signify that the token is very
unlikely.
"""
class ChatCompletionTokenLogprob(OpenAIObject):
token: str
"""The token."""
bytes: Optional[List[int]] = None
"""A list of integers representing the UTF-8 bytes representation of the token.
Useful in instances where characters are represented by multiple tokens and
their byte representations must be combined to generate the correct text
representation. Can be `null` if there is no bytes representation for the token.
"""
logprob: float
"""The log probability of this token, if it is within the top 20 most likely
tokens.
Otherwise, the value `-9999.0` is used to signify that the token is very
unlikely.
"""
top_logprobs: List[TopLogprob]
"""List of the most likely tokens and their log probability, at this token
position.
In rare cases, there may be fewer than the number of requested `top_logprobs`
returned.
"""
class ChoiceLogprobs(OpenAIObject):
content: Optional[List[ChatCompletionTokenLogprob]] = None
"""A list of message content tokens with log probability information."""
class FunctionCall(OpenAIObject):
arguments: str
name: Optional[str] = None
class Function(OpenAIObject):
arguments: str
name: Optional[str] = None
def __init__(
self,
arguments: Union[Dict, str],
name: Optional[str] = None,
**params,
):
if isinstance(arguments, Dict):
arguments = json.dumps(arguments)
else:
arguments = arguments
name = name
# Build a dictionary with the structure your BaseModel expects
data = {"arguments": arguments, "name": name, **params}
super(Function, self).__init__(**data)
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class ChatCompletionDeltaToolCall(OpenAIObject):
id: Optional[str] = None
function: Function
type: Optional[str] = None
index: int
class HiddenParams(OpenAIObject):
original_response: Optional[str] = None
model_id: Optional[str] = None # used in Router for individual deployments
api_base: Optional[str] = None # returns api base used for making completion call
model_config = ConfigDict(extra="allow", protected_namespaces=())
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class ChatCompletionMessageToolCall(OpenAIObject):
def __init__(
self,
function: Union[Dict, Function],
id: Optional[str] = None,
type: Optional[str] = None,
**params,
):
super(ChatCompletionMessageToolCall, self).__init__(**params)
if isinstance(function, Dict):
self.function = Function(**function)
else:
self.function = function
if id is not None:
self.id = id
else:
self.id = f"{uuid.uuid4()}"
if type is not None:
self.type = type
else:
self.type = "function"
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class Message(OpenAIObject):
def __init__(
self,
content: Optional[str] = "default",
role="assistant",
logprobs=None,
function_call=None,
tool_calls=None,
**params,
):
super(Message, self).__init__(**params)
self.content = content
self.role = role
if function_call is not None:
self.function_call = FunctionCall(**function_call)
if tool_calls is not None:
self.tool_calls = []
for tool_call in tool_calls:
self.tool_calls.append(ChatCompletionMessageToolCall(**tool_call))
if logprobs is not None:
self._logprobs = ChoiceLogprobs(**logprobs)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class Delta(OpenAIObject):
def __init__(
self,
content=None,
role=None,
function_call=None,
tool_calls=None,
**params,
):
super(Delta, self).__init__(**params)
self.content = content
self.role = role
if function_call is not None and isinstance(function_call, dict):
self.function_call = FunctionCall(**function_call)
else:
self.function_call = function_call
if tool_calls is not None and isinstance(tool_calls, list):
self.tool_calls = []
for tool_call in tool_calls:
if isinstance(tool_call, dict):
if tool_call.get("index", None) is None:
tool_call["index"] = 0
self.tool_calls.append(ChatCompletionDeltaToolCall(**tool_call))
elif isinstance(tool_call, ChatCompletionDeltaToolCall):
self.tool_calls.append(tool_call)
else:
self.tool_calls = tool_calls
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class Choices(OpenAIObject):
def __init__(
self,
finish_reason=None,
index=0,
message: Optional[Union[Message, dict]] = None,
logprobs=None,
enhancements=None,
**params,
):
super(Choices, self).__init__(**params)
if finish_reason is not None:
self.finish_reason = map_finish_reason(
finish_reason
) # set finish_reason for all responses
else:
self.finish_reason = "stop"
self.index = index
if message is None:
self.message = Message()
else:
if isinstance(message, Message):
self.message = message
elif isinstance(message, dict):
self.message = Message(**message)
if logprobs is not None:
self.logprobs = logprobs
if enhancements is not None:
self.enhancements = enhancements
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class Usage(OpenAIObject):
def __init__(
self, prompt_tokens=None, completion_tokens=None, total_tokens=None, **params
):
super(Usage, self).__init__(**params)
if prompt_tokens:
self.prompt_tokens = prompt_tokens
if completion_tokens:
self.completion_tokens = completion_tokens
if total_tokens:
self.total_tokens = total_tokens
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class StreamingChoices(OpenAIObject):
def __init__(
self,
finish_reason=None,
index=0,
delta: Optional[Delta] = None,
logprobs=None,
enhancements=None,
**params,
):
super(StreamingChoices, self).__init__(**params)
if finish_reason:
self.finish_reason = finish_reason
else:
self.finish_reason = None
self.index = index
if delta is not None:
if isinstance(delta, Delta):
self.delta = delta
elif isinstance(delta, dict):
self.delta = Delta(**delta)
else:
self.delta = Delta()
if enhancements is not None:
self.enhancements = enhancements
if logprobs is not None and isinstance(logprobs, dict):
self.logprobs = ChoiceLogprobs(**logprobs)
else:
self.logprobs = logprobs # type: ignore
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class ModelResponse(OpenAIObject):
id: str
"""A unique identifier for the completion."""
choices: List[Union[Choices, StreamingChoices]]
"""The list of completion choices the model generated for the input prompt."""
created: int
"""The Unix timestamp (in seconds) of when the completion was created."""
model: Optional[str] = None
"""The model used for completion."""
object: str
"""The object type, which is always "text_completion" """
system_fingerprint: Optional[str] = None
"""This fingerprint represents the backend configuration that the model runs with.
Can be used in conjunction with the `seed` request parameter to understand when
backend changes have been made that might impact determinism.
"""
_hidden_params: dict = {}
def __init__(
self,
id=None,
choices=None,
created=None,
model=None,
object=None,
system_fingerprint=None,
usage=None,
stream=None,
stream_options=None,
response_ms=None,
hidden_params=None,
**params,
):
if stream is not None and stream is True:
object = "chat.completion.chunk"
if choices is not None and isinstance(choices, list):
new_choices = []
for choice in choices:
if isinstance(choice, StreamingChoices):
_new_choice = choice
elif isinstance(choice, dict):
_new_choice = StreamingChoices(**choice)
new_choices.append(_new_choice)
choices = new_choices
else:
choices = [StreamingChoices()]
else:
object = "chat.completion"
if choices is not None and isinstance(choices, list):
new_choices = []
for choice in choices:
if isinstance(choice, Choices):
_new_choice = choice
elif isinstance(choice, dict):
_new_choice = Choices(**choice)
new_choices.append(_new_choice)
choices = new_choices
else:
choices = [Choices()]
if id is None:
id = _generate_id()
else:
id = id
if created is None:
created = int(time.time())
else:
created = created
model = model
if usage is not None:
if isinstance(usage, dict):
usage = Usage(**usage)
else:
usage = usage
elif stream is None or stream is False:
usage = Usage()
if hidden_params:
self._hidden_params = hidden_params
init_values = {
"id": id,
"choices": choices,
"created": created,
"model": model,
"object": object,
"system_fingerprint": system_fingerprint,
}
if usage is not None:
init_values["usage"] = usage
super().__init__(
**init_values,
**params,
)
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class Embedding(OpenAIObject):
embedding: Union[list, str] = []
index: int
object: str
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class EmbeddingResponse(OpenAIObject):
model: Optional[str] = None
"""The model used for embedding."""
data: Optional[List] = None
"""The actual embedding value"""
object: str
"""The object type, which is always "embedding" """
usage: Optional[Usage] = None
"""Usage statistics for the embedding request."""
_hidden_params: dict = {}
def __init__(
self,
model=None,
usage=None,
stream=False,
response_ms=None,
data=None,
**params,
):
object = "list"
if response_ms:
_response_ms = response_ms
else:
_response_ms = None
if data:
data = data
else:
data = None
if usage:
usage = usage
else:
usage = Usage()
model = model
super().__init__(model=model, object=object, data=data, usage=usage)
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class Logprobs(OpenAIObject):
text_offset: List[int]
token_logprobs: List[float]
tokens: List[str]
top_logprobs: List[Dict[str, float]]
class TextChoices(OpenAIObject):
def __init__(self, finish_reason=None, index=0, text=None, logprobs=None, **params):
super(TextChoices, self).__init__(**params)
if finish_reason:
self.finish_reason = map_finish_reason(finish_reason)
else:
self.finish_reason = None
self.index = index
if text is not None:
self.text = text
else:
self.text = None
if logprobs is None:
self.logprobs = None
else:
if isinstance(logprobs, dict):
self.logprobs = Logprobs(**logprobs)
else:
self.logprobs = logprobs
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class TextCompletionResponse(OpenAIObject):
"""
{
"id": response["id"],
"object": "text_completion",
"created": response["created"],
"model": response["model"],
"choices": [
{
"text": response["choices"][0]["message"]["content"],
"index": response["choices"][0]["index"],
"logprobs": transformed_logprobs,
"finish_reason": response["choices"][0]["finish_reason"]
}
],
"usage": response["usage"]
}
"""
id: str
object: str
created: int
model: Optional[str]
choices: List[TextChoices]
usage: Optional[Usage]
_response_ms: Optional[int] = None
_hidden_params: HiddenParams
def __init__(
self,
id=None,
choices=None,
created=None,
model=None,
usage=None,
stream=False,
response_ms=None,
object=None,
**params,
):
if stream:
object = "text_completion.chunk"
choices = [TextChoices()]
else:
object = "text_completion"
if choices is not None and isinstance(choices, list):
new_choices = []
for choice in choices:
if isinstance(choice, TextChoices):
_new_choice = choice
elif isinstance(choice, dict):
_new_choice = TextChoices(**choice)
new_choices.append(_new_choice)
choices = new_choices
else:
choices = [TextChoices()]
if object is not None:
object = object
if id is None:
id = _generate_id()
else:
id = id
if created is None:
created = int(time.time())
else:
created = created
model = model
if usage:
usage = usage
else:
usage = Usage()
super(TextCompletionResponse, self).__init__(
id=id,
object=object,
created=created,
model=model,
choices=choices,
usage=usage,
**params,
)
if response_ms:
self._response_ms = response_ms
else:
self._response_ms = None
self._hidden_params = HiddenParams()
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
class ImageObject(OpenAIObject):
"""
Represents the url or the content of an image generated by the OpenAI API.
Attributes:
b64_json: The base64-encoded JSON of the generated image, if response_format is b64_json.
url: The URL of the generated image, if response_format is url (default).
revised_prompt: The prompt that was used to generate the image, if there was any revision to the prompt.
https://platform.openai.com/docs/api-reference/images/object
"""
b64_json: Optional[str] = None
url: Optional[str] = None
revised_prompt: Optional[str] = None
def __init__(self, b64_json=None, url=None, revised_prompt=None):
super().__init__(b64_json=b64_json, url=url, revised_prompt=revised_prompt)
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class ImageResponse(OpenAIObject):
created: Optional[int] = None
data: Optional[List[ImageObject]] = None
usage: Optional[dict] = None
_hidden_params: dict = {}
def __init__(self, created=None, data=None, response_ms=None):
if response_ms:
_response_ms = response_ms
else:
_response_ms = None
if data:
data = data
else:
data = None
if created:
created = created
else:
created = None
super().__init__(data=data, created=created)
self.usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class TranscriptionResponse(OpenAIObject):
text: Optional[str] = None
_hidden_params: dict = {}
def __init__(self, text=None):
super().__init__(text=text)
def __contains__(self, key):
# Define custom behavior for the 'in' operator
return hasattr(self, key)
def get(self, key, default=None):
# Custom .get() method to access attributes with a default value if the attribute doesn't exist
return getattr(self, key, default)
def __getitem__(self, key):
# Allow dictionary-style access to attributes
return getattr(self, key)
def __setitem__(self, key, value):
# Allow dictionary-style assignment of attributes
setattr(self, key, value)
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()

File diff suppressed because it is too large