mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
fix(main.py): fix logging event loop for async logging but sync streaming
This commit is contained in:
parent
bbe6a92eb9
commit
72275ad8cb
2 changed files with 19 additions and 14 deletions
|
@ -274,14 +274,10 @@ async def acompletion(
|
||||||
else:
|
else:
|
||||||
# Call the synchronous function using run_in_executor
|
# Call the synchronous function using run_in_executor
|
||||||
response = await loop.run_in_executor(None, func_with_context) # type: ignore
|
response = await loop.run_in_executor(None, func_with_context) # type: ignore
|
||||||
# if kwargs.get("stream", False): # return an async generator
|
if isinstance(response, CustomStreamWrapper):
|
||||||
# return _async_streaming(
|
response.set_logging_event_loop(
|
||||||
# response=response,
|
loop=loop
|
||||||
# model=model,
|
) # sets the logging event loop if the user does sync streaming (e.g. on proxy for sagemaker calls)
|
||||||
# custom_llm_provider=custom_llm_provider,
|
|
||||||
# args=args,
|
|
||||||
# )
|
|
||||||
# else:
|
|
||||||
return response
|
return response
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
custom_llm_provider = custom_llm_provider or "openai"
|
custom_llm_provider = custom_llm_provider or "openai"
|
||||||
|
|
|
@ -7116,6 +7116,7 @@ class CustomStreamWrapper:
|
||||||
"model_id": (_model_info.get("id", None))
|
"model_id": (_model_info.get("id", None))
|
||||||
} # returned as x-litellm-model-id response header in proxy
|
} # returned as x-litellm-model-id response header in proxy
|
||||||
self.response_id = None
|
self.response_id = None
|
||||||
|
self.logging_loop = None
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
@ -8016,16 +8017,24 @@ class CustomStreamWrapper:
|
||||||
original_exception=e,
|
original_exception=e,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def set_logging_event_loop(self, loop):
|
||||||
|
self.logging_loop = loop
|
||||||
|
|
||||||
|
async def your_async_function(self):
|
||||||
|
# Your asynchronous code here
|
||||||
|
return "Your asynchronous code is running"
|
||||||
|
|
||||||
def run_success_logging_in_thread(self, processed_chunk):
|
def run_success_logging_in_thread(self, processed_chunk):
|
||||||
# Create an event loop for the new thread
|
# Create an event loop for the new thread
|
||||||
## ASYNC LOGGING
|
## ASYNC LOGGING
|
||||||
# Run the asynchronous function in the new thread's event loop
|
if self.logging_loop is not None:
|
||||||
asyncio.run(
|
future = asyncio.run_coroutine_threadsafe(
|
||||||
self.logging_obj.async_success_handler(
|
self.logging_obj.async_success_handler(processed_chunk),
|
||||||
processed_chunk,
|
loop=self.logging_loop,
|
||||||
)
|
)
|
||||||
)
|
result = future.result()
|
||||||
|
else:
|
||||||
|
asyncio.run(self.logging_obj.async_success_handler(processed_chunk))
|
||||||
## SYNC LOGGING
|
## SYNC LOGGING
|
||||||
self.logging_obj.success_handler(processed_chunk)
|
self.logging_obj.success_handler(processed_chunk)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue