From 270d64007aa6d13ecc8e63149b5a489018c9d031 Mon Sep 17 00:00:00 2001 From: ehhuang Date: Wed, 26 Feb 2025 14:44:31 -0800 Subject: [PATCH] fix: sqlite conn (#1282) # Summary: Our tests sometimes error out with ``` ========================== 11 passed, 342 warnings in 58.86s ========================== Error exporting span to SQLite: Cannot operate on a closed database. Fatal Python error: _enter_buffered_busy: could not acquire lock for <_io.BufferedWriter name=''> at interpreter shutdown, possibly due to daemon threads Python runtime state: finalizing (tstate=0x000000012af04280) Current thread 0x00000001fa29c240 (most recent call first): ``` Usually able to repro this by running 10 times. The proposed fix is to use a thread-local variable for creating the sqlite connection, to ensure each connection is only used by one thread. Not 100% sure if this is the fix, but I am not able to repro with this. # Test Plan: Ran 10 times and saw no more errors ``` for i in {1..10}; do echo "=== Starting Run $i ===" LLAMA_STACK_CONFIG=fireworks pytest -s -v tests/client-sdk/agents/test_agents.py --safety-shield meta-llama/Llama-Guard-3-8B if [[ $? -ne 0 ]]; then echo "=== Run $i FAILED with exit code $? 
===" break else echo "=== Run $i PASSED ===" fi echo done ``` --- .../meta_reference/sqlite_span_processor.py | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py index 3455c2236..168808bf8 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py @@ -7,6 +7,7 @@ import json import os import sqlite3 +import threading from datetime import datetime from opentelemetry.sdk.trace import SpanProcessor @@ -17,14 +18,18 @@ class SQLiteSpanProcessor(SpanProcessor): def __init__(self, conn_string): """Initialize the SQLite span processor with a connection string.""" self.conn_string = conn_string - self.conn = None + self._local = threading.local() # Thread-local storage for connections self.setup_database() - def _get_connection(self) -> sqlite3.Connection: - """Get the database connection.""" - if self.conn is None: - self.conn = sqlite3.connect(self.conn_string, check_same_thread=False) - return self.conn + def _get_connection(self): + """Get a thread-local database connection.""" + if not hasattr(self._local, "conn"): + try: + self._local.conn = sqlite3.connect(self.conn_string) + except Exception as e: + print(f"Error connecting to SQLite database: {e}") + raise e + return self._local.conn def setup_database(self): """Create the necessary tables if they don't exist.""" @@ -168,9 +173,14 @@ class SQLiteSpanProcessor(SpanProcessor): def shutdown(self): """Cleanup any resources.""" - if self.conn: - self.conn.close() - self.conn = None + # We can't access other threads' connections, so we just close our own + if hasattr(self._local, "conn"): + try: + self._local.conn.close() + except Exception as e: + print(f"Error closing SQLite connection: {e}") + finally: + del 
self._local.conn def force_flush(self, timeout_millis=30000): """Force export of spans."""