add async streaming support

Ishaan Jaff 2025-03-11 20:00:42 -07:00
parent d09bd6c353
commit 7b1172d9b9
4 changed files with 42 additions and 22 deletions


@@ -9,6 +9,7 @@ from litellm.types.llms.openai import (
     ResponsesAPIOptionalRequestParams,
     ResponsesAPIRequestParams,
     ResponsesAPIResponse,
+    ResponsesAPIStreamingResponse,
 )
 from litellm.types.router import GenericLiteLLMParams
 from litellm.types.utils import ModelInfo
@@ -107,6 +108,18 @@ class BaseResponsesAPIConfig(ABC):
     ) -> ResponsesAPIResponse:
         pass
 
+    @abstractmethod
+    def transform_streaming_response(
+        self,
+        model: str,
+        parsed_chunk: dict,
+        logging_obj: LiteLLMLoggingObj,
+    ) -> ResponsesAPIStreamingResponse:
+        """
+        Transform a parsed streaming response chunk into a ResponsesAPIStreamingResponse
+        """
+        pass
+
     def get_error_class(
         self, error_message: str, status_code: int, headers: Union[dict, httpx.Headers]
     ) -> BaseLLMException:


@@ -1030,6 +1030,7 @@ class BaseLLMHTTPHandler:
                 response=response,
                 model=model,
                 logging_obj=logging_obj,
+                responses_api_provider_config=responses_api_provider_config,
             )
         else:
             # For non-streaming, proceed as before
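The keyword arguments in the hunk above belong to the call that builds the streaming iterator. A hedged sketch of that call site, wrapped in a standalone helper for illustration; the helper name and the import path of ResponsesAPIStreamingIterator are assumptions, while the keyword arguments themselves are taken from the diff.

    import httpx

    from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
    from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig
    from litellm.responses.streaming_iterator import (  # import path assumed
        ResponsesAPIStreamingIterator,
    )


    def make_responses_stream(
        response: httpx.Response,
        model: str,
        logging_obj: LiteLLMLoggingObj,
        responses_api_provider_config: BaseResponsesAPIConfig,
    ) -> ResponsesAPIStreamingIterator:
        # Mirrors the streaming branch in BaseLLMHTTPHandler after this commit:
        # the provider config is threaded through so the iterator can delegate
        # chunk transformation instead of hard-coding OpenAI's format.
        return ResponsesAPIStreamingIterator(
            response=response,
            model=model,
            logging_obj=logging_obj,
            responses_api_provider_config=responses_api_provider_config,
        )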


@@ -10,6 +10,7 @@ from litellm.types.llms.openai import (
     ResponsesAPIOptionalRequestParams,
     ResponsesAPIRequestParams,
     ResponsesAPIResponse,
+    ResponsesAPIStreamingResponse,
 )
 from litellm.types.router import GenericLiteLLMParams
@@ -126,3 +127,15 @@ class OpenAIResponsesAPIConfig(BaseResponsesAPIConfig):
         api_base = api_base.rstrip("/")
 
         return f"{api_base}/responses"
+
+    def transform_streaming_response(
+        self,
+        model: str,
+        parsed_chunk: dict,
+        logging_obj: LiteLLMLoggingObj,
+    ) -> ResponsesAPIStreamingResponse:
+        """
+        Transform a parsed streaming response chunk into a ResponsesAPIStreamingResponse
+        """
+        # Convert the dictionary to a properly typed ResponsesAPIStreamingResponse
+        return ResponsesAPIStreamingResponse(**parsed_chunk)
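A hedged illustration of what this pass-through does with a single SSE chunk; the event payload is invented for the example (whether a given payload validates depends on the ResponsesAPIStreamingResponse schema at this commit), but the final line is exactly what the OpenAI transform above reduces to.

    import json

    from litellm.types.llms.openai import ResponsesAPIStreamingResponse

    # An SSE line as it might arrive from the /responses endpoint; the event body
    # here is illustrative, not taken from this diff.
    raw_line = 'data: {"type": "response.output_text.delta", "delta": "Hi"}'
    parsed_chunk = json.loads(raw_line[len("data: "):])

    # OpenAIResponsesAPIConfig.transform_streaming_response is a direct
    # dict-to-model conversion:
    typed_chunk = ResponsesAPIStreamingResponse(**parsed_chunk)
    print(typed_chunk.type)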


@@ -4,10 +4,12 @@ from typing import Any, AsyncIterator, Dict, Optional, Union
 import httpx
 
 from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig
 from litellm.types.llms.openai import (
     ResponsesAPIResponse,
     ResponsesAPIStreamingResponse,
 )
+from litellm.utils import CustomStreamWrapper
 
 
 class ResponsesAPIStreamingIterator:
@@ -22,13 +24,15 @@ class ResponsesAPIStreamingIterator:
         self,
         response: httpx.Response,
         model: str,
-        logging_obj: Optional[LiteLLMLoggingObj] = None,
+        responses_api_provider_config: BaseResponsesAPIConfig,
+        logging_obj: LiteLLMLoggingObj,
     ):
         self.response = response
         self.model = model
         self.logging_obj = logging_obj
         self.stream_iterator = response.aiter_lines()
         self.finished = False
+        self.responses_api_provider_config = responses_api_provider_config
 
     def __aiter__(self):
         return self
@@ -49,8 +53,9 @@ class ResponsesAPIStreamingIterator:
                 return await self.__anext__()
 
             # Handle SSE format (data: {...})
-            if chunk.startswith("data: "):
-                chunk = chunk[6:]  # Remove "data: " prefix
+            chunk = CustomStreamWrapper._strip_sse_data_from_chunk(chunk)
+            if chunk is None:
+                return await self.__anext__()
 
             # Handle "[DONE]" marker
             if chunk == "[DONE]":
@@ -61,28 +66,16 @@ class ResponsesAPIStreamingIterator:
             # Parse the JSON chunk
             parsed_chunk = json.loads(chunk)
 
-            # Log the chunk if logging is enabled
-            if self.logging_obj:
-                self.logging_obj.post_call(
-                    input="",
-                    api_key="",
-                    original_response=parsed_chunk,
-                    additional_args={
-                        "complete_streaming_chunk": parsed_chunk,
-                    },
-                )
-
             # Format as ResponsesAPIStreamingResponse
             if isinstance(parsed_chunk, dict):
-                # If the chunk already has a 'type' field, it's already in the right format
-                if "type" in parsed_chunk:
-                    return ResponsesAPIStreamingResponse(**parsed_chunk)
-                # Otherwise, wrap it as a response
-                else:
-                    return ResponsesAPIStreamingResponse(
-                        type="response",
-                        response=ResponsesAPIResponse(**parsed_chunk),
-                    )
+                openai_responses_api_chunk: ResponsesAPIStreamingResponse = (
+                    self.responses_api_provider_config.transform_streaming_response(
+                        model=self.model,
+                        parsed_chunk=parsed_chunk,
+                        logging_obj=self.logging_obj,
+                    )
+                )
+                return openai_responses_api_chunk
 
             return ResponsesAPIStreamingResponse(
                 type="response", response=parsed_chunk