From 2e3d3a62a5bc3f6928d7cc0707f89877bf0967b3 Mon Sep 17 00:00:00 2001 From: Dinesh Yeduguru Date: Tue, 10 Dec 2024 08:50:20 -0800 Subject: [PATCH] Revert "add tracing to library client (#591)" This reverts commit bc1fddf1df68fd845ae01f517eb8979f151e10d9. --- llama_stack/distribution/library_client.py | 40 +++++-------------- .../meta_reference/sqlite_span_processor.py | 26 +++--------- 2 files changed, 17 insertions(+), 49 deletions(-) diff --git a/llama_stack/distribution/library_client.py b/llama_stack/distribution/library_client.py index 45382c417..8766f7a72 100644 --- a/llama_stack/distribution/library_client.py +++ b/llama_stack/distribution/library_client.py @@ -24,7 +24,6 @@ from termcolor import cprint from llama_stack.distribution.build import print_pip_install_help from llama_stack.distribution.configure import parse_and_maybe_upgrade_config -from llama_stack.distribution.datatypes import Api from llama_stack.distribution.resolver import ProviderRegistry from llama_stack.distribution.server.endpoints import get_all_api_endpoints from llama_stack.distribution.stack import ( @@ -32,11 +31,6 @@ from llama_stack.distribution.stack import ( get_stack_run_config_from_template, replace_env_vars, ) -from llama_stack.providers.utils.telemetry.tracing import ( - end_trace, - setup_logger, - start_trace, -) T = TypeVar("T") @@ -246,10 +240,6 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient): ) return False - # Set up telemetry logger similar to server.py - if Api.telemetry in self.impls: - setup_logger(self.impls[Api.telemetry]) - console = Console() console.print(f"Using config [blue]{self.config_path_or_template_name}[/blue]:") console.print(yaml.dump(self.config.model_dump(), indent=2)) @@ -286,29 +276,21 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient): async def _call_non_streaming( self, path: str, body: dict = None, cast_to: Any = None ): - await start_trace(path, {"__location__": "library_client"}) - try: - func = self.endpoint_impls.get(path) - if not func: - raise ValueError(f"No endpoint found for {path}") + func = self.endpoint_impls.get(path) + if not func: + raise ValueError(f"No endpoint found for {path}") - body = self._convert_body(path, body) - return convert_pydantic_to_json_value(await func(**body), cast_to) - finally: - end_trace() + body = self._convert_body(path, body) + return convert_pydantic_to_json_value(await func(**body), cast_to) async def _call_streaming(self, path: str, body: dict = None, cast_to: Any = None): - await start_trace(path, {"__location__": "library_client"}) - try: - func = self.endpoint_impls.get(path) - if not func: - raise ValueError(f"No endpoint found for {path}") + func = self.endpoint_impls.get(path) + if not func: + raise ValueError(f"No endpoint found for {path}") - body = self._convert_body(path, body) - async for chunk in await func(**body): - yield convert_pydantic_to_json_value(chunk, cast_to) - finally: - end_trace() + body = self._convert_body(path, body) + async for chunk in await func(**body): + yield convert_pydantic_to_json_value(chunk, cast_to) def _convert_body(self, path: str, body: Optional[dict] = None) -> dict: if not body: diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py index f8fdbc12f..553dd5000 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py @@ -20,7 +20,6 @@ class SQLiteSpanProcessor(SpanProcessor): """Initialize the SQLite span processor with a connection string.""" self.conn_string = conn_string self.ttl_days = ttl_days - self._shutdown_event = threading.Event() self.cleanup_task = None self._thread_local = threading.local() self._connections: Dict[int, sqlite3.Connection] = {} @@ -145,10 +144,9 @@ class SQLiteSpanProcessor(SpanProcessor): """Run cleanup periodically.""" import time - while not self._shutdown_event.is_set(): + while True: time.sleep(3600) # Sleep for 1 hour - if not self._shutdown_event.is_set(): - self._cleanup_old_data() + self._cleanup_old_data() def on_start(self, span: Span, parent_context=None): """Called when a span starts.""" @@ -233,23 +231,11 @@ class SQLiteSpanProcessor(SpanProcessor): def shutdown(self): """Cleanup any resources.""" - self._shutdown_event.set() - - # Wait for cleanup thread to finish if it exists - if self.cleanup_task and self.cleanup_task.is_alive(): - self.cleanup_task.join(timeout=5.0) - current_thread_id = threading.get_ident() - with self._lock: - # Close all connections from the current thread - for thread_id, conn in list(self._connections.items()): - if thread_id == current_thread_id: - try: - if conn: - conn.close() - del self._connections[thread_id] - except sqlite3.Error: - pass # Ignore errors during shutdown + for conn in self._connections.values(): + if conn: + conn.close() + self._connections.clear() def force_flush(self, timeout_millis=30000): """Force export of spans."""