fix(utils.py): fix cost calculation for openai-compatible streaming object

This commit is contained in:
Krrish Dholakia 2024-06-04 10:36:25 -07:00
parent 7b474ec267
commit 52a2f5150c
9 changed files with 189 additions and 79 deletions

5
.gitignore vendored
View file

@ -55,4 +55,7 @@ litellm/proxy/_super_secret_config.yaml
litellm/proxy/_super_secret_config.yaml
litellm/proxy/myenv/bin/activate
litellm/proxy/myenv/bin/Activate.ps1
myenv/*
myenv/*
litellm/proxy/_experimental/out/404/index.html
litellm/proxy/_experimental/out/model_hub/index.html
litellm/proxy/_experimental/out/onboarding/index.html

View file

@ -815,3 +815,4 @@ from .router import Router
from .assistants.main import *
from .batches.main import *
from .scheduler import *
from .cost_calculator import response_cost_calculator

View file

@ -0,0 +1,80 @@
# What is this?
## File for 'response_cost' calculation in Logging
from typing import Optional, Union, Literal
from litellm.utils import (
ModelResponse,
EmbeddingResponse,
ImageResponse,
TranscriptionResponse,
TextCompletionResponse,
CallTypes,
completion_cost,
print_verbose,
)
import litellm
def response_cost_calculator(
response_object: Union[
ModelResponse,
EmbeddingResponse,
ImageResponse,
TranscriptionResponse,
TextCompletionResponse,
],
model: str,
custom_llm_provider: str,
call_type: Literal[
"embedding",
"aembedding",
"completion",
"acompletion",
"atext_completion",
"text_completion",
"image_generation",
"aimage_generation",
"moderation",
"amoderation",
"atranscription",
"transcription",
"aspeech",
"speech",
],
optional_params: dict,
cache_hit: Optional[bool] = None,
base_model: Optional[str] = None,
custom_pricing: Optional[bool] = None,
) -> Optional[float]:
try:
response_cost: float = 0.0
if cache_hit is not None and cache_hit == True:
response_cost = 0.0
else:
response_object._hidden_params["optional_params"] = optional_params
if isinstance(response_object, ImageResponse):
response_cost = completion_cost(
completion_response=response_object,
model=model,
call_type=call_type,
custom_llm_provider=custom_llm_provider,
)
else:
if (
model in litellm.model_cost
and custom_pricing is not None
and custom_llm_provider == True
): # override defaults if custom pricing is set
base_model = model
# base_model defaults to None if not set on model_info
response_cost = completion_cost(
completion_response=response_object,
call_type=call_type,
model=base_model,
custom_llm_provider=custom_llm_provider,
)
return response_cost
except litellm.NotFoundError as e:
print_verbose(
f"Model={model} for LLM Provider={custom_llm_provider} not found in completion cost map."
)
return None

View file

@ -84,9 +84,9 @@ class AsyncHTTPHandler:
response.raise_for_status()
return response
except httpx.HTTPStatusError as e:
raise
raise e
except Exception as e:
raise
raise e
def __del__(self) -> None:
try:

View file

@ -3724,7 +3724,7 @@ async def amoderation(input: str, model: str, api_key: Optional[str] = None, **k
##### Image Generation #######################
@client
async def aimage_generation(*args, **kwargs):
async def aimage_generation(*args, **kwargs) -> ImageResponse:
"""
Asynchronously calls the `image_generation` function with the given arguments and keyword arguments.
@ -3757,6 +3757,8 @@ async def aimage_generation(*args, **kwargs):
if isinstance(init_response, dict) or isinstance(
init_response, ImageResponse
): ## CACHING SCENARIO
if isinstance(init_response, dict):
init_response = ImageResponse(**init_response)
response = init_response
elif asyncio.iscoroutine(init_response):
response = await init_response
@ -3792,7 +3794,7 @@ def image_generation(
litellm_logging_obj=None,
custom_llm_provider=None,
**kwargs,
):
) -> ImageResponse:
"""
Maps the https://api.openai.com/v1/images/generations endpoint.
@ -4533,7 +4535,7 @@ def stream_chunk_builder_text_completion(chunks: list, messages: Optional[List]
def stream_chunk_builder(
chunks: list, messages: Optional[list] = None, start_time=None, end_time=None
):
) -> Union[ModelResponse, TextCompletionResponse]:
model_response = litellm.ModelResponse()
### SORT CHUNKS BASED ON CREATED ORDER ##
print_verbose("Goes into checking if chunk has hiddden created at param")

View file

@ -5,6 +5,9 @@ model_list:
model: openai/my-fake-model
rpm: 800
model_name: gpt-3.5-turbo-fake-model
- model_name: llama3-70b-8192
litellm_params:
model: groq/llama3-70b-8192
# - litellm_params:
# api_base: https://my-endpoint-europe-berri-992.openai.azure.com/
# api_key: os.environ/AZURE_EUROPE_API_KEY

View file

@ -470,3 +470,51 @@ def test_replicate_llama3_cost_tracking():
5,
)
assert cost == expected_cost
@pytest.mark.parametrize("is_streaming", [True]) # False
def test_groq_response_cost_tracking(is_streaming):
from litellm.utils import (
ModelResponse,
Choices,
Message,
Usage,
CallTypes,
StreamingChoices,
Delta,
)
response = ModelResponse(
id="chatcmpl-876cce24-e520-4cf8-8649-562a9be11c02",
choices=[
Choices(
finish_reason="stop",
index=0,
message=Message(
content="Hi! I'm an AI, so I don't have emotions or feelings like humans do, but I'm functioning properly and ready to help with any questions or topics you'd like to discuss! How can I assist you today?",
role="assistant",
),
)
],
created=1717519830,
model="llama3-70b-8192",
object="chat.completion",
system_fingerprint="fp_c1a4bcec29",
usage=Usage(completion_tokens=46, prompt_tokens=17, total_tokens=63),
)
response._hidden_params["custom_llm_provider"] = "groq"
print(response)
response_cost = litellm.response_cost_calculator(
response_object=response,
model="groq/llama3-70b-8192",
custom_llm_provider="groq",
call_type=CallTypes.acompletion.value,
optional_params={},
)
assert isinstance(response_cost, float)
assert response_cost > 0.0
print(f"response_cost: {response_cost}")
assert False

View file

@ -885,6 +885,7 @@ def test_completion_mistral_api_mistral_large_function_call_with_streaming():
idx = 0
for chunk in response:
print(f"chunk in response: {chunk}")
assert chunk._hidden_params["custom_llm_provider"] == "mistral"
if idx == 0:
assert (
chunk.choices[0].delta.tool_calls[0].function.arguments is not None
@ -898,7 +899,6 @@ def test_completion_mistral_api_mistral_large_function_call_with_streaming():
elif chunk.choices[0].finish_reason is not None: # last chunk
validate_final_streaming_function_calling_chunk(chunk=chunk)
idx += 1
# raise Exception("it worked!")
except Exception as e:
pytest.fail(f"Error occurred: {e}")

View file

@ -1499,51 +1499,21 @@ class Logging:
)
and self.stream != True
): # handle streaming separately
try:
if self.model_call_details.get("cache_hit", False) == True:
self.model_call_details["response_cost"] = 0.0
else:
result._hidden_params["optional_params"] = self.optional_params
if (
self.call_type == CallTypes.aimage_generation.value
or self.call_type == CallTypes.image_generation.value
):
self.model_call_details["response_cost"] = (
litellm.completion_cost(
completion_response=result,
model=self.model,
call_type=self.call_type,
custom_llm_provider=self.model_call_details.get(
"custom_llm_provider", None
), # set for img gen models
)
)
else:
base_model: Optional[str] = None
# check if base_model set on azure
base_model = _get_base_model_from_metadata(
model_call_details=self.model_call_details
)
# litellm model name
litellm_model = self.model_call_details["model"]
if (
litellm_model in litellm.model_cost
and self.custom_pricing == True
):
base_model = litellm_model
# base_model defaults to None if not set on model_info
self.model_call_details["response_cost"] = (
litellm.completion_cost(
completion_response=result,
call_type=self.call_type,
model=base_model,
)
)
except litellm.NotFoundError as e:
verbose_logger.debug(
f"Model={self.model} not found in completion cost map."
self.model_call_details["response_cost"] = (
litellm.response_cost_calculator(
response_object=result,
model=self.model,
cache_hit=self.model_call_details.get("cache_hit", False),
custom_llm_provider=self.model_call_details.get(
"custom_llm_provider", None
),
base_model=_get_base_model_from_metadata(
model_call_details=self.model_call_details
),
call_type=self.call_type,
optional_params=self.optional_params,
)
self.model_call_details["response_cost"] = None
)
else: # streaming chunks + image gen.
self.model_call_details["response_cost"] = None
@ -1607,29 +1577,21 @@ class Logging:
self.model_call_details["complete_streaming_response"] = (
complete_streaming_response
)
try:
if self.model_call_details.get("cache_hit", False) == True:
self.model_call_details["response_cost"] = 0.0
else:
# check if base_model set on azure
base_model = _get_base_model_from_metadata(
self.model_call_details["response_cost"] = (
litellm.response_cost_calculator(
response_object=complete_streaming_response,
model=self.model,
cache_hit=self.model_call_details.get("cache_hit", False),
custom_llm_provider=self.model_call_details.get(
"custom_llm_provider", None
),
base_model=_get_base_model_from_metadata(
model_call_details=self.model_call_details
)
# base_model defaults to None if not set on model_info
self.model_call_details["response_cost"] = (
litellm.completion_cost(
completion_response=complete_streaming_response,
model=base_model,
)
)
verbose_logger.debug(
f"Model={self.model}; cost={self.model_call_details['response_cost']}"
),
call_type=self.call_type,
optional_params=self.optional_params,
)
except litellm.NotFoundError as e:
verbose_logger.debug(
f"Model={self.model} not found in completion cost map."
)
self.model_call_details["response_cost"] = None
)
if self.dynamic_success_callbacks is not None and isinstance(
self.dynamic_success_callbacks, list
):
@ -4576,16 +4538,20 @@ def completion_cost(
completion="",
total_time=0.0, # used for replicate, sagemaker
call_type: Literal[
"completion",
"acompletion",
"embedding",
"aembedding",
"completion",
"acompletion",
"atext_completion",
"text_completion",
"image_generation",
"aimage_generation",
"transcription",
"moderation",
"amoderation",
"atranscription",
"transcription",
"aspeech",
"speech",
] = "completion",
### REGION ###
custom_llm_provider=None,
@ -11096,8 +11062,16 @@ class CustomStreamWrapper:
return ""
def model_response_creator(self):
_model = self.model
_received_llm_provider = self.custom_llm_provider
_logging_obj_llm_provider = self.logging_obj.model_call_details.get("custom_llm_provider", None) # type: ignore
if (
_received_llm_provider == "openai"
and _received_llm_provider != _logging_obj_llm_provider
):
_model = "{}/{}".format(_logging_obj_llm_provider, _model)
model_response = ModelResponse(
stream=True, model=self.model, stream_options=self.stream_options
stream=True, model=_model, stream_options=self.stream_options
)
if self.response_id is not None:
model_response.id = self.response_id
@ -11105,10 +11079,9 @@ class CustomStreamWrapper:
self.response_id = model_response.id
if self.system_fingerprint is not None:
model_response.system_fingerprint = self.system_fingerprint
model_response._hidden_params["custom_llm_provider"] = self.custom_llm_provider
model_response._hidden_params["custom_llm_provider"] = _logging_obj_llm_provider
model_response._hidden_params["created_at"] = time.time()
model_response.choices = [StreamingChoices()]
model_response.choices[0].finish_reason = None
model_response.choices = [StreamingChoices(finish_reason=None)]
return model_response
def is_delta_empty(self, delta: Delta) -> bool: