remove threading logic from span processor

This commit is contained in:
Dinesh Yeduguru 2024-12-10 17:26:25 -08:00
parent 84904914c2
commit 17679fe49f

View file

@ -7,34 +7,24 @@
import json import json
import os import os
import sqlite3 import sqlite3
import threading from datetime import datetime
from datetime import datetime, timedelta
from typing import Dict
from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import Span from opentelemetry.trace import Span
class SQLiteSpanProcessor(SpanProcessor): class SQLiteSpanProcessor(SpanProcessor):
def __init__(self, conn_string, ttl_days=30): def __init__(self, conn_string):
"""Initialize the SQLite span processor with a connection string.""" """Initialize the SQLite span processor with a connection string."""
self.conn_string = conn_string self.conn_string = conn_string
self.ttl_days = ttl_days self.conn = None
self._shutdown_event = threading.Event()
self.cleanup_task = None
self._thread_local = threading.local()
self._connections: Dict[int, sqlite3.Connection] = {}
self._lock = threading.Lock()
self.setup_database() self.setup_database()
def _get_connection(self) -> sqlite3.Connection: def _get_connection(self) -> sqlite3.Connection:
"""Get a thread-specific database connection.""" """Get the database connection."""
thread_id = threading.get_ident() if self.conn is None:
with self._lock: self.conn = sqlite3.connect(self.conn_string, check_same_thread=False)
if thread_id not in self._connections: return self.conn
conn = sqlite3.connect(self.conn_string)
self._connections[thread_id] = conn
return self._connections[thread_id]
def setup_database(self): def setup_database(self):
"""Create the necessary tables if they don't exist.""" """Create the necessary tables if they don't exist."""
@ -95,61 +85,6 @@ class SQLiteSpanProcessor(SpanProcessor):
conn.commit() conn.commit()
cursor.close() cursor.close()
# Start periodic cleanup in a separate thread
self.cleanup_task = threading.Thread(target=self._periodic_cleanup, daemon=True)
self.cleanup_task.start()
def _cleanup_old_data(self):
"""Delete records older than TTL."""
try:
conn = self._get_connection()
cutoff_date = (datetime.now() - timedelta(days=self.ttl_days)).isoformat()
cursor = conn.cursor()
# Delete old span events
cursor.execute(
"""
DELETE FROM span_events
WHERE span_id IN (
SELECT span_id FROM spans
WHERE trace_id IN (
SELECT trace_id FROM traces
WHERE created_at < ?
)
)
""",
(cutoff_date,),
)
# Delete old spans
cursor.execute(
"""
DELETE FROM spans
WHERE trace_id IN (
SELECT trace_id FROM traces
WHERE created_at < ?
)
""",
(cutoff_date,),
)
# Delete old traces
cursor.execute("DELETE FROM traces WHERE created_at < ?", (cutoff_date,))
conn.commit()
cursor.close()
except Exception as e:
print(f"Error during cleanup: {e}")
def _periodic_cleanup(self):
"""Run cleanup periodically."""
import time
while not self._shutdown_event.is_set():
time.sleep(3600) # Sleep for 1 hour
if not self._shutdown_event.is_set():
self._cleanup_old_data()
def on_start(self, span: Span, parent_context=None): def on_start(self, span: Span, parent_context=None):
"""Called when a span starts.""" """Called when a span starts."""
pass pass
@ -233,23 +168,9 @@ class SQLiteSpanProcessor(SpanProcessor):
def shutdown(self): def shutdown(self):
"""Cleanup any resources.""" """Cleanup any resources."""
self._shutdown_event.set() if self.conn:
self.conn.close()
# Wait for cleanup thread to finish if it exists self.conn = None
if self.cleanup_task and self.cleanup_task.is_alive():
self.cleanup_task.join(timeout=5.0)
current_thread_id = threading.get_ident()
with self._lock:
# Close all connections from the current thread
for thread_id, conn in list(self._connections.items()):
if thread_id == current_thread_id:
try:
if conn:
conn.close()
del self._connections[thread_id]
except sqlite3.Error:
pass # Ignore errors during shutdown
def force_flush(self, timeout_millis=30000): def force_flush(self, timeout_millis=30000):
"""Force export of spans.""" """Force export of spans."""