mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-27 19:54:13 +00:00
(fix) async logging race condition
This commit is contained in:
parent
6b253fec7c
commit
bb33c9230d
1 changed files with 22 additions and 13 deletions
|
@ -1640,7 +1640,7 @@ class Logging:
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
"Async success callbacks: Got a complete streaming response"
|
"Async success callbacks: Got a complete streaming response"
|
||||||
)
|
)
|
||||||
self.model_call_details["complete_streaming_response"] = (
|
self.model_call_details["async_complete_streaming_response"] = (
|
||||||
complete_streaming_response
|
complete_streaming_response
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
|
@ -1688,28 +1688,31 @@ class Logging:
|
||||||
print_verbose("async success_callback: reaches cache for logging!")
|
print_verbose("async success_callback: reaches cache for logging!")
|
||||||
kwargs = self.model_call_details
|
kwargs = self.model_call_details
|
||||||
if self.stream:
|
if self.stream:
|
||||||
if "complete_streaming_response" not in kwargs:
|
if "async_complete_streaming_response" not in kwargs:
|
||||||
print_verbose(
|
print_verbose(
|
||||||
f"async success_callback: reaches cache for logging, there is no complete_streaming_response. Kwargs={kwargs}\n\n"
|
f"async success_callback: reaches cache for logging, there is no async_complete_streaming_response. Kwargs={kwargs}\n\n"
|
||||||
)
|
)
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
print_verbose(
|
print_verbose(
|
||||||
"async success_callback: reaches cache for logging, there is a complete_streaming_response. Adding to cache"
|
"async success_callback: reaches cache for logging, there is a async_complete_streaming_response. Adding to cache"
|
||||||
)
|
)
|
||||||
result = kwargs["complete_streaming_response"]
|
result = kwargs["async_complete_streaming_response"]
|
||||||
# only add to cache once we have a complete streaming response
|
# only add to cache once we have a complete streaming response
|
||||||
litellm.cache.add_cache(result, **kwargs)
|
litellm.cache.add_cache(result, **kwargs)
|
||||||
if isinstance(callback, CustomLogger): # custom logger class
|
if isinstance(callback, CustomLogger): # custom logger class
|
||||||
print_verbose(
|
print_verbose(
|
||||||
f"Async success callbacks: {callback}; self.stream: {self.stream}; complete_streaming_response: {self.model_call_details.get('complete_streaming_response', None)}"
|
f"Running Async success callback: {callback}; self.stream: {self.stream}; async_complete_streaming_response: {self.model_call_details.get('async_complete_streaming_response', None)} result={result}"
|
||||||
)
|
)
|
||||||
if self.stream == True:
|
if self.stream == True:
|
||||||
if "complete_streaming_response" in self.model_call_details:
|
if (
|
||||||
|
"async_complete_streaming_response"
|
||||||
|
in self.model_call_details
|
||||||
|
):
|
||||||
await callback.async_log_success_event(
|
await callback.async_log_success_event(
|
||||||
kwargs=self.model_call_details,
|
kwargs=self.model_call_details,
|
||||||
response_obj=self.model_call_details[
|
response_obj=self.model_call_details[
|
||||||
"complete_streaming_response"
|
"async_complete_streaming_response"
|
||||||
],
|
],
|
||||||
start_time=start_time,
|
start_time=start_time,
|
||||||
end_time=end_time,
|
end_time=end_time,
|
||||||
|
@ -1730,15 +1733,18 @@ class Logging:
|
||||||
)
|
)
|
||||||
if callable(callback): # custom logger functions
|
if callable(callback): # custom logger functions
|
||||||
print_verbose(
|
print_verbose(
|
||||||
f"Making async function logging call - {self.model_call_details}"
|
f"Making async function logging call for {callback}, result={result} - {self.model_call_details}"
|
||||||
)
|
)
|
||||||
if self.stream:
|
if self.stream:
|
||||||
if "complete_streaming_response" in self.model_call_details:
|
if (
|
||||||
|
"async_complete_streaming_response"
|
||||||
|
in self.model_call_details
|
||||||
|
):
|
||||||
|
|
||||||
await customLogger.async_log_event(
|
await customLogger.async_log_event(
|
||||||
kwargs=self.model_call_details,
|
kwargs=self.model_call_details,
|
||||||
response_obj=self.model_call_details[
|
response_obj=self.model_call_details[
|
||||||
"complete_streaming_response"
|
"async_complete_streaming_response"
|
||||||
],
|
],
|
||||||
start_time=start_time,
|
start_time=start_time,
|
||||||
end_time=end_time,
|
end_time=end_time,
|
||||||
|
@ -1759,14 +1765,17 @@ class Logging:
|
||||||
if dynamoLogger is None:
|
if dynamoLogger is None:
|
||||||
dynamoLogger = DyanmoDBLogger()
|
dynamoLogger = DyanmoDBLogger()
|
||||||
if self.stream:
|
if self.stream:
|
||||||
if "complete_streaming_response" in self.model_call_details:
|
if (
|
||||||
|
"async_complete_streaming_response"
|
||||||
|
in self.model_call_details
|
||||||
|
):
|
||||||
print_verbose(
|
print_verbose(
|
||||||
"DynamoDB Logger: Got Stream Event - Completed Stream Response"
|
"DynamoDB Logger: Got Stream Event - Completed Stream Response"
|
||||||
)
|
)
|
||||||
await dynamoLogger._async_log_event(
|
await dynamoLogger._async_log_event(
|
||||||
kwargs=self.model_call_details,
|
kwargs=self.model_call_details,
|
||||||
response_obj=self.model_call_details[
|
response_obj=self.model_call_details[
|
||||||
"complete_streaming_response"
|
"async_complete_streaming_response"
|
||||||
],
|
],
|
||||||
start_time=start_time,
|
start_time=start_time,
|
||||||
end_time=end_time,
|
end_time=end_time,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue