From ba85634758c72d51ea5d76a01f466a69b5a1fdf8 Mon Sep 17 00:00:00 2001 From: Xi Yan Date: Tue, 14 Jan 2025 10:15:45 -0800 Subject: [PATCH] create new event loop for non-streaming case explicitly --- llama_stack/distribution/library_client.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/llama_stack/distribution/library_client.py b/llama_stack/distribution/library_client.py index 6e38620a2..e579b22ca 100644 --- a/llama_stack/distribution/library_client.py +++ b/llama_stack/distribution/library_client.py @@ -144,22 +144,19 @@ class LlamaStackAsLibraryClient(LlamaStackClient): print(f"Removed handler {handler.__class__.__name__} from root logger") def request(self, *args, **kwargs): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + async_response = loop.run_until_complete( + self.async_client.request(*args, **kwargs) + ) if kwargs.get("stream"): # NOTE: We are using AsyncLlamaStackClient under the hood - # A new event loop is needed to convert the AsyncStream - # from async client into SyncStream return type for streaming - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Call the async client request and get the AsyncStream - async_stream = loop.run_until_complete( - self.async_client.request(*args, **kwargs) - ) - + # We need to convert the AsyncStream from async client into + # SyncStream return type for streaming def sync_generator(): try: while True: - chunk = loop.run_until_complete(async_stream.__anext__()) + chunk = loop.run_until_complete(async_response.__anext__()) yield chunk except StopAsyncIteration: pass @@ -168,7 +165,8 @@ class LlamaStackAsLibraryClient(LlamaStackClient): return sync_generator() else: - return asyncio.run(self.async_client.request(*args, **kwargs)) + loop.close() + return async_response class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):