Support Gemini audio token cost tracking + fix openai audio input token cost tracking (#9535)

* fix(vertex_and_google_ai_studio_gemini.py): log gemini audio tokens in usage object

enables accurate cost tracking

* refactor(vertex_ai/cost_calculator.py): refactor 128k+ token cost calculation to only run if model info has it

Google has moved away from this for gemini-2.0 models

* refactor(vertex_ai/cost_calculator.py): migrate to usage object for more flexible data passthrough

* fix(llm_cost_calc/utils.py): support audio token cost tracking in generic cost per token

enables vertex ai cost tracking to work with audio tokens

* fix(llm_cost_calc/utils.py): default to total prompt tokens if text tokens field not set

* refactor(llm_cost_calc/utils.py): move openai cost tracking to generic cost per token

more consistent behaviour across providers

* test: add unit test for gemini audio token cost calculation

* ci: bump ci config

* test: fix test
This commit is contained in:
Krish Dholakia 2025-03-26 17:26:25 -07:00 committed by GitHub
parent 04490c99d7
commit 4351c77253
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 253 additions and 117 deletions

View file

@ -275,15 +275,13 @@ def cost_per_token( # noqa: PLR0915
custom_llm_provider=custom_llm_provider,
prompt_characters=prompt_characters,
completion_characters=completion_characters,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
usage=usage_block,
)
elif cost_router == "cost_per_token":
return google_cost_per_token(
model=model_without_prefix,
custom_llm_provider=custom_llm_provider,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
usage=usage_block,
)
elif custom_llm_provider == "anthropic":
return anthropic_cost_per_token(model=model, usage=usage_block)

View file

@ -1,7 +1,7 @@
# What is this?
## Helper utilities for cost_per_token()
from typing import Optional, Tuple
from typing import Optional, Tuple, cast
import litellm
from litellm import verbose_logger
@ -143,26 +143,50 @@ def generic_cost_per_token(
### Cost of processing (non-cache hit + cache hit) + Cost of cache-writing (cache writing)
prompt_cost = 0.0
### PROCESSING COST
non_cache_hit_tokens = usage.prompt_tokens
text_tokens = usage.prompt_tokens
cache_hit_tokens = 0
if usage.prompt_tokens_details and usage.prompt_tokens_details.cached_tokens:
cache_hit_tokens = usage.prompt_tokens_details.cached_tokens
non_cache_hit_tokens = non_cache_hit_tokens - cache_hit_tokens
audio_tokens = 0
if usage.prompt_tokens_details:
cache_hit_tokens = (
cast(
Optional[int], getattr(usage.prompt_tokens_details, "cached_tokens", 0)
)
or 0
)
text_tokens = (
cast(
Optional[int], getattr(usage.prompt_tokens_details, "text_tokens", None)
)
or 0 # default to prompt tokens, if this field is not set
)
audio_tokens = (
cast(Optional[int], getattr(usage.prompt_tokens_details, "audio_tokens", 0))
or 0
)
## EDGE CASE - text tokens not set inside PromptTokensDetails
if text_tokens == 0:
text_tokens = usage.prompt_tokens - cache_hit_tokens - audio_tokens
prompt_base_cost = _get_prompt_token_base_cost(model_info=model_info, usage=usage)
prompt_cost = float(non_cache_hit_tokens) * prompt_base_cost
prompt_cost = float(text_tokens) * prompt_base_cost
_cache_read_input_token_cost = model_info.get("cache_read_input_token_cost")
### CACHE READ COST
if (
_cache_read_input_token_cost is not None
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
and cache_hit_tokens is not None
and cache_hit_tokens > 0
):
prompt_cost += (
float(usage.prompt_tokens_details.cached_tokens)
* _cache_read_input_token_cost
)
prompt_cost += float(cache_hit_tokens) * _cache_read_input_token_cost
### AUDIO COST
audio_token_cost = model_info.get("input_cost_per_audio_token")
if audio_token_cost is not None and audio_tokens is not None and audio_tokens > 0:
prompt_cost += float(audio_tokens) * audio_token_cost
### CACHE WRITING COST
_cache_creation_input_token_cost = model_info.get("cache_creation_input_token_cost")
@ -175,6 +199,37 @@ def generic_cost_per_token(
completion_base_cost = _get_completion_token_base_cost(
model_info=model_info, usage=usage
)
completion_cost = usage["completion_tokens"] * completion_base_cost
text_tokens = usage.completion_tokens
audio_tokens = 0
if usage.completion_tokens_details is not None:
audio_tokens = (
cast(
Optional[int],
getattr(usage.completion_tokens_details, "audio_tokens", 0),
)
or 0
)
text_tokens = (
cast(
Optional[int],
getattr(usage.completion_tokens_details, "text_tokens", None),
)
or usage.completion_tokens # default to completion tokens, if this field is not set
)
## TEXT COST
completion_cost = float(text_tokens) * completion_base_cost
_output_cost_per_audio_token: Optional[float] = model_info.get(
"output_cost_per_audio_token"
)
## AUDIO COST
if (
_output_cost_per_audio_token is not None
and audio_tokens is not None
and audio_tokens > 0
):
completion_cost += float(audio_tokens) * _output_cost_per_audio_token
return prompt_cost, completion_cost

View file

@ -6,6 +6,7 @@ Helper util for handling openai-specific cost calculation
from typing import Literal, Optional, Tuple
from litellm._logging import verbose_logger
from litellm.litellm_core_utils.llm_cost_calc.utils import generic_cost_per_token
from litellm.types.utils import CallTypes, Usage
from litellm.utils import get_model_info
@ -28,52 +29,53 @@ def cost_per_token(model: str, usage: Usage) -> Tuple[float, float]:
Returns:
Tuple[float, float] - prompt_cost_in_usd, completion_cost_in_usd
"""
## GET MODEL INFO
model_info = get_model_info(model=model, custom_llm_provider="openai")
## CALCULATE INPUT COST
### Non-cached text tokens
non_cached_text_tokens = usage.prompt_tokens
cached_tokens: Optional[int] = None
if usage.prompt_tokens_details and usage.prompt_tokens_details.cached_tokens:
cached_tokens = usage.prompt_tokens_details.cached_tokens
non_cached_text_tokens = non_cached_text_tokens - cached_tokens
prompt_cost: float = non_cached_text_tokens * model_info["input_cost_per_token"]
## Prompt Caching cost calculation
if model_info.get("cache_read_input_token_cost") is not None and cached_tokens:
# Note: We read ._cache_read_input_tokens from the Usage - since cost_calculator.py standardizes the cache read tokens on usage._cache_read_input_tokens
prompt_cost += cached_tokens * (
model_info.get("cache_read_input_token_cost", 0) or 0
)
return generic_cost_per_token(
model=model, usage=usage, custom_llm_provider="openai"
)
# ### Non-cached text tokens
# non_cached_text_tokens = usage.prompt_tokens
# cached_tokens: Optional[int] = None
# if usage.prompt_tokens_details and usage.prompt_tokens_details.cached_tokens:
# cached_tokens = usage.prompt_tokens_details.cached_tokens
# non_cached_text_tokens = non_cached_text_tokens - cached_tokens
# prompt_cost: float = non_cached_text_tokens * model_info["input_cost_per_token"]
# ## Prompt Caching cost calculation
# if model_info.get("cache_read_input_token_cost") is not None and cached_tokens:
# # Note: We read ._cache_read_input_tokens from the Usage - since cost_calculator.py standardizes the cache read tokens on usage._cache_read_input_tokens
# prompt_cost += cached_tokens * (
# model_info.get("cache_read_input_token_cost", 0) or 0
# )
_audio_tokens: Optional[int] = (
usage.prompt_tokens_details.audio_tokens
if usage.prompt_tokens_details is not None
else None
)
_audio_cost_per_token: Optional[float] = model_info.get(
"input_cost_per_audio_token"
)
if _audio_tokens is not None and _audio_cost_per_token is not None:
audio_cost: float = _audio_tokens * _audio_cost_per_token
prompt_cost += audio_cost
# _audio_tokens: Optional[int] = (
# usage.prompt_tokens_details.audio_tokens
# if usage.prompt_tokens_details is not None
# else None
# )
# _audio_cost_per_token: Optional[float] = model_info.get(
# "input_cost_per_audio_token"
# )
# if _audio_tokens is not None and _audio_cost_per_token is not None:
# audio_cost: float = _audio_tokens * _audio_cost_per_token
# prompt_cost += audio_cost
## CALCULATE OUTPUT COST
completion_cost: float = (
usage["completion_tokens"] * model_info["output_cost_per_token"]
)
_output_cost_per_audio_token: Optional[float] = model_info.get(
"output_cost_per_audio_token"
)
_output_audio_tokens: Optional[int] = (
usage.completion_tokens_details.audio_tokens
if usage.completion_tokens_details is not None
else None
)
if _output_cost_per_audio_token is not None and _output_audio_tokens is not None:
audio_cost = _output_audio_tokens * _output_cost_per_audio_token
completion_cost += audio_cost
# ## CALCULATE OUTPUT COST
# completion_cost: float = (
# usage["completion_tokens"] * model_info["output_cost_per_token"]
# )
# _output_cost_per_audio_token: Optional[float] = model_info.get(
# "output_cost_per_audio_token"
# )
# _output_audio_tokens: Optional[int] = (
# usage.completion_tokens_details.audio_tokens
# if usage.completion_tokens_details is not None
# else None
# )
# if _output_cost_per_audio_token is not None and _output_audio_tokens is not None:
# audio_cost = _output_audio_tokens * _output_cost_per_audio_token
# completion_cost += audio_cost
return prompt_cost, completion_cost
# return prompt_cost, completion_cost
def cost_per_second(

View file

@ -4,7 +4,11 @@ from typing import Literal, Optional, Tuple, Union
import litellm
from litellm import verbose_logger
from litellm.litellm_core_utils.llm_cost_calc.utils import _is_above_128k
from litellm.litellm_core_utils.llm_cost_calc.utils import (
_is_above_128k,
generic_cost_per_token,
)
from litellm.types.utils import ModelInfo, Usage
"""
Gemini pricing covers:
@ -20,7 +24,7 @@ Vertex AI -> character based pricing
Google AI Studio -> token based pricing
"""
models_without_dynamic_pricing = ["gemini-1.0-pro", "gemini-pro"]
models_without_dynamic_pricing = ["gemini-1.0-pro", "gemini-pro", "gemini-2"]
def cost_router(
@ -46,14 +50,15 @@ def cost_router(
call_type == "embedding" or call_type == "aembedding"
):
return "cost_per_token"
elif custom_llm_provider == "vertex_ai" and ("gemini-2" in model):
return "cost_per_token"
return "cost_per_character"
def cost_per_character(
model: str,
custom_llm_provider: str,
prompt_tokens: float,
completion_tokens: float,
usage: Usage,
prompt_characters: Optional[float] = None,
completion_characters: Optional[float] = None,
) -> Tuple[float, float]:
@ -86,8 +91,7 @@ def cost_per_character(
prompt_cost, _ = cost_per_token(
model=model,
custom_llm_provider=custom_llm_provider,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
usage=usage,
)
else:
try:
@ -124,8 +128,7 @@ def cost_per_character(
prompt_cost, _ = cost_per_token(
model=model,
custom_llm_provider=custom_llm_provider,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
usage=usage,
)
## CALCULATE OUTPUT COST
@ -133,10 +136,10 @@ def cost_per_character(
_, completion_cost = cost_per_token(
model=model,
custom_llm_provider=custom_llm_provider,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
usage=usage,
)
else:
completion_tokens = usage.completion_tokens
try:
if (
_is_above_128k(tokens=completion_characters * 4) # 1 token = 4 char
@ -172,18 +175,54 @@ def cost_per_character(
_, completion_cost = cost_per_token(
model=model,
custom_llm_provider=custom_llm_provider,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
usage=usage,
)
return prompt_cost, completion_cost
def _handle_128k_pricing(
model_info: ModelInfo,
usage: Usage,
) -> Tuple[float, float]:
## CALCULATE INPUT COST
input_cost_per_token_above_128k_tokens = model_info.get(
"input_cost_per_token_above_128k_tokens"
)
output_cost_per_token_above_128k_tokens = model_info.get(
"output_cost_per_token_above_128k_tokens"
)
prompt_tokens = usage.prompt_tokens
completion_tokens = usage.completion_tokens
if (
_is_above_128k(tokens=prompt_tokens)
and input_cost_per_token_above_128k_tokens is not None
):
prompt_cost = prompt_tokens * input_cost_per_token_above_128k_tokens
else:
prompt_cost = prompt_tokens * model_info["input_cost_per_token"]
## CALCULATE OUTPUT COST
output_cost_per_token_above_128k_tokens = model_info.get(
"output_cost_per_token_above_128k_tokens"
)
if (
_is_above_128k(tokens=completion_tokens)
and output_cost_per_token_above_128k_tokens is not None
):
completion_cost = completion_tokens * output_cost_per_token_above_128k_tokens
else:
completion_cost = completion_tokens * model_info["output_cost_per_token"]
return prompt_cost, completion_cost
def cost_per_token(
model: str,
custom_llm_provider: str,
prompt_tokens: float,
completion_tokens: float,
usage: Usage,
) -> Tuple[float, float]:
"""
Calculates the cost per token for a given model, prompt tokens, and completion tokens.
@ -205,38 +244,24 @@ def cost_per_token(
model=model, custom_llm_provider=custom_llm_provider
)
## CALCULATE INPUT COST
## HANDLE 128k+ PRICING
input_cost_per_token_above_128k_tokens = model_info.get(
"input_cost_per_token_above_128k_tokens"
)
output_cost_per_token_above_128k_tokens = model_info.get(
"output_cost_per_token_above_128k_tokens"
)
if (
_is_above_128k(tokens=prompt_tokens)
and model not in models_without_dynamic_pricing
input_cost_per_token_above_128k_tokens is not None
or output_cost_per_token_above_128k_tokens is not None
):
assert (
"input_cost_per_token_above_128k_tokens" in model_info
and model_info["input_cost_per_token_above_128k_tokens"] is not None
), "model info for model={} does not have pricing for > 128k tokens\nmodel_info={}".format(
model, model_info
return _handle_128k_pricing(
model_info=model_info,
usage=usage,
)
prompt_cost = (
prompt_tokens * model_info["input_cost_per_token_above_128k_tokens"]
)
else:
prompt_cost = prompt_tokens * model_info["input_cost_per_token"]
## CALCULATE OUTPUT COST
if (
_is_above_128k(tokens=completion_tokens)
and model not in models_without_dynamic_pricing
):
assert (
"output_cost_per_token_above_128k_tokens" in model_info
and model_info["output_cost_per_token_above_128k_tokens"] is not None
), "model info for model={} does not have pricing for > 128k tokens\nmodel_info={}".format(
model, model_info
)
completion_cost = (
completion_tokens * model_info["output_cost_per_token_above_128k_tokens"]
)
else:
completion_cost = completion_tokens * model_info["output_cost_per_token"]
return prompt_cost, completion_cost
return generic_cost_per_token(
model=model,
custom_llm_provider=custom_llm_provider,
usage=usage,
)

View file

@ -643,16 +643,25 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
completion_response: GenerateContentResponseBody,
) -> Usage:
cached_tokens: Optional[int] = None
audio_tokens: Optional[int] = None
text_tokens: Optional[int] = None
prompt_tokens_details: Optional[PromptTokensDetailsWrapper] = None
if "cachedContentTokenCount" in completion_response["usageMetadata"]:
cached_tokens = completion_response["usageMetadata"][
"cachedContentTokenCount"
]
if "promptTokensDetails" in completion_response["usageMetadata"]:
for detail in completion_response["usageMetadata"]["promptTokensDetails"]:
if detail["modality"] == "AUDIO":
audio_tokens = detail["tokenCount"]
elif detail["modality"] == "TEXT":
text_tokens = detail["tokenCount"]
if cached_tokens is not None:
prompt_tokens_details = PromptTokensDetailsWrapper(
cached_tokens=cached_tokens,
)
prompt_tokens_details = PromptTokensDetailsWrapper(
cached_tokens=cached_tokens,
audio_tokens=audio_tokens,
text_tokens=text_tokens,
)
## GET USAGE ##
usage = Usage(
prompt_tokens=completion_response["usageMetadata"].get(
@ -791,6 +800,7 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
model_response.choices.append(choice)
usage = self._calculate_usage(completion_response=completion_response)
setattr(model_response, "usage", usage)
## ADD GROUNDING METADATA ##

View file

@ -179,11 +179,17 @@ class TTL(TypedDict, total=False):
nano: float
class PromptTokensDetails(TypedDict):
modality: Literal["TEXT", "AUDIO", "IMAGE", "VIDEO"]
tokenCount: int
class UsageMetadata(TypedDict, total=False):
promptTokenCount: int
totalTokenCount: int
candidatesTokenCount: int
cachedContentTokenCount: int
promptTokensDetails: List[PromptTokensDetails]
class CachedContent(TypedDict, total=False):

View file

@ -12,7 +12,9 @@ from unittest.mock import MagicMock, patch
from pydantic import BaseModel
import litellm
from litellm.cost_calculator import response_cost_calculator
from litellm.types.utils import ModelResponse, PromptTokensDetailsWrapper, Usage
def test_cost_calculator_with_response_cost_in_additional_headers():
@ -32,3 +34,40 @@ def test_cost_calculator_with_response_cost_in_additional_headers():
)
assert result == 1000
def test_cost_calculator_with_usage():
from litellm import get_model_info
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
litellm.model_cost = litellm.get_model_cost_map(url="")
usage = Usage(
prompt_tokens=100,
completion_tokens=100,
prompt_tokens_details=PromptTokensDetailsWrapper(
text_tokens=10, audio_tokens=90
),
)
mr = ModelResponse(usage=usage, model="gemini-2.0-flash-001")
result = response_cost_calculator(
response_object=mr,
model="",
custom_llm_provider="vertex_ai",
call_type="acompletion",
optional_params={},
cache_hit=None,
base_model=None,
)
model_info = litellm.model_cost["gemini-2.0-flash-001"]
expected_cost = (
usage.prompt_tokens_details.audio_tokens
* model_info["input_cost_per_audio_token"]
+ usage.prompt_tokens_details.text_tokens * model_info["input_cost_per_token"]
+ usage.completion_tokens * model_info["output_cost_per_token"]
)
assert result == expected_cost, f"Got {result}, Expected {expected_cost}"

View file

@ -2454,6 +2454,14 @@ def test_completion_cost_params_gemini_3():
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
litellm.model_cost = litellm.get_model_cost_map(url="")
usage = Usage(
completion_tokens=2,
prompt_tokens=3771,
total_tokens=3773,
completion_tokens_details=None,
prompt_tokens_details=None,
)
response = ModelResponse(
id="chatcmpl-61043504-4439-48be-9996-e29bdee24dc3",
choices=[
@ -2472,13 +2480,7 @@ def test_completion_cost_params_gemini_3():
model="gemini-1.5-flash",
object="chat.completion",
system_fingerprint=None,
usage=Usage(
completion_tokens=2,
prompt_tokens=3771,
total_tokens=3773,
completion_tokens_details=None,
prompt_tokens_details=None,
),
usage=usage,
vertex_ai_grounding_metadata=[],
vertex_ai_safety_results=[
[
@ -2501,10 +2503,9 @@ def test_completion_cost_params_gemini_3():
**{
"model": "gemini-1.5-flash",
"custom_llm_provider": "vertex_ai",
"prompt_tokens": 3771,
"completion_tokens": 2,
"prompt_characters": None,
"completion_characters": 3,
"usage": usage,
}
)