diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py index 3455c2236..168808bf8 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py @@ -7,6 +7,7 @@ import json import os import sqlite3 +import threading from datetime import datetime from opentelemetry.sdk.trace import SpanProcessor @@ -17,14 +18,18 @@ class SQLiteSpanProcessor(SpanProcessor): def __init__(self, conn_string): """Initialize the SQLite span processor with a connection string.""" self.conn_string = conn_string - self.conn = None + self._local = threading.local() # Thread-local storage for connections self.setup_database() - def _get_connection(self) -> sqlite3.Connection: - """Get the database connection.""" - if self.conn is None: - self.conn = sqlite3.connect(self.conn_string, check_same_thread=False) - return self.conn + def _get_connection(self): + """Get a thread-local database connection.""" + if not hasattr(self._local, "conn"): + try: + self._local.conn = sqlite3.connect(self.conn_string) + except Exception as e: + print(f"Error connecting to SQLite database: {e}") + raise e + return self._local.conn def setup_database(self): """Create the necessary tables if they don't exist.""" @@ -168,9 +173,14 @@ class SQLiteSpanProcessor(SpanProcessor): def shutdown(self): """Cleanup any resources.""" - if self.conn: - self.conn.close() - self.conn = None + # We can't access other threads' connections, so we just close our own + if hasattr(self._local, "conn"): + try: + self._local.conn.close() + except Exception as e: + print(f"Error closing SQLite connection: {e}") + finally: + del self._local.conn def force_flush(self, timeout_millis=30000): """Force export of spans."""