fix: sqlite conn (#1282)

# Summary:
Our tests sometimes error out with
```
========================== 11 passed, 342 warnings in 58.86s ==========================
Error exporting span to SQLite: Cannot operate on a closed database.
Fatal Python error: _enter_buffered_busy: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x000000012af04280)

Current thread 0x00000001fa29c240 (most recent call first):
  <no Python frame>
```
Usually able to repro this by running 10 times.

The proposed fix is to use threadsafe var for creating sqlite connection
to ensure connection is only used by one thread. Not 100% if this is the
fix, but am not able to repro with this.

# Test Plan:
Run 10 times and saw no more errors
```
for i in {1..10}; do
  echo "=== Starting Run $i ==="
  LLAMA_STACK_CONFIG=fireworks pytest -s -v tests/client-sdk/agents/test_agents.py --safety-shield meta-llama/Llama-Guard-3-8B
  if [[ $? -ne 0 ]]; then
    echo "=== Run $i FAILED with exit code $? ==="
    break
  else
    echo "=== Run $i PASSED ==="
  fi
  echo
done
```
This commit is contained in:
ehhuang 2025-02-26 14:44:31 -08:00 committed by GitHub
parent c8a20b8ed0
commit 270d64007a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -7,6 +7,7 @@
import json import json
import os import os
import sqlite3 import sqlite3
import threading
from datetime import datetime from datetime import datetime
from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace import SpanProcessor
@ -17,14 +18,18 @@ class SQLiteSpanProcessor(SpanProcessor):
def __init__(self, conn_string): def __init__(self, conn_string):
"""Initialize the SQLite span processor with a connection string.""" """Initialize the SQLite span processor with a connection string."""
self.conn_string = conn_string self.conn_string = conn_string
self.conn = None self._local = threading.local() # Thread-local storage for connections
self.setup_database() self.setup_database()
def _get_connection(self) -> sqlite3.Connection: def _get_connection(self):
"""Get the database connection.""" """Get a thread-local database connection."""
if self.conn is None: if not hasattr(self._local, "conn"):
self.conn = sqlite3.connect(self.conn_string, check_same_thread=False) try:
return self.conn self._local.conn = sqlite3.connect(self.conn_string)
except Exception as e:
print(f"Error connecting to SQLite database: {e}")
raise e
return self._local.conn
def setup_database(self): def setup_database(self):
"""Create the necessary tables if they don't exist.""" """Create the necessary tables if they don't exist."""
@ -168,9 +173,14 @@ class SQLiteSpanProcessor(SpanProcessor):
def shutdown(self): def shutdown(self):
"""Cleanup any resources.""" """Cleanup any resources."""
if self.conn: # We can't access other threads' connections, so we just close our own
self.conn.close() if hasattr(self._local, "conn"):
self.conn = None try:
self._local.conn.close()
except Exception as e:
print(f"Error closing SQLite connection: {e}")
finally:
del self._local.conn
def force_flush(self, timeout_millis=30000): def force_flush(self, timeout_millis=30000):
"""Force export of spans.""" """Force export of spans."""