diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py index f8fdbc12f..3455c2236 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py @@ -7,34 +7,24 @@ import json import os import sqlite3 -import threading -from datetime import datetime, timedelta -from typing import Dict +from datetime import datetime from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.trace import Span class SQLiteSpanProcessor(SpanProcessor): - def __init__(self, conn_string, ttl_days=30): + def __init__(self, conn_string): """Initialize the SQLite span processor with a connection string.""" self.conn_string = conn_string - self.ttl_days = ttl_days - self._shutdown_event = threading.Event() - self.cleanup_task = None - self._thread_local = threading.local() - self._connections: Dict[int, sqlite3.Connection] = {} - self._lock = threading.Lock() + self.conn = None self.setup_database() def _get_connection(self) -> sqlite3.Connection: - """Get a thread-specific database connection.""" - thread_id = threading.get_ident() - with self._lock: - if thread_id not in self._connections: - conn = sqlite3.connect(self.conn_string) - self._connections[thread_id] = conn - return self._connections[thread_id] + """Get the database connection.""" + if self.conn is None: + self.conn = sqlite3.connect(self.conn_string, check_same_thread=False) + return self.conn def setup_database(self): """Create the necessary tables if they don't exist.""" @@ -95,61 +85,6 @@ class SQLiteSpanProcessor(SpanProcessor): conn.commit() cursor.close() - # Start periodic cleanup in a separate thread - self.cleanup_task = threading.Thread(target=self._periodic_cleanup, daemon=True) - self.cleanup_task.start() - - def _cleanup_old_data(self): - """Delete records older than TTL.""" - try: - conn = self._get_connection() - cutoff_date = (datetime.now() - timedelta(days=self.ttl_days)).isoformat() - cursor = conn.cursor() - - # Delete old span events - cursor.execute( - """ - DELETE FROM span_events - WHERE span_id IN ( - SELECT span_id FROM spans - WHERE trace_id IN ( - SELECT trace_id FROM traces - WHERE created_at < ? - ) - ) - """, - (cutoff_date,), - ) - - # Delete old spans - cursor.execute( - """ - DELETE FROM spans - WHERE trace_id IN ( - SELECT trace_id FROM traces - WHERE created_at < ? - ) - """, - (cutoff_date,), - ) - - # Delete old traces - cursor.execute("DELETE FROM traces WHERE created_at < ?", (cutoff_date,)) - - conn.commit() - cursor.close() - except Exception as e: - print(f"Error during cleanup: {e}") - - def _periodic_cleanup(self): - """Run cleanup periodically.""" - import time - - while not self._shutdown_event.is_set(): - time.sleep(3600) # Sleep for 1 hour - if not self._shutdown_event.is_set(): - self._cleanup_old_data() - def on_start(self, span: Span, parent_context=None): """Called when a span starts.""" pass @@ -233,23 +168,9 @@ class SQLiteSpanProcessor(SpanProcessor): def shutdown(self): """Cleanup any resources.""" - self._shutdown_event.set() - - # Wait for cleanup thread to finish if it exists - if self.cleanup_task and self.cleanup_task.is_alive(): - self.cleanup_task.join(timeout=5.0) - current_thread_id = threading.get_ident() - - with self._lock: - # Close all connections from the current thread - for thread_id, conn in list(self._connections.items()): - if thread_id == current_thread_id: - try: - if conn: - conn.close() - del self._connections[thread_id] - except sqlite3.Error: - pass # Ignore errors during shutdown + if self.conn: + self.conn.close() + self.conn = None def force_flush(self, timeout_millis=30000): """Force export of spans."""