diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py index 3455c2236..18184efdf 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py @@ -6,7 +6,9 @@ import json import os +import queue import sqlite3 +import threading from datetime import datetime from opentelemetry.sdk.trace import SpanProcessor @@ -17,14 +19,40 @@ class SQLiteSpanProcessor(SpanProcessor): def __init__(self, conn_string): """Initialize the SQLite span processor with a connection string.""" self.conn_string = conn_string - self.conn = None + self._local = threading.local() + self._queue = queue.Queue() + self._worker = threading.Thread(target=self._process_queue, daemon=True) + self._running = True self.setup_database() + self._worker.start() def _get_connection(self) -> sqlite3.Connection: - """Get the database connection.""" - if self.conn is None: - self.conn = sqlite3.connect(self.conn_string, check_same_thread=False) - return self.conn + """Get a thread-local database connection.""" + if not hasattr(self._local, "conn"): + self._local.conn = sqlite3.connect( + self.conn_string, + timeout=60.0, # Increase timeout for busy waiting + isolation_level="IMMEDIATE", # This helps prevent "database is locked" errors + ) + return self._local.conn + + def _process_queue(self): + """Worker thread to process spans from the queue.""" + conn = None + try: + conn = sqlite3.connect(self.conn_string) + while self._running or not self._queue.empty(): + try: + span = self._queue.get(timeout=1.0) + self._export_span(span, conn) + self._queue.task_done() + except queue.Empty: + continue + except Exception as e: + print(f"Error processing span: {e}") + finally: + if conn: + conn.close() def setup_database(self): """Create the necessary tables if they don't exist.""" @@ -90,9 +118,12 @@ class SQLiteSpanProcessor(SpanProcessor): pass def on_end(self, span: Span): - """Called when a span ends. Export the span data to SQLite.""" + """Queue the span for processing instead of processing directly.""" + self._queue.put(span) + + def _export_span(self, span: Span, conn: sqlite3.Connection): + """Export the span to SQLite using the provided connection.""" try: - conn = self._get_connection() cursor = conn.cursor() trace_id = format(span.get_span_context().trace_id, "032x") @@ -163,14 +194,21 @@ class SQLiteSpanProcessor(SpanProcessor): conn.commit() cursor.close() + except sqlite3.IntegrityError as e: + print(f"Integrity error exporting span: {e}") + print(f"Span ID: {span.get_span_context().span_id}") + conn.rollback() except Exception as e: - print(f"Error exporting span to SQLite: {e}") + print(f"Error exporting span: {e}") + conn.rollback() def shutdown(self): - """Cleanup any resources.""" - if self.conn: - self.conn.close() - self.conn = None + """Cleanup resources.""" + self._running = False + self._worker.join() + if hasattr(self._local, "conn"): + self._local.conn.close() + delattr(self._local, "conn") def force_flush(self, timeout_millis=30000): """Force export of spans."""