diff --git a/litellm/llms/base_llm/responses/transformation.py b/litellm/llms/base_llm/responses/transformation.py
index ef82b25ddb..bca8e3c7ed 100644
--- a/litellm/llms/base_llm/responses/transformation.py
+++ b/litellm/llms/base_llm/responses/transformation.py
@@ -9,6 +9,7 @@ from litellm.types.llms.openai import (
     ResponsesAPIOptionalRequestParams,
     ResponsesAPIRequestParams,
     ResponsesAPIResponse,
+    ResponsesAPIStreamingResponse,
 )
 from litellm.types.router import GenericLiteLLMParams
 from litellm.types.utils import ModelInfo
@@ -107,6 +108,18 @@ class BaseResponsesAPIConfig(ABC):
     ) -> ResponsesAPIResponse:
         pass
 
+    @abstractmethod
+    def transform_streaming_response(
+        self,
+        model: str,
+        parsed_chunk: dict,
+        logging_obj: LiteLLMLoggingObj,
+    ) -> ResponsesAPIStreamingResponse:
+        """
+        Transform a parsed streaming response chunk into a ResponsesAPIStreamingResponse
+        """
+        pass
+
     def get_error_class(
         self, error_message: str, status_code: int, headers: Union[dict, httpx.Headers]
     ) -> BaseLLMException:
diff --git a/litellm/llms/custom_httpx/llm_http_handler.py b/litellm/llms/custom_httpx/llm_http_handler.py
index 5ce05f4da7..a303c4572c 100644
--- a/litellm/llms/custom_httpx/llm_http_handler.py
+++ b/litellm/llms/custom_httpx/llm_http_handler.py
@@ -1030,6 +1030,7 @@ class BaseLLMHTTPHandler:
                 response=response,
                 model=model,
                 logging_obj=logging_obj,
+                responses_api_provider_config=responses_api_provider_config,
             )
         else:
             # For non-streaming, proceed as before
diff --git a/litellm/llms/openai/responses/transformation.py b/litellm/llms/openai/responses/transformation.py
index c30a425790..cecc9efe76 100644
--- a/litellm/llms/openai/responses/transformation.py
+++ b/litellm/llms/openai/responses/transformation.py
@@ -10,6 +10,7 @@ from litellm.types.llms.openai import (
     ResponsesAPIOptionalRequestParams,
     ResponsesAPIRequestParams,
     ResponsesAPIResponse,
+    ResponsesAPIStreamingResponse,
 )
 from litellm.types.router import GenericLiteLLMParams
 
@@ -126,3 +127,15 @@ class OpenAIResponsesAPIConfig(BaseResponsesAPIConfig):
         api_base = api_base.rstrip("/")
 
         return f"{api_base}/responses"
+
+    def transform_streaming_response(
+        self,
+        model: str,
+        parsed_chunk: dict,
+        logging_obj: LiteLLMLoggingObj,
+    ) -> ResponsesAPIStreamingResponse:
+        """
+        Transform a parsed streaming response chunk into a ResponsesAPIStreamingResponse
+        """
+        # Convert the dictionary to a properly typed ResponsesAPIStreamingResponse
+        return ResponsesAPIStreamingResponse(**parsed_chunk)
diff --git a/litellm/responses/streaming_iterator.py b/litellm/responses/streaming_iterator.py
index ad3fa59b26..ae592d8c41 100644
--- a/litellm/responses/streaming_iterator.py
+++ b/litellm/responses/streaming_iterator.py
@@ -4,10 +4,12 @@ from typing import Any, AsyncIterator, Dict, Optional, Union
 import httpx
 
 from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig
 from litellm.types.llms.openai import (
     ResponsesAPIResponse,
     ResponsesAPIStreamingResponse,
 )
+from litellm.utils import CustomStreamWrapper
 
 
 class ResponsesAPIStreamingIterator:
@@ -22,13 +24,15 @@ class ResponsesAPIStreamingIterator:
         self,
         response: httpx.Response,
         model: str,
-        logging_obj: Optional[LiteLLMLoggingObj] = None,
+        responses_api_provider_config: BaseResponsesAPIConfig,
+        logging_obj: LiteLLMLoggingObj,
     ):
         self.response = response
         self.model = model
         self.logging_obj = logging_obj
         self.stream_iterator = response.aiter_lines()
         self.finished = False
+        self.responses_api_provider_config = responses_api_provider_config
 
     def __aiter__(self):
         return self
@@ -49,8 +53,9 @@ class ResponsesAPIStreamingIterator:
                 return await self.__anext__()
 
             # Handle SSE format (data: {...})
-            if chunk.startswith("data: "):
-                chunk = chunk[6:]  # Remove "data: " prefix
+            chunk = CustomStreamWrapper._strip_sse_data_from_chunk(chunk)
+            if chunk is None:
+                return await self.__anext__()
 
             # Handle "[DONE]" marker
             if chunk == "[DONE]":
@@ -61,28 +66,16 @@ class ResponsesAPIStreamingIterator:
             # Parse the JSON chunk
             parsed_chunk = json.loads(chunk)
 
-            # Log the chunk if logging is enabled
-            if self.logging_obj:
-                self.logging_obj.post_call(
-                    input="",
-                    api_key="",
-                    original_response=parsed_chunk,
-                    additional_args={
-                        "complete_streaming_chunk": parsed_chunk,
-                    },
-                )
-
             # Format as ResponsesAPIStreamingResponse
             if isinstance(parsed_chunk, dict):
-                # If the chunk already has a 'type' field, it's already in the right format
-                if "type" in parsed_chunk:
-                    return ResponsesAPIStreamingResponse(**parsed_chunk)
-                # Otherwise, wrap it as a response
-                else:
-                    return ResponsesAPIStreamingResponse(
-                        type="response",
-                        response=ResponsesAPIResponse(**parsed_chunk),
+                openai_responses_api_chunk: ResponsesAPIStreamingResponse = (
+                    self.responses_api_provider_config.transform_streaming_response(
+                        model=self.model,
+                        parsed_chunk=parsed_chunk,
+                        logging_obj=self.logging_obj,
                     )
+                )
+                return openai_responses_api_chunk
 
             return ResponsesAPIStreamingResponse(
                 type="response", response=parsed_chunk
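
For reviewers, a minimal sketch (not part of this patch) of how a provider-specific config could implement the transform_streaming_response() hook that BaseResponsesAPIConfig now requires. The MyProviderResponsesAPIConfig class and its "event"-to-"type" remapping are hypothetical; only the imported names come from this diff:

    from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
    from litellm.llms.openai.responses.transformation import OpenAIResponsesAPIConfig
    from litellm.types.llms.openai import ResponsesAPIStreamingResponse


    class MyProviderResponsesAPIConfig(OpenAIResponsesAPIConfig):
        def transform_streaming_response(
            self,
            model: str,
            parsed_chunk: dict,
            logging_obj: LiteLLMLoggingObj,
        ) -> ResponsesAPIStreamingResponse:
            # Hypothetical provider that labels events with "event" instead of
            # "type": remap the key before building the typed streaming response.
            if "event" in parsed_chunk and "type" not in parsed_chunk:
                parsed_chunk = {**parsed_chunk, "type": parsed_chunk.pop("event")}
            return ResponsesAPIStreamingResponse(**parsed_chunk)

ResponsesAPIStreamingIterator now calls this hook on every parsed SSE chunk, so a provider config only has to map its chunk dict to the typed response instead of duplicating the SSE handling in __anext__.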