forked from phoenix/litellm-mirror
fix(litellm_logging.py): fix calling success callback w/ stream_options true
Fixes https://github.com/BerriAI/litellm/issues/5118
This commit is contained in:
parent
9f0a05d406
commit
a26b23a3f4
4 changed files with 50 additions and 28 deletions
|
@ -1,12 +1,18 @@
|
||||||
# What is this?
|
# What is this?
|
||||||
## On Success events log cost to OpenMeter - https://github.com/BerriAI/litellm/issues/1268
|
## On Success events log cost to OpenMeter - https://github.com/BerriAI/litellm/issues/1268
|
||||||
|
|
||||||
import dotenv, os, json
|
import json
|
||||||
import litellm
|
import os
|
||||||
import traceback
|
import traceback
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import dotenv
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
import litellm
|
||||||
|
from litellm import verbose_logger
|
||||||
from litellm.integrations.custom_logger import CustomLogger
|
from litellm.integrations.custom_logger import CustomLogger
|
||||||
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
|
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
|
||||||
import uuid
|
|
||||||
|
|
||||||
|
|
||||||
def get_utc_datetime():
|
def get_utc_datetime():
|
||||||
|
@ -122,7 +128,11 @@ class OpenMeterLogger(CustomLogger):
|
||||||
)
|
)
|
||||||
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except Exception as e:
|
except httpx.HTTPStatusError as e:
|
||||||
if hasattr(response, "text"):
|
verbose_logger.error(
|
||||||
litellm.print_verbose(f"\nError Message: {response.text}")
|
"Failed OpenMeter logging - {}".format(e.response.text)
|
||||||
|
)
|
||||||
|
raise e
|
||||||
|
except Exception as e:
|
||||||
|
verbose_logger.error("Failed OpenMeter logging - {}".format(str(e)))
|
||||||
raise e
|
raise e
|
||||||
|
|
|
@ -638,6 +638,8 @@ class Logging:
|
||||||
verbose_logger.debug(f"success callbacks: {litellm.success_callback}")
|
verbose_logger.debug(f"success callbacks: {litellm.success_callback}")
|
||||||
## BUILD COMPLETE STREAMED RESPONSE
|
## BUILD COMPLETE STREAMED RESPONSE
|
||||||
complete_streaming_response = None
|
complete_streaming_response = None
|
||||||
|
if "complete_streaming_response" in self.model_call_details:
|
||||||
|
return # break out of this.
|
||||||
if self.stream and isinstance(result, ModelResponse):
|
if self.stream and isinstance(result, ModelResponse):
|
||||||
if (
|
if (
|
||||||
result.choices[0].finish_reason is not None
|
result.choices[0].finish_reason is not None
|
||||||
|
@ -1279,6 +1281,8 @@ class Logging:
|
||||||
)
|
)
|
||||||
## BUILD COMPLETE STREAMED RESPONSE
|
## BUILD COMPLETE STREAMED RESPONSE
|
||||||
complete_streaming_response = None
|
complete_streaming_response = None
|
||||||
|
if "async_complete_streaming_response" in self.model_call_details:
|
||||||
|
return # break out of this.
|
||||||
if self.stream:
|
if self.stream:
|
||||||
if result.choices[0].finish_reason is not None: # if it's the last chunk
|
if result.choices[0].finish_reason is not None: # if it's the last chunk
|
||||||
self.streaming_chunks.append(result)
|
self.streaming_chunks.append(result)
|
||||||
|
@ -1302,6 +1306,7 @@ class Logging:
|
||||||
self.streaming_chunks.append(result)
|
self.streaming_chunks.append(result)
|
||||||
if complete_streaming_response is not None:
|
if complete_streaming_response is not None:
|
||||||
print_verbose("Async success callbacks: Got a complete streaming response")
|
print_verbose("Async success callbacks: Got a complete streaming response")
|
||||||
|
|
||||||
self.model_call_details["async_complete_streaming_response"] = (
|
self.model_call_details["async_complete_streaming_response"] = (
|
||||||
complete_streaming_response
|
complete_streaming_response
|
||||||
)
|
)
|
||||||
|
@ -1431,7 +1436,7 @@ class Logging:
|
||||||
end_time=end_time,
|
end_time=end_time,
|
||||||
)
|
)
|
||||||
if isinstance(callback, CustomLogger): # custom logger class
|
if isinstance(callback, CustomLogger): # custom logger class
|
||||||
if self.stream == True:
|
if self.stream is True:
|
||||||
if (
|
if (
|
||||||
"async_complete_streaming_response"
|
"async_complete_streaming_response"
|
||||||
in self.model_call_details
|
in self.model_call_details
|
||||||
|
|
|
@ -1,24 +1,7 @@
|
||||||
general_settings:
|
|
||||||
store_model_in_db: true
|
|
||||||
database_connection_pool_limit: 20
|
|
||||||
|
|
||||||
model_list:
|
model_list:
|
||||||
- model_name: fake-openai-endpoint
|
- model_name: "*"
|
||||||
litellm_params:
|
litellm_params:
|
||||||
model: openai/my-fake-model
|
model: "*"
|
||||||
api_key: my-fake-key
|
|
||||||
api_base: https://exampleopenaiendpoint-production.up.railway.app/
|
|
||||||
litellm_settings:
|
|
||||||
drop_params: True
|
|
||||||
success_callback: ["prometheus"]
|
|
||||||
failure_callback: ["prometheus"]
|
|
||||||
service_callback: ["prometheus_system"]
|
|
||||||
_langfuse_default_tags: ["user_api_key_alias", "user_api_key_user_id", "user_api_key_user_email", "user_api_key_team_alias", "semantic-similarity", "proxy_base_url"]
|
|
||||||
|
|
||||||
router_settings:
|
litellm_settings:
|
||||||
routing_strategy: "latency-based-routing"
|
callbacks: ["openmeter"]
|
||||||
routing_strategy_args: {"ttl": 86400} # Average the last 10 calls to compute avg latency per model
|
|
||||||
allowed_fails: 1
|
|
||||||
num_retries: 3
|
|
||||||
retry_after: 5 # seconds to wait before retrying a failed request
|
|
||||||
cooldown_time: 30 # seconds to cooldown a deployment after failure
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ from pydantic import BaseModel
|
||||||
|
|
||||||
sys.path.insert(0, os.path.abspath("../.."))
|
sys.path.insert(0, os.path.abspath("../.."))
|
||||||
from typing import List, Literal, Optional, Union
|
from typing import List, Literal, Optional, Union
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
import litellm
|
import litellm
|
||||||
from litellm import Cache, completion, embedding
|
from litellm import Cache, completion, embedding
|
||||||
|
@ -518,6 +519,29 @@ async def test_async_chat_azure_stream():
|
||||||
# asyncio.run(test_async_chat_azure_stream())
|
# asyncio.run(test_async_chat_azure_stream())
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_async_chat_openai_stream_options():
|
||||||
|
try:
|
||||||
|
customHandler = CompletionCustomHandler()
|
||||||
|
litellm.callbacks = [customHandler]
|
||||||
|
with patch.object(
|
||||||
|
customHandler, "async_log_success_event", new=AsyncMock()
|
||||||
|
) as mock_client:
|
||||||
|
response = await litellm.acompletion(
|
||||||
|
model="gpt-3.5-turbo",
|
||||||
|
messages=[{"role": "user", "content": "Hi 👋 - i'm async openai"}],
|
||||||
|
stream=True,
|
||||||
|
stream_options={"include_usage": True},
|
||||||
|
)
|
||||||
|
|
||||||
|
async for chunk in response:
|
||||||
|
continue
|
||||||
|
|
||||||
|
mock_client.assert_awaited_once()
|
||||||
|
except Exception as e:
|
||||||
|
pytest.fail(f"An exception occurred: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
## Test Bedrock + sync
|
## Test Bedrock + sync
|
||||||
def test_chat_bedrock_stream():
|
def test_chat_bedrock_stream():
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue