litellm-mirror/enterprise/enterprise_hooks/session_handler.py
Ishaan Jaff 331e784db4
Some checks failed
Read Version from pyproject.toml / read-version (push) Successful in 49s
Helm unit test / unit-test (push) Successful in 55s
Publish Prisma Migrations / publish-migrations (push) Failing after 1m57s
[Feat] Responses API - Add session management support for non-openai models (#10321)
* add session id in spendLogs

* fix log proxy server request as independant field

* use trace id for SpendLogs

* add _ENTERPRISE_ResponsesSessionHandler

* use _ENTERPRISE_ResponsesSessionHandler

* working session_ids

* working session management

* working session_ids

* test_async_gcs_pub_sub_v1

* test_spend_logs_payload_e2e

* working session_ids

* test_get_standard_logging_payload_trace_id

* test_get_standard_logging_payload_trace_id

* test_gcs_pub_sub.py

* fix all linting errors

* test_spend_logs_payload_with_prompts_enabled

* _ENTERPRISE_ResponsesSessionHandler

* _ENTERPRISE_ResponsesSessionHandler

* expose session id on ui

* get spend logs by session

* add sessionSpendLogsCall

* add session handling

* session logs

* ui session details

* fix on rowExpandDetails

* ui working sessions
2025-04-25 23:24:24 -07:00

136 lines
5.6 KiB
Python

from litellm.proxy._types import SpendLogsPayload
from litellm.integrations.custom_logger import CustomLogger
from litellm._logging import verbose_proxy_logger
from typing import Optional, List, Union
import json
from litellm.types.utils import ModelResponse, Message
from litellm.types.llms.openai import (
AllMessageValues,
ChatCompletionResponseMessage,
GenericChatCompletionMessage,
ResponseInputParam,
)
from litellm.types.utils import ChatCompletionMessageToolCall
from litellm.responses.utils import ResponsesAPIRequestUtils
from typing import TypedDict
class ChatCompletionSession(TypedDict, total=False):
messages: List[Union[AllMessageValues, GenericChatCompletionMessage, ChatCompletionMessageToolCall, ChatCompletionResponseMessage, Message]]
litellm_session_id: Optional[str]
class _ENTERPRISE_ResponsesSessionHandler:
@staticmethod
async def get_chat_completion_message_history_for_previous_response_id(
previous_response_id: str,
) -> ChatCompletionSession:
"""
Return the chat completion message history for a previous response id
"""
from litellm.responses.litellm_completion_transformation.transformation import LiteLLMCompletionResponsesConfig
all_spend_logs: List[SpendLogsPayload] = await _ENTERPRISE_ResponsesSessionHandler.get_all_spend_logs_for_previous_response_id(previous_response_id)
litellm_session_id: Optional[str] = None
if len(all_spend_logs) > 0:
litellm_session_id = all_spend_logs[0].get("session_id")
chat_completion_message_history: List[
Union[
AllMessageValues,
GenericChatCompletionMessage,
ChatCompletionMessageToolCall,
ChatCompletionResponseMessage,
Message,
]
] = []
for spend_log in all_spend_logs:
proxy_server_request: Union[str, dict] = spend_log.get("proxy_server_request") or "{}"
proxy_server_request_dict: Optional[dict] = None
response_input_param: Optional[Union[str, ResponseInputParam]] = None
if isinstance(proxy_server_request, dict):
proxy_server_request_dict = proxy_server_request
else:
proxy_server_request_dict = json.loads(proxy_server_request)
############################################################
# Add Input messages for this Spend Log
############################################################
if proxy_server_request_dict:
_response_input_param = proxy_server_request_dict.get("input", None)
if isinstance(_response_input_param, str):
response_input_param = _response_input_param
elif isinstance(_response_input_param, dict):
response_input_param = ResponseInputParam(**_response_input_param)
if response_input_param:
chat_completion_messages = LiteLLMCompletionResponsesConfig.transform_responses_api_input_to_messages(
input=response_input_param,
responses_api_request=proxy_server_request_dict or {}
)
chat_completion_message_history.extend(chat_completion_messages)
############################################################
# Add Output messages for this Spend Log
############################################################
_response_output = spend_log.get("response", "{}")
if isinstance(_response_output, dict):
# transform `ChatCompletion Response` to `ResponsesAPIResponse`
model_response = ModelResponse(**_response_output)
for choice in model_response.choices:
if hasattr(choice, "message"):
chat_completion_message_history.append(choice.message)
verbose_proxy_logger.debug("chat_completion_message_history %s", json.dumps(chat_completion_message_history, indent=4, default=str))
return ChatCompletionSession(
messages=chat_completion_message_history,
litellm_session_id=litellm_session_id
)
@staticmethod
async def get_all_spend_logs_for_previous_response_id(
previous_response_id: str
) -> List[SpendLogsPayload]:
"""
Get all spend logs for a previous response id
SQL query
SELECT session_id FROM spend_logs WHERE response_id = previous_response_id, SELECT * FROM spend_logs WHERE session_id = session_id
"""
from litellm.proxy.proxy_server import prisma_client
decoded_response_id = ResponsesAPIRequestUtils._decode_responses_api_response_id(previous_response_id)
previous_response_id = decoded_response_id.get("response_id", previous_response_id)
if prisma_client is None:
return []
query = """
WITH matching_session AS (
SELECT session_id
FROM "LiteLLM_SpendLogs"
WHERE request_id = $1
)
SELECT *
FROM "LiteLLM_SpendLogs"
WHERE session_id IN (SELECT session_id FROM matching_session)
ORDER BY "endTime" ASC;
"""
spend_logs = await prisma_client.db.query_raw(
query,
previous_response_id
)
verbose_proxy_logger.debug(
"Found the following spend logs for previous response id %s: %s",
previous_response_id,
json.dumps(spend_logs, indent=4, default=str)
)
return spend_logs