diff --git a/litellm/responses/streaming_iterator.py b/litellm/responses/streaming_iterator.py index 7325e4e645..cc8711c3e1 100644 --- a/litellm/responses/streaming_iterator.py +++ b/litellm/responses/streaming_iterator.py @@ -6,7 +6,9 @@ from typing import Any, AsyncIterator, Dict, Optional, Union import httpx from litellm.constants import STREAM_SSE_DONE_STRING +from litellm.litellm_core_utils.asyncify import run_async_function from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj +from litellm.litellm_core_utils.thread_pool_executor import executor from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig from litellm.types.llms.openai import ( ResponsesAPIStreamEvents, @@ -72,7 +74,7 @@ class BaseResponsesAPIStreamingIterator: == ResponsesAPIStreamEvents.RESPONSE_COMPLETED ): self.completed_response = openai_responses_api_chunk - self._handle_completed_response() + self._handle_logging_completed_response() return openai_responses_api_chunk @@ -81,7 +83,7 @@ class BaseResponsesAPIStreamingIterator: # If we can't parse the chunk, continue return None - def _handle_completed_response(self): + def _handle_logging_completed_response(self): """Base implementation - should be overridden by subclasses""" pass @@ -128,7 +130,7 @@ class ResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator): self.finished = True raise e - def _handle_completed_response(self): + def _handle_logging_completed_response(self): """Handle logging for completed responses in async context""" asyncio.create_task( self.logging_obj.async_success_handler( @@ -139,6 +141,14 @@ class ResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator): ) ) + executor.submit( + self.logging_obj.success_handler, + result=self.completed_response, + cache_hit=None, + start_time=self.start_time, + end_time=datetime.now(), + ) + class SyncResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator): """ @@ -181,11 +191,20 @@ class SyncResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator): self.finished = True raise e - def _handle_completed_response(self): + def _handle_logging_completed_response(self): """Handle logging for completed responses in sync context""" - self.logging_obj.success_handler( + run_async_function( + async_function=self.logging_obj.async_success_handler, result=self.completed_response, start_time=self.start_time, end_time=datetime.now(), cache_hit=None, ) + + executor.submit( + self.logging_obj.success_handler, + result=self.completed_response, + cache_hit=None, + start_time=self.start_time, + end_time=datetime.now(), + )