forked from phoenix/litellm-mirror
perf improvement - use asyncio tasks for async logging of pass-through responses
This commit is contained in:
parent
e69678a9b3
commit
2c0551f0a8
2 changed files with 32 additions and 27 deletions
|
@ -528,16 +528,19 @@ async def pass_through_request( # noqa: PLR0915
|
|||
response_body: Optional[dict] = get_response_body(response)
|
||||
passthrough_logging_payload["response_body"] = response_body
|
||||
end_time = datetime.now()
|
||||
await pass_through_endpoint_logging.pass_through_async_success_handler(
|
||||
httpx_response=response,
|
||||
response_body=response_body,
|
||||
url_route=str(url),
|
||||
result="",
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
logging_obj=logging_obj,
|
||||
cache_hit=False,
|
||||
**kwargs,
|
||||
|
||||
asyncio.create_task(
|
||||
pass_through_endpoint_logging.pass_through_async_success_handler(
|
||||
httpx_response=response,
|
||||
response_body=response_body,
|
||||
url_route=str(url),
|
||||
result="",
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
logging_obj=logging_obj,
|
||||
cache_hit=False,
|
||||
**kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
return Response(
|
||||
|
|
|
@ -58,22 +58,24 @@ class PassThroughStreamingHandler:
|
|||
# After all chunks are processed, handle post-processing
|
||||
end_time = datetime.now()
|
||||
|
||||
await PassThroughStreamingHandler._route_streaming_logging_to_handler(
|
||||
litellm_logging_obj=litellm_logging_obj,
|
||||
passthrough_success_handler_obj=passthrough_success_handler_obj,
|
||||
url_route=url_route,
|
||||
request_body=request_body or {},
|
||||
endpoint_type=endpoint_type,
|
||||
start_time=start_time,
|
||||
raw_bytes=raw_bytes,
|
||||
end_time=end_time,
|
||||
asyncio.create_task(
|
||||
PassThroughStreamingHandler.handle_logging_collected_stream_response(
|
||||
litellm_logging_obj=litellm_logging_obj,
|
||||
passthrough_success_handler_obj=passthrough_success_handler_obj,
|
||||
url_route=url_route,
|
||||
request_body=request_body or {},
|
||||
endpoint_type=endpoint_type,
|
||||
start_time=start_time,
|
||||
raw_bytes=raw_bytes,
|
||||
end_time=end_time,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.error(f"Error in chunk_processor: {str(e)}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
async def _route_streaming_logging_to_handler(
|
||||
async def handle_logging_collected_stream_response(
|
||||
litellm_logging_obj: LiteLLMLoggingObj,
|
||||
passthrough_success_handler_obj: PassThroughEndpointLogging,
|
||||
url_route: str,
|
||||
|
@ -108,9 +110,9 @@ class PassThroughStreamingHandler:
|
|||
all_chunks=all_chunks,
|
||||
end_time=end_time,
|
||||
)
|
||||
standard_logging_response_object = anthropic_passthrough_logging_handler_result[
|
||||
"result"
|
||||
]
|
||||
standard_logging_response_object = (
|
||||
anthropic_passthrough_logging_handler_result["result"]
|
||||
)
|
||||
kwargs = anthropic_passthrough_logging_handler_result["kwargs"]
|
||||
elif endpoint_type == EndpointType.VERTEX_AI:
|
||||
vertex_passthrough_logging_handler_result = (
|
||||
|
@ -125,9 +127,9 @@ class PassThroughStreamingHandler:
|
|||
end_time=end_time,
|
||||
)
|
||||
)
|
||||
standard_logging_response_object = vertex_passthrough_logging_handler_result[
|
||||
"result"
|
||||
]
|
||||
standard_logging_response_object = (
|
||||
vertex_passthrough_logging_handler_result["result"]
|
||||
)
|
||||
kwargs = vertex_passthrough_logging_handler_result["kwargs"]
|
||||
|
||||
if standard_logging_response_object is None:
|
||||
|
@ -168,4 +170,4 @@ class PassThroughStreamingHandler:
|
|||
# Split by newlines and filter out empty lines
|
||||
lines = [line.strip() for line in combined_str.split("\n") if line.strip()]
|
||||
|
||||
return lines
|
||||
return lines
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue