Revert "add tracing to library client (#591)"

This reverts commit bc1fddf1df.
This commit is contained in:
Dinesh Yeduguru 2024-12-10 08:50:20 -08:00
parent 16d103842a
commit 2e3d3a62a5
2 changed files with 17 additions and 49 deletions

View file

@@ -24,7 +24,6 @@ from termcolor import cprint
 from llama_stack.distribution.build import print_pip_install_help
 from llama_stack.distribution.configure import parse_and_maybe_upgrade_config
-from llama_stack.distribution.datatypes import Api
 from llama_stack.distribution.resolver import ProviderRegistry
 from llama_stack.distribution.server.endpoints import get_all_api_endpoints
 from llama_stack.distribution.stack import (
@@ -32,11 +31,6 @@ from llama_stack.distribution.stack import (
     get_stack_run_config_from_template,
     replace_env_vars,
 )
-from llama_stack.providers.utils.telemetry.tracing import (
-    end_trace,
-    setup_logger,
-    start_trace,
-)
 
 T = TypeVar("T")
@@ -246,10 +240,6 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
             )
             return False
 
-        # Set up telemetry logger similar to server.py
-        if Api.telemetry in self.impls:
-            setup_logger(self.impls[Api.telemetry])
-
        console = Console()
        console.print(f"Using config [blue]{self.config_path_or_template_name}[/blue]:")
        console.print(yaml.dump(self.config.model_dump(), indent=2))
@@ -286,20 +276,14 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
     async def _call_non_streaming(
         self, path: str, body: dict = None, cast_to: Any = None
     ):
-        await start_trace(path, {"__location__": "library_client"})
-        try:
-            func = self.endpoint_impls.get(path)
-            if not func:
-                raise ValueError(f"No endpoint found for {path}")
-            body = self._convert_body(path, body)
-            return convert_pydantic_to_json_value(await func(**body), cast_to)
-        finally:
-            end_trace()
+        func = self.endpoint_impls.get(path)
+        if not func:
+            raise ValueError(f"No endpoint found for {path}")
+        body = self._convert_body(path, body)
+        return convert_pydantic_to_json_value(await func(**body), cast_to)
 
     async def _call_streaming(self, path: str, body: dict = None, cast_to: Any = None):
-        await start_trace(path, {"__location__": "library_client"})
-        try:
-            func = self.endpoint_impls.get(path)
-            if not func:
-                raise ValueError(f"No endpoint found for {path}")
+        func = self.endpoint_impls.get(path)
+        if not func:
+            raise ValueError(f"No endpoint found for {path}")
@@ -307,8 +291,6 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
-            body = self._convert_body(path, body)
-            async for chunk in await func(**body):
-                yield convert_pydantic_to_json_value(chunk, cast_to)
-        finally:
-            end_trace()
+        body = self._convert_body(path, body)
+        async for chunk in await func(**body):
+            yield convert_pydantic_to_json_value(chunk, cast_to)
 
     def _convert_body(self, path: str, body: Optional[dict] = None) -> dict:
         if not body:

View file

@@ -20,7 +20,6 @@ class SQLiteSpanProcessor(SpanProcessor):
         """Initialize the SQLite span processor with a connection string."""
         self.conn_string = conn_string
         self.ttl_days = ttl_days
-        self._shutdown_event = threading.Event()
        self.cleanup_task = None
        self._thread_local = threading.local()
        self._connections: Dict[int, sqlite3.Connection] = {}
@@ -145,9 +144,8 @@ class SQLiteSpanProcessor(SpanProcessor):
         """Run cleanup periodically."""
         import time
 
-        while not self._shutdown_event.is_set():
+        while True:
             time.sleep(3600)  # Sleep for 1 hour
-            if not self._shutdown_event.is_set():
-                self._cleanup_old_data()
+            self._cleanup_old_data()
 
     def on_start(self, span: Span, parent_context=None):
@@ -233,23 +231,11 @@ class SQLiteSpanProcessor(SpanProcessor):
     def shutdown(self):
         """Cleanup any resources."""
-        self._shutdown_event.set()
-
-        # Wait for cleanup thread to finish if it exists
-        if self.cleanup_task and self.cleanup_task.is_alive():
-            self.cleanup_task.join(timeout=5.0)
-
-        current_thread_id = threading.get_ident()
-
         with self._lock:
-            # Close all connections from the current thread
-            for thread_id, conn in list(self._connections.items()):
-                if thread_id == current_thread_id:
-                    try:
-                        if conn:
-                            conn.close()
-                        del self._connections[thread_id]
-                    except sqlite3.Error:
-                        pass  # Ignore errors during shutdown
+            for conn in self._connections.values():
+                if conn:
+                    conn.close()
+            self._connections.clear()
 
     def force_flush(self, timeout_millis=30000):
         """Force export of spans."""