fix sqlite span processor

This commit is contained in:
Dinesh Yeduguru 2025-01-13 14:25:10 -08:00
parent 314806cde3
commit d26e96a02e

View file

@ -6,7 +6,9 @@
import json import json
import os import os
import queue
import sqlite3 import sqlite3
import threading
from datetime import datetime from datetime import datetime
from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace import SpanProcessor
@ -17,14 +19,40 @@ class SQLiteSpanProcessor(SpanProcessor):
def __init__(self, conn_string): def __init__(self, conn_string):
"""Initialize the SQLite span processor with a connection string.""" """Initialize the SQLite span processor with a connection string."""
self.conn_string = conn_string self.conn_string = conn_string
self.conn = None self._local = threading.local()
self._queue = queue.Queue()
self._worker = threading.Thread(target=self._process_queue, daemon=True)
self._running = True
self.setup_database() self.setup_database()
self._worker.start()
def _get_connection(self) -> sqlite3.Connection: def _get_connection(self) -> sqlite3.Connection:
"""Get the database connection.""" """Get a thread-local database connection."""
if self.conn is None: if not hasattr(self._local, "conn"):
self.conn = sqlite3.connect(self.conn_string, check_same_thread=False) self._local.conn = sqlite3.connect(
return self.conn self.conn_string,
timeout=60.0, # Increase timeout for busy waiting
isolation_level="IMMEDIATE", # This helps prevent "database is locked" errors
)
return self._local.conn
def _process_queue(self):
"""Worker thread to process spans from the queue."""
conn = None
try:
conn = sqlite3.connect(self.conn_string)
while self._running or not self._queue.empty():
try:
span = self._queue.get(timeout=1.0)
self._export_span(span, conn)
self._queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Error processing span: {e}")
finally:
if conn:
conn.close()
def setup_database(self): def setup_database(self):
"""Create the necessary tables if they don't exist.""" """Create the necessary tables if they don't exist."""
@ -90,9 +118,12 @@ class SQLiteSpanProcessor(SpanProcessor):
pass pass
def on_end(self, span: Span): def on_end(self, span: Span):
"""Called when a span ends. Export the span data to SQLite.""" """Queue the span for processing instead of processing directly."""
self._queue.put(span)
def _export_span(self, span: Span, conn: sqlite3.Connection):
"""Export the span to SQLite using the provided connection."""
try: try:
conn = self._get_connection()
cursor = conn.cursor() cursor = conn.cursor()
trace_id = format(span.get_span_context().trace_id, "032x") trace_id = format(span.get_span_context().trace_id, "032x")
@ -163,14 +194,21 @@ class SQLiteSpanProcessor(SpanProcessor):
conn.commit() conn.commit()
cursor.close() cursor.close()
except sqlite3.IntegrityError as e:
print(f"Integrity error exporting span: {e}")
print(f"Span ID: {span.get_span_context().span_id}")
conn.rollback()
except Exception as e: except Exception as e:
print(f"Error exporting span to SQLite: {e}") print(f"Error exporting span: {e}")
conn.rollback()
def shutdown(self): def shutdown(self):
"""Cleanup any resources.""" """Cleanup resources."""
if self.conn: self._running = False
self.conn.close() self._worker.join()
self.conn = None if hasattr(self._local, "conn"):
self._local.conn.close()
delattr(self._local, "conn")
def force_flush(self, timeout_millis=30000): def force_flush(self, timeout_millis=30000):
"""Force export of spans.""" """Force export of spans."""