diff --git a/llama_stack/distribution/library_client.py b/llama_stack/distribution/library_client.py index aefffc326..54b974258 100644 --- a/llama_stack/distribution/library_client.py +++ b/llama_stack/distribution/library_client.py @@ -62,71 +62,6 @@ def in_notebook(): return True -# def stream_across_asyncio_run_boundary( -# async_gen_maker, -# pool_executor: ThreadPoolExecutor, -# path: Optional[str] = None, -# provider_data: Optional[dict[str, Any]] = None, -# ) -> Generator[T, None, None]: -# result_queue = queue.Queue() -# stop_event = threading.Event() - -# async def consumer(): -# # make sure we make the generator in the event loop context -# gen = await async_gen_maker() -# await start_trace(path, {"__location__": "library_client"}) -# if provider_data: -# set_request_provider_data( -# {"X-LlamaStack-Provider-Data": json.dumps(provider_data)} -# ) -# try: -# async for item in await gen: -# result_queue.put(item) -# except Exception as e: -# print(f"Error in generator {e}") -# result_queue.put(e) -# except asyncio.CancelledError: -# return -# finally: -# result_queue.put(StopIteration) -# stop_event.set() -# await end_trace() - -# def run_async(): -# # Run our own loop to avoid double async generator cleanup which is done -# # by asyncio.run() -# loop = asyncio.new_event_loop() -# asyncio.set_event_loop(loop) -# try: -# task = loop.create_task(consumer()) -# loop.run_until_complete(task) -# finally: -# # Handle pending tasks like a generator's athrow() -# pending = asyncio.all_tasks(loop) -# if pending: -# loop.run_until_complete( -# asyncio.gather(*pending, return_exceptions=True) -# ) -# loop.close() - -# future = pool_executor.submit(run_async) - -# try: -# # yield results as they come in -# while not stop_event.is_set() or not result_queue.empty(): -# try: -# item = result_queue.get(timeout=0.1) -# if item is StopIteration: -# break -# if isinstance(item, Exception): -# raise item -# yield item -# except queue.Empty: -# continue -# finally: -# future.result() - - def convert_pydantic_to_json_value(value: Any) -> Any: if isinstance(value, Enum): return value.value @@ -224,7 +159,6 @@ class LlamaStackAsLibraryClient(LlamaStackClient): # NOTE: We are using AsyncLlamaStackClient under the hood # A new event loop is needed to convert the AsyncStream # from async client into SyncStream return type for streaming - print("create new event loop") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -239,18 +173,11 @@ class LlamaStackAsLibraryClient(LlamaStackClient): chunk = loop.run_until_complete(async_stream.__anext__()) yield chunk except StopAsyncIteration: - print("StopAsyncIteration in sync_generator") + pass finally: loop.close() return sync_generator() - - # return stream_across_asyncio_run_boundary( - # lambda: self.async_client.request(*args, **kwargs), - # self.pool_executor, - # path=path, - # provider_data=self.provider_data, - # ) else: async def _traced_request():