litellm-mirror/enterprise/enterprise_hooks/session_handler.py
2025-04-26 19:34:37 -07:00

132 lines
5.5 KiB
Python

from litellm.proxy._types import SpendLogsPayload
from litellm.integrations.custom_logger import CustomLogger
from litellm._logging import verbose_proxy_logger
from typing import Optional, List, Union
import json
from litellm.types.utils import ModelResponse, Message
from litellm.types.llms.openai import (
AllMessageValues,
ChatCompletionResponseMessage,
GenericChatCompletionMessage,
ResponseInputParam,
)
from litellm.types.utils import ChatCompletionMessageToolCall
from litellm.responses.utils import ResponsesAPIRequestUtils
from litellm.responses.litellm_completion_transformation.transformation import ChatCompletionSession
class _ENTERPRISE_ResponsesSessionHandler:
@staticmethod
async def get_chat_completion_message_history_for_previous_response_id(
previous_response_id: str,
) -> ChatCompletionSession:
"""
Return the chat completion message history for a previous response id
"""
from litellm.responses.litellm_completion_transformation.transformation import LiteLLMCompletionResponsesConfig
all_spend_logs: List[SpendLogsPayload] = await _ENTERPRISE_ResponsesSessionHandler.get_all_spend_logs_for_previous_response_id(previous_response_id)
litellm_session_id: Optional[str] = None
if len(all_spend_logs) > 0:
litellm_session_id = all_spend_logs[0].get("session_id")
chat_completion_message_history: List[
Union[
AllMessageValues,
GenericChatCompletionMessage,
ChatCompletionMessageToolCall,
ChatCompletionResponseMessage,
Message,
]
] = []
for spend_log in all_spend_logs:
proxy_server_request: Union[str, dict] = spend_log.get("proxy_server_request") or "{}"
proxy_server_request_dict: Optional[dict] = None
response_input_param: Optional[Union[str, ResponseInputParam]] = None
if isinstance(proxy_server_request, dict):
proxy_server_request_dict = proxy_server_request
else:
proxy_server_request_dict = json.loads(proxy_server_request)
############################################################
# Add Input messages for this Spend Log
############################################################
if proxy_server_request_dict:
_response_input_param = proxy_server_request_dict.get("input", None)
if isinstance(_response_input_param, str):
response_input_param = _response_input_param
elif isinstance(_response_input_param, dict):
response_input_param = ResponseInputParam(**_response_input_param)
if response_input_param:
chat_completion_messages = LiteLLMCompletionResponsesConfig.transform_responses_api_input_to_messages(
input=response_input_param,
responses_api_request=proxy_server_request_dict or {}
)
chat_completion_message_history.extend(chat_completion_messages)
############################################################
# Add Output messages for this Spend Log
############################################################
_response_output = spend_log.get("response", "{}")
if isinstance(_response_output, dict):
# transform `ChatCompletion Response` to `ResponsesAPIResponse`
model_response = ModelResponse(**_response_output)
for choice in model_response.choices:
if hasattr(choice, "message"):
chat_completion_message_history.append(choice.message)
verbose_proxy_logger.debug("chat_completion_message_history %s", json.dumps(chat_completion_message_history, indent=4, default=str))
return ChatCompletionSession(
messages=chat_completion_message_history,
litellm_session_id=litellm_session_id
)
@staticmethod
async def get_all_spend_logs_for_previous_response_id(
previous_response_id: str
) -> List[SpendLogsPayload]:
"""
Get all spend logs for a previous response id
SQL query
SELECT session_id FROM spend_logs WHERE response_id = previous_response_id, SELECT * FROM spend_logs WHERE session_id = session_id
"""
from litellm.proxy.proxy_server import prisma_client
decoded_response_id = ResponsesAPIRequestUtils._decode_responses_api_response_id(previous_response_id)
previous_response_id = decoded_response_id.get("response_id", previous_response_id)
if prisma_client is None:
return []
query = """
WITH matching_session AS (
SELECT session_id
FROM "LiteLLM_SpendLogs"
WHERE request_id = $1
)
SELECT *
FROM "LiteLLM_SpendLogs"
WHERE session_id IN (SELECT session_id FROM matching_session)
ORDER BY "endTime" ASC;
"""
spend_logs = await prisma_client.db.query_raw(
query,
previous_response_id
)
verbose_proxy_logger.debug(
"Found the following spend logs for previous response id %s: %s",
previous_response_id,
json.dumps(spend_logs, indent=4, default=str)
)
return spend_logs