(feat) custom logger: async stream, assemble chunks

ishaan-jaff 2023-12-09 10:10:48 -08:00
parent 5d65550732
commit c8b699c0aa
2 changed files with 41 additions and 28 deletions


@@ -698,7 +698,7 @@ class Logging:
         """
         Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions.
         """
-        start_time, end_time, result, complete_streaming_response = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
+        start_time, end_time, result = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
         print_verbose(f"Async input callbacks: {litellm._async_input_callback}")
         for callback in litellm._async_input_callback:
             try:
@@ -798,21 +798,10 @@ class Logging:
                 end_time = datetime.datetime.now()
             self.model_call_details["log_event_type"] = "successful_api_call"
             self.model_call_details["end_time"] = end_time
-            complete_streaming_response = None
-            ## BUILD COMPLETE STREAMED RESPONSE
-            if self.stream:
-                if result.choices[0].finish_reason is not None: # if it's the last chunk
-                    self.streaming_chunks.append(result)
-                    complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None))
-                else:
-                    self.streaming_chunks.append(result)
-            elif isinstance(result, OpenAIObject):
+            if isinstance(result, OpenAIObject):
                 result = result.model_dump()
-            if complete_streaming_response:
-                self.model_call_details["complete_streaming_response"] = complete_streaming_response
             print_verbose(f"success callbacks: {litellm.success_callback}")
             if litellm.max_budget and self.stream:
@@ -820,7 +809,7 @@ class Logging:
                 float_diff = float(time_diff)
                 litellm._current_cost += litellm.completion_cost(model=self.model, prompt="", completion=result["content"], total_time=float_diff)
-            return start_time, end_time, result, complete_streaming_response
+            return start_time, end_time, result
         except:
             pass
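
The three hunks above strip stream assembly out of _success_handler_helper_fn: the helper now returns only (start_time, end_time, result), and collecting chunks into a complete response is left to the success handlers. The same assembly is exposed to callers as litellm.stream_chunk_builder; a minimal usage sketch (not part of this diff, model name is only an example):

    import litellm

    messages = [{"role": "user", "content": "Hello, who are you?"}]

    # stream a completion and keep every chunk, including the final one
    chunks = []
    for chunk in litellm.completion(model="gpt-3.5-turbo", messages=messages, stream=True):
        chunks.append(chunk)

    # rebuild a single non-streaming-style response from the collected chunks,
    # the same call the handlers above make with self.streaming_chunks
    complete_response = litellm.stream_chunk_builder(chunks, messages=messages)
    print(complete_response)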
@@ -829,9 +818,25 @@ class Logging:
             f"Logging Details LiteLLM-Success Call"
         )
         try:
-            start_time, end_time, result, complete_streaming_response = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
-            print_verbose(f"success callbacks: {litellm.success_callback}")
+            print_verbose(f"success callbacks: {litellm.success_callback}")
+            ## BUILD COMPLETE STREAMED RESPONSE
+            complete_streaming_response = None
+            if self.model_call_details.get("litellm_params", {}).get("acompletion", False) == True:
+                # if it's acompletion == True, chunks are built/appended in async_success_handler
+                if result.choices[0].finish_reason is not None: # if it's the last chunk
+                    complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None))
+            else:
+                # this is a completion() call
+                if self.stream:
+                    if result.choices[0].finish_reason is not None: # if it's the last chunk
+                        self.streaming_chunks.append(result)
+                        complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None))
+                    else:
+                        self.streaming_chunks.append(result)
+            if complete_streaming_response:
+                self.model_call_details["complete_streaming_response"] = complete_streaming_response
+            start_time, end_time, result = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
             for callback in litellm.success_callback:
                 try:
                     if callback == "lite_debugger":
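
In the synchronous success_handler, the new branch keys off litellm_params["acompletion"]: for acompletion() calls the chunks are appended by async_success_handler, so this handler only assembles on the final chunk; for plain completion() calls it still appends each chunk itself before assembling. A condensed sketch of that branching with hypothetical names (not the litellm API):

    import litellm

    def build_complete_response(chunk, streaming_chunks, is_async_call, is_stream, messages):
        # hypothetical helper mirroring the branch above: decide who appends
        # chunks versus who only assembles them on the last chunk
        complete_response = None
        if is_async_call:
            # acompletion() path: async_success_handler already appended this chunk
            if chunk.choices[0].finish_reason is not None:  # last chunk
                complete_response = litellm.stream_chunk_builder(streaming_chunks, messages=messages)
        elif is_stream:
            # completion(stream=True) path: this handler owns the buffer
            streaming_chunks.append(chunk)
            if chunk.choices[0].finish_reason is not None:  # last chunk
                complete_response = litellm.stream_chunk_builder(streaming_chunks, messages=messages)
        return complete_response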
@@ -1026,9 +1031,19 @@ class Logging:
         """
         Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions.
         """
-        start_time, end_time, result, complete_streaming_response = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
         print_verbose(f"Async success callbacks: {litellm._async_success_callback}")
+        ## BUILD COMPLETE STREAMED RESPONSE
+        complete_streaming_response = None
+        if self.stream:
+            if result.choices[0].finish_reason is not None: # if it's the last chunk
+                self.streaming_chunks.append(result)
+                complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks, messages=self.model_call_details.get("messages", None))
+            else:
+                self.streaming_chunks.append(result)
+        if complete_streaming_response:
+            self.model_call_details["complete_streaming_response"] = complete_streaming_response
+        start_time, end_time, result = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
         for callback in litellm._async_success_callback:
             try:
                 if isinstance(callback, CustomLogger): # custom logger class
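
async_success_handler now owns chunk accumulation for async streams and, on the final chunk, stores the assembled response under model_call_details["complete_streaming_response"] before the async callbacks run. A custom logger can then read that value from the kwargs it receives; a sketch assuming the CustomLogger interface and that the key is forwarded to callbacks in kwargs:

    from litellm.integrations.custom_logger import CustomLogger

    class StreamAssemblyLogger(CustomLogger):
        # hypothetical logger; on async streams this fires once per chunk
        async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
            complete_response = kwargs.get("complete_streaming_response")
            if complete_response is not None:
                # only populated on the final chunk, after stream_chunk_builder runs
                print("assembled streamed response:", complete_response)

Registering the handler with litellm.callbacks = [StreamAssemblyLogger()] should exercise the async path this commit touches.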
@@ -2031,8 +2046,10 @@ def get_litellm_params(
     metadata=None,
     model_info=None,
     proxy_server_request=None,
+    acompletion=None,
 ):
     litellm_params = {
+        "acompletion": acompletion,
         "return_async": return_async,
         "api_key": api_key,
         "force_timeout": force_timeout,
@@ -5349,7 +5366,7 @@ class CustomStreamWrapper:
             return {"text": text, "is_finished": is_finished, "finish_reason": finish_reason}
         return ""
-    def chunk_creator(self, chunk, in_async_func=False):
+    def chunk_creator(self, chunk):
         model_response = ModelResponse(stream=True, model=self.model)
         model_response.choices[0].finish_reason = None
         response_obj = {}
@@ -5526,10 +5543,7 @@ class CustomStreamWrapper:
                         self.sent_first_chunk = True
                     model_response.choices[0].delta = Delta(**completion_obj)
                     # LOGGING
-                    if in_async_func != True:
-                        # only do logging if we're not being called by _anext_
-                        # _anext_ does its own logging, we check to avoid double counting chunks
-                        threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
+                    threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                     print_verbose(f"model_response: {model_response}")
                     return model_response
                 else:
@@ -5537,8 +5551,7 @@ class CustomStreamWrapper:
             elif model_response.choices[0].finish_reason:
                 model_response.choices[0].finish_reason = map_finish_reason(model_response.choices[0].finish_reason) # ensure consistent output to openai
                 # LOGGING
-                if in_async_func != True:
-                    threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
+                threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                 return model_response
             elif response_obj is not None and response_obj.get("original_chunk", None) is not None: # function / tool calling branch - only set for openai/azure compatible endpoints
                 # enter this branch when no content has been passed in response
@@ -5560,8 +5573,7 @@ class CustomStreamWrapper:
                     model_response.choices[0].delta["role"] = "assistant"
                     self.sent_first_chunk = True
                 # LOGGING
-                if in_async_func != True:
-                    threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start() # log response
+                threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start() # log response
                 return model_response
             else:
                 return
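
With in_async_func gone, chunk_creator logs every chunk unconditionally: each model_response is handed to success_handler on a background thread, so the consumer of the synchronous stream is never blocked by callback work. A minimal sketch of that fire-and-forget pattern with hypothetical names:

    import threading

    def success_handler(chunk):
        # stand-in for Logging.success_handler: run success callbacks for one chunk
        print("logging chunk:", chunk)

    def emit(chunk):
        # hand the chunk to the logger without blocking the stream consumer
        threading.Thread(target=success_handler, args=(chunk,)).start()
        return chunk

    for piece in ["Hel", "lo", "!"]:
        emit(piece)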
@@ -5607,7 +5619,7 @@ class CustomStreamWrapper:
-                    # chunk_creator() does logging/stream chunk building. We need to let it know its being called in_async_func, so we don't double add chunks.
-                    # __anext__ also calls async_success_handler, which does logging
-                    processed_chunk = self.chunk_creator(chunk=chunk, in_async_func=True)
+                    processed_chunk = self.chunk_creator(chunk=chunk)
                     if processed_chunk is None:
                         continue
                     ## LOGGING
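
On the async side, __anext__ now calls chunk_creator(chunk=chunk) with no extra flag and then runs its own per-chunk logging, which is where the chunks appended in async_success_handler (hunk at 1026/1031 above) come from. A rough sketch of that iterate-then-log shape with hypothetical names, not the litellm implementation:

    import asyncio

    async def fake_stream():
        for piece in ["Hel", "lo", "!"]:
            yield piece

    async def async_success_handler(chunk):
        # stand-in for Logging.async_success_handler: append the chunk, fire async callbacks
        print("async logging:", chunk)

    async def consume():
        tasks = []
        async for chunk in fake_stream():
            processed = chunk.upper()  # stand-in for chunk_creator(chunk=chunk)
            if processed is None:
                continue
            # schedule per-chunk logging without blocking the stream
            tasks.append(asyncio.create_task(async_success_handler(processed)))
        await asyncio.gather(*tasks)  # let the logging tasks finish

    asyncio.run(consume())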