fix: Use pool_pre_ping=True in SQLAlchemy engine creation (#3208)

# What does this PR do?

We noticed that when llama-stack has been running for a long time, we
run into database errors when trying to run messages through the agent
(which we configured to persist to PostgreSQL), seemingly because the
pooled database connections have gone stale or been disconnected. This
commit adds `pool_pre_ping=True` to the SQLAlchemy engine creation to
help mitigate this issue by checking each connection before it is used
and re-establishing it if necessary.

More information in:


https://docs.sqlalchemy.org/en/20/core/pooling.html#dealing-with-disconnects

We're also open to other suggestions on how to handle this issue; this
PR is just one possible approach.

## Test Plan

We have not tested this change yet (testing is in progress); we are
hoping it will resolve our issue.
This commit is contained in:
Omer Tuchfeld 2025-08-20 22:52:05 +02:00 committed by GitHub
parent e195ee3091
commit 00a67da449
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -22,6 +22,7 @@ from sqlalchemy import (
text, text,
) )
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.ext.asyncio.engine import AsyncEngine
from llama_stack.apis.common.responses import PaginatedResponse from llama_stack.apis.common.responses import PaginatedResponse
from llama_stack.log import get_logger from llama_stack.log import get_logger
@ -45,9 +46,12 @@ TYPE_MAPPING: dict[ColumnType, Any] = {
class SqlAlchemySqlStoreImpl(SqlStore): class SqlAlchemySqlStoreImpl(SqlStore):
def __init__(self, config: SqlAlchemySqlStoreConfig): def __init__(self, config: SqlAlchemySqlStoreConfig):
self.config = config self.config = config
self.async_session = async_sessionmaker(create_async_engine(config.engine_str)) self.async_session = async_sessionmaker(self.create_engine())
self.metadata = MetaData() self.metadata = MetaData()
def create_engine(self) -> AsyncEngine:
return create_async_engine(self.config.engine_str, pool_pre_ping=True)
async def create_table( async def create_table(
self, self,
table: str, table: str,
@ -83,7 +87,7 @@ class SqlAlchemySqlStoreImpl(SqlStore):
else: else:
sqlalchemy_table = self.metadata.tables[table] sqlalchemy_table = self.metadata.tables[table]
engine = create_async_engine(self.config.engine_str) engine = self.create_engine()
async with engine.begin() as conn: async with engine.begin() as conn:
await conn.run_sync(self.metadata.create_all, tables=[sqlalchemy_table], checkfirst=True) await conn.run_sync(self.metadata.create_all, tables=[sqlalchemy_table], checkfirst=True)
@ -241,7 +245,7 @@ class SqlAlchemySqlStoreImpl(SqlStore):
nullable: bool = True, nullable: bool = True,
) -> None: ) -> None:
"""Add a column to an existing table if the column doesn't already exist.""" """Add a column to an existing table if the column doesn't already exist."""
engine = create_async_engine(self.config.engine_str) engine = self.create_engine()
try: try:
async with engine.begin() as conn: async with engine.begin() as conn: