litellm-mirror/litellm/responses/main.py
Ishaan Jaff 868cdd0226
[Feat] Add Support for DELETE /v1/responses/{response_id} on OpenAI, Azure OpenAI (#10205)
* add transform_delete_response_api_request to base responses config

* add transform_delete_response_api_request

* add delete_response_api_handler

* fixes for deleting responses, response API

* add adelete_responses

* add async test_basic_openai_responses_delete_endpoint

* test_basic_openai_responses_delete_endpoint

* working delete for streaming on responses API

* fixes azure transformation

* TestAnthropicResponsesAPITest

* fix code check

* fix linting

* fixes for get_complete_url

* test_basic_openai_responses_streaming_delete_endpoint

* streaming fixes
2025-04-22 18:27:03 -07:00

436 lines
16 KiB
Python

import asyncio
import contextvars
from functools import partial
from typing import Any, Coroutine, Dict, Iterable, List, Literal, Optional, Union
import httpx
import litellm
from litellm.constants import request_timeout
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig
from litellm.llms.custom_httpx.llm_http_handler import BaseLLMHTTPHandler
from litellm.responses.litellm_completion_transformation.handler import (
LiteLLMCompletionTransformationHandler,
)
from litellm.responses.utils import ResponsesAPIRequestUtils
from litellm.types.llms.openai import (
Reasoning,
ResponseIncludable,
ResponseInputParam,
ResponsesAPIOptionalRequestParams,
ResponsesAPIResponse,
ResponseTextConfigParam,
ToolChoice,
ToolParam,
)
from litellm.types.responses.main import *
from litellm.types.router import GenericLiteLLMParams
from litellm.utils import ProviderConfigManager, client
from .streaming_iterator import BaseResponsesAPIStreamingIterator
####### MODULE-LEVEL HANDLERS ###################
# Shared handler instances, created once when this module is imported and
# reused by every call below.
# HTTP handler used by `responses` / `delete_responses` when the provider has a
# Responses API config.
base_llm_http_handler = BaseLLMHTTPHandler()
# Fallback handler used by `responses` when the provider has no Responses API
# config.
litellm_completion_transformation_handler = LiteLLMCompletionTransformationHandler()
#################################################
@client
async def aresponses(
    input: Union[str, ResponseInputParam],
    model: str,
    include: Optional[List[ResponseIncludable]] = None,
    instructions: Optional[str] = None,
    max_output_tokens: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    parallel_tool_calls: Optional[bool] = None,
    previous_response_id: Optional[str] = None,
    reasoning: Optional[Reasoning] = None,
    store: Optional[bool] = None,
    stream: Optional[bool] = None,
    temperature: Optional[float] = None,
    text: Optional[ResponseTextConfigParam] = None,
    tool_choice: Optional[ToolChoice] = None,
    tools: Optional[Iterable[ToolParam]] = None,
    top_p: Optional[float] = None,
    truncation: Optional[Literal["auto", "disabled"]] = None,
    user: Optional[str] = None,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[ResponsesAPIResponse, BaseResponsesAPIStreamingIterator]:
    """
    Async: Handles responses API requests by reusing the synchronous function.

    The synchronous `responses` function is run in the event loop's default
    executor with `aresponses=True` added to kwargs. If that call returns a
    coroutine, the coroutine is awaited here.

    Args:
        input, model, include, ..., user: Responses API request parameters,
            forwarded unchanged to `responses`.
        extra_headers, extra_query, extra_body, timeout: Forwarded unchanged
            to `responses`.
        custom_llm_provider: Provider name. When None it is resolved with
            `litellm.get_llm_provider` so it is available for exception mapping.
        **kwargs: Additional LiteLLM parameters, forwarded to `responses`.

    Returns:
        The result produced by `responses`. When it is a `ResponsesAPIResponse`
        it is first passed through
        `ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id`.

    Raises:
        Any exception raised while handling the request is routed through
        `litellm.exception_type`.
    """
    local_vars = locals()
    try:
        # Inside a coroutine there is always a running loop; get_running_loop()
        # is the preferred way to obtain it.
        loop = asyncio.get_running_loop()
        kwargs["aresponses"] = True

        # get custom llm provider so we can use this for mapping exceptions
        if custom_llm_provider is None:
            # Caller-supplied api_base / base_url live in **kwargs; the locals()
            # snapshot only holds them nested under the "kwargs" key, so they
            # must be read from kwargs directly.
            _, custom_llm_provider, _, _ = litellm.get_llm_provider(
                model=model,
                api_base=kwargs.get("api_base") or kwargs.get("base_url"),
            )

        func = partial(
            responses,
            input=input,
            model=model,
            include=include,
            instructions=instructions,
            max_output_tokens=max_output_tokens,
            metadata=metadata,
            parallel_tool_calls=parallel_tool_calls,
            previous_response_id=previous_response_id,
            reasoning=reasoning,
            store=store,
            stream=stream,
            temperature=temperature,
            text=text,
            tool_choice=tool_choice,
            tools=tools,
            top_p=top_p,
            truncation=truncation,
            user=user,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            custom_llm_provider=custom_llm_provider,
            **kwargs,
        )

        # Run the sync function inside a copy of the current context so the
        # executor thread sees the caller's context variables.
        ctx = contextvars.copy_context()
        func_with_context = partial(ctx.run, func)
        init_response = await loop.run_in_executor(None, func_with_context)

        if asyncio.iscoroutine(init_response):
            response = await init_response
        else:
            response = init_response

        # Update the responses_api_response_id with the model_id
        if isinstance(response, ResponsesAPIResponse):
            response = ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
                responses_api_response=response,
                litellm_metadata=kwargs.get("litellm_metadata", {}),
                custom_llm_provider=custom_llm_provider,
            )
        return response
    except Exception as e:
        raise litellm.exception_type(
            model=model,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
@client
def responses(
    input: Union[str, ResponseInputParam],
    model: str,
    include: Optional[List[ResponseIncludable]] = None,
    instructions: Optional[str] = None,
    max_output_tokens: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    parallel_tool_calls: Optional[bool] = None,
    previous_response_id: Optional[str] = None,
    reasoning: Optional[Reasoning] = None,
    store: Optional[bool] = None,
    stream: Optional[bool] = None,
    temperature: Optional[float] = None,
    text: Optional[ResponseTextConfigParam] = None,
    tool_choice: Optional[ToolChoice] = None,
    tools: Optional[Iterable[ToolParam]] = None,
    top_p: Optional[float] = None,
    truncation: Optional[Literal["auto", "disabled"]] = None,
    user: Optional[str] = None,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
):
    """
    Entry point for the Responses API (synchronous call path).

    Resolves the provider, looks up its Responses API config and dispatches:
    - no provider config -> `litellm_completion_transformation_handler`
    - provider config    -> `base_llm_http_handler.response_api_handler`

    When kwargs carries `aresponses=True` (set by `aresponses`), the handlers
    are invoked with `_is_async=True`.

    NOTE: `extra_query` is accepted but is not forwarded to either handler by
    this function.

    Raises:
        Any exception raised while handling the request is routed through
        `litellm.exception_type`.
    """
    local_vars = locals()
    try:
        logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj")  # type: ignore
        call_id: Optional[str] = kwargs.get("litellm_call_id", None)
        # Popped so the async marker is not part of kwargs when kwargs is
        # forwarded further down.
        run_async = kwargs.pop("aresponses", False) is True

        # get llm provider logic
        litellm_params = GenericLiteLLMParams(**kwargs)
        model, custom_llm_provider, _dynamic_api_key, _dynamic_api_base = (
            litellm.get_llm_provider(
                model=model,
                custom_llm_provider=custom_llm_provider,
                api_base=litellm_params.api_base,
                api_key=litellm_params.api_key,
            )
        )

        # get provider config
        provider_enum = litellm.LlmProviders(custom_llm_provider)
        provider_config: Optional[BaseResponsesAPIConfig] = (
            ProviderConfigManager.get_provider_responses_api_config(
                model=model,
                provider=provider_enum,
            )
        )

        # Merge the extra kwargs into the argument snapshot, then keep only the
        # valid Responses API optional parameters.
        local_vars.update(kwargs)
        requested_params: ResponsesAPIOptionalRequestParams = (
            ResponsesAPIRequestUtils.get_requested_response_api_optional_param(
                local_vars
            )
        )

        # No Responses API config for this provider: use the completion
        # transformation handler instead.
        if provider_config is None:
            return litellm_completion_transformation_handler.response_api_handler(
                model=model,
                input=input,
                responses_api_request=requested_params,
                custom_llm_provider=custom_llm_provider,
                _is_async=run_async,
                stream=stream,
                **kwargs,
            )

        # Get optional parameters for the responses API
        request_params: Dict = (
            ResponsesAPIRequestUtils.get_optional_params_responses_api(
                model=model,
                responses_api_provider_config=provider_config,
                response_api_optional_params=requested_params,
            )
        )

        # Pre Call logging
        logged_litellm_params = {
            "litellm_call_id": call_id,
            **request_params,
        }
        logging_obj.update_environment_variables(
            model=model,
            user=user,
            optional_params=dict(request_params),
            litellm_params=logged_litellm_params,
            custom_llm_provider=custom_llm_provider,
        )

        # Call the handler with _is_async flag instead of directly calling the async handler
        effective_timeout = timeout or request_timeout
        http_client = kwargs.get("client")
        fake_stream = provider_config.should_fake_stream(
            model=model, stream=stream, custom_llm_provider=custom_llm_provider
        )
        response = base_llm_http_handler.response_api_handler(
            model=model,
            input=input,
            responses_api_provider_config=provider_config,
            response_api_optional_request_params=request_params,
            custom_llm_provider=custom_llm_provider,
            litellm_params=litellm_params,
            logging_obj=logging_obj,
            extra_headers=extra_headers,
            extra_body=extra_body,
            timeout=effective_timeout,
            _is_async=run_async,
            client=http_client,
            fake_stream=fake_stream,
            litellm_metadata=kwargs.get("litellm_metadata", {}),
        )

        # Anything that is not a ResponsesAPIResponse is returned untouched.
        if not isinstance(response, ResponsesAPIResponse):
            return response

        # Update the responses_api_response_id with the model_id
        return ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
            responses_api_response=response,
            litellm_metadata=kwargs.get("litellm_metadata", {}),
            custom_llm_provider=custom_llm_provider,
        )
    except Exception as e:
        raise litellm.exception_type(
            model=model,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
@client
async def adelete_responses(
    response_id: str,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> DeleteResponseResult:
    """
    Async version of the DELETE Responses API

    DELETE /v1/responses/{response_id} endpoint in the responses API

    The synchronous `delete_responses` function is run in the event loop's
    default executor with `adelete_responses=True` added to kwargs. If that
    call returns a coroutine, the coroutine is awaited here.

    Args:
        response_id: Response id to delete. It is decoded with
            `ResponsesAPIRequestUtils._decode_responses_api_response_id`; the
            decoded `response_id` / `custom_llm_provider` are used when present.
        extra_headers, extra_query, extra_body, timeout: Forwarded unchanged
            to `delete_responses`.
        custom_llm_provider: Provider name, used when the decoded response id
            does not carry one.
        **kwargs: Additional LiteLLM parameters, forwarded to `delete_responses`.

    Returns:
        The result produced by `delete_responses`.

    Raises:
        Any exception raised while handling the request is routed through
        `litellm.exception_type`.
    """
    local_vars = locals()
    try:
        # Inside a coroutine there is always a running loop; get_running_loop()
        # is the preferred way to obtain it.
        loop = asyncio.get_running_loop()
        kwargs["adelete_responses"] = True

        # get custom llm provider from response_id
        decoded_response_id: DecodedResponseId = (
            ResponsesAPIRequestUtils._decode_responses_api_response_id(
                response_id=response_id,
            )
        )
        response_id = decoded_response_id.get("response_id") or response_id
        custom_llm_provider = (
            decoded_response_id.get("custom_llm_provider") or custom_llm_provider
        )

        func = partial(
            delete_responses,
            response_id=response_id,
            custom_llm_provider=custom_llm_provider,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            **kwargs,
        )

        # Run the sync function inside a copy of the current context so the
        # executor thread sees the caller's context variables.
        ctx = contextvars.copy_context()
        func_with_context = partial(ctx.run, func)
        init_response = await loop.run_in_executor(None, func_with_context)

        if asyncio.iscoroutine(init_response):
            response = await init_response
        else:
            response = init_response
        return response
    except Exception as e:
        raise litellm.exception_type(
            model=None,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
@client
def delete_responses(
    response_id: str,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[DeleteResponseResult, Coroutine[Any, Any, DeleteResponseResult]]:
    """
    Entry point for the DELETE Responses API (synchronous call path).

    DELETE /v1/responses/{response_id} endpoint in the responses API

    The response id is decoded first; a decoded `response_id` /
    `custom_llm_provider` takes precedence over the values passed in. When
    kwargs carries `adelete_responses=True` (set by `adelete_responses`), the
    handler is invoked with `_is_async=True`.

    NOTE: `extra_query` is accepted but is not forwarded to the handler by this
    function.

    Raises:
        Any exception raised while handling the request, including the
        ValueErrors raised here for a missing provider or an unsupported
        provider, is routed through `litellm.exception_type`.
    """
    local_vars = locals()
    try:
        logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj")  # type: ignore
        call_id: Optional[str] = kwargs.get("litellm_call_id", None)
        # Popped so the async marker is not part of kwargs when kwargs is
        # used to build the litellm params below.
        run_async = kwargs.pop("adelete_responses", False) is True

        # get llm provider logic
        litellm_params = GenericLiteLLMParams(**kwargs)

        # get custom llm provider from response_id
        decoded: DecodedResponseId = (
            ResponsesAPIRequestUtils._decode_responses_api_response_id(
                response_id=response_id,
            )
        )
        response_id = decoded.get("response_id") or response_id
        custom_llm_provider = decoded.get("custom_llm_provider") or custom_llm_provider

        if custom_llm_provider is None:
            raise ValueError("custom_llm_provider is required but passed as None")

        # get provider config
        provider_enum = litellm.LlmProviders(custom_llm_provider)
        provider_config: Optional[BaseResponsesAPIConfig] = (
            ProviderConfigManager.get_provider_responses_api_config(
                model=None,
                provider=provider_enum,
            )
        )
        if provider_config is None:
            raise ValueError(
                f"DELETE responses is not supported for {custom_llm_provider}"
            )

        # From here on the snapshot passed to exception mapping also carries
        # the extra kwargs.
        local_vars.update(kwargs)

        # Pre Call logging
        logging_obj.update_environment_variables(
            model=None,
            optional_params={"response_id": response_id},
            litellm_params={"litellm_call_id": call_id},
            custom_llm_provider=custom_llm_provider,
        )

        # Call the handler with _is_async flag instead of directly calling the async handler
        return base_llm_http_handler.delete_response_api_handler(
            response_id=response_id,
            custom_llm_provider=custom_llm_provider,
            responses_api_provider_config=provider_config,
            litellm_params=litellm_params,
            logging_obj=logging_obj,
            extra_headers=extra_headers,
            extra_body=extra_body,
            timeout=timeout or request_timeout,
            _is_async=run_async,
            client=kwargs.get("client"),
        )
    except Exception as e:
        raise litellm.exception_type(
            model=None,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )