fix: Vector store persistence across server restarts (backport #3977) (#4225)

# What does this PR do?

This PR fixes a bug in LlamaStack 0.3.0 where vector stores created via
the OpenAI-compatible API (`POST /v1/vector_stores`) would fail with
`VectorStoreNotFoundError` after a server restart when attempting
operations such as `vector_io.insert()` or `vector_io.query()`.

The bug affected **6 vector IO providers**: `pgvector`, `sqlite_vec`,
`chroma`, `milvus`, `qdrant`, and `weaviate`.

Created with the assistance of: claude-4.5-sonnet

## Root Cause

All affected providers had a broken
`_get_and_cache_vector_store_index()` method that:
1. Did not load existing vector stores from persistent storage during
initialization
2. Attempted to use `vector_store_table` (which was either `None` or a
`KVStore` without the required `get_vector_store()` method)
3. Could not reload vector stores after server restart or cache miss

## Solution

This PR implements a consistent pattern across all 6 providers:

1. **Load vector stores during initialization** - Pre-populate the cache
from KV store on startup
2. **Fix lazy loading** - Modified `_get_and_cache_vector_store_index()`
to load directly from KV store instead of relying on
`vector_store_table`
3. **Remove broken dependency** - Eliminated reliance on the
`vector_store_table` pattern
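
The pattern above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the provider code: `InMemoryKVStore`, `Adapter`, `get_and_cache`, and the `PREFIX` value are hypothetical stand-ins, and vector stores are plain dicts. Only the KV store method names (`get`, `set`, `values_in_range`) and the `\xff` range-scan idea mirror what the diff uses.

```python
from __future__ import annotations

import asyncio
import json


class InMemoryKVStore:
    """Hypothetical stand-in for a persistent KV store (it outlives the adapters below)."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def set(self, key: str, value: str) -> None:
        self._data[key] = value

    async def get(self, key: str) -> str | None:
        return self._data.get(key)

    async def values_in_range(self, start_key: str, end_key: str) -> list[str]:
        return [v for k, v in sorted(self._data.items()) if start_key <= k <= end_key]


PREFIX = "vector_stores:"  # assumed value; the real VECTOR_DBS_PREFIX may differ


class Adapter:
    """Toy adapter: the cache is per-process, the KV store is persistent."""

    def __init__(self, kvstore: InMemoryKVStore) -> None:
        self.kvstore = kvstore
        self.cache: dict[str, dict] = {}

    async def initialize(self) -> None:
        # 1. Preload: scan every key under the prefix and populate the cache.
        for raw in await self.kvstore.values_in_range(PREFIX, f"{PREFIX}\xff"):
            store = json.loads(raw)
            self.cache[store["identifier"]] = store

    async def register_vector_store(self, store: dict) -> None:
        await self.kvstore.set(f"{PREFIX}{store['identifier']}", json.dumps(store))
        self.cache[store["identifier"]] = store

    async def get_and_cache(self, store_id: str) -> dict:
        # 2. Lazy load: on a cache miss, read straight from the KV store.
        if store_id in self.cache:
            return self.cache[store_id]
        raw = await self.kvstore.get(f"{PREFIX}{store_id}")
        if not raw:
            raise KeyError(store_id)
        self.cache[store_id] = json.loads(raw)
        return self.cache[store_id]


async def demo() -> tuple[bool, bool]:
    kv = InMemoryKVStore()
    first = Adapter(kv)
    await first.initialize()
    await first.register_vector_store({"identifier": "vs_1", "embedding_dimension": 384})

    restarted = Adapter(kv)  # simulated restart: empty cache, same KV store
    await restarted.initialize()
    preloaded = "vs_1" in restarted.cache

    restarted.cache.clear()  # simulated cache miss
    lazy = (await restarted.get_and_cache("vs_1"))["identifier"] == "vs_1"
    return preloaded, lazy


print(asyncio.run(demo()))
```

Either mechanism alone would cover the restart case in this toy; having both means a store stays reachable even if the cache is cleared after startup.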

## Testing steps

## Step 1: Configure and Start the Server

### 1.1 Configure the stack

Create or use an existing configuration with a vector IO provider.

**Example `run.yaml`:**

```yaml
vector_io_store:
  - provider_id: pgvector
    provider_type: remote::pgvector
    config:
      host: localhost
      port: 5432
      db: llamastack
      user: llamastack
      password: llamastack

inference:
  - provider_id: sentence-transformers
    provider_type: inline::sentence-transformers
    config:
      model: sentence-transformers/all-MiniLM-L6-v2
```

### 1.2 Start the server

```bash
llama stack run run.yaml --port 5000
```

Wait for the server to fully start. You should see:

```
INFO: Started server process
INFO: Application startup complete
```

---

## Step 2: Create a Vector Store

### 2.1 Create via API

```bash
curl -X POST http://localhost:5000/v1/vector_stores \
  -H "Content-Type: application/json" \
  -d '{
    "name": "test-persistence-store",
    "extra_body": {
      "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
      "embedding_dimension": 384,
      "provider_id": "pgvector"
    }
  }' | jq
```

### 2.2 Expected Response

```json
{
  "id": "vs_a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
  "object": "vector_store",
  "name": "test-persistence-store",
  "status": "completed",
  "created_at": 1730304000,
  "file_counts": {
    "total": 0,
    "completed": 0,
    "in_progress": 0,
    "failed": 0,
    "cancelled": 0
  },
  "usage_bytes": 0
}
```

**Save the `id` field** (e.g.,
`vs_a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d`) — you’ll need it for the next
steps.

---

## Step 3: Insert Data (Before Restart)

### 3.1 Insert chunks into the vector store

```bash
export VS_ID="vs_a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"

curl -X POST http://localhost:5000/vector-io/insert \
  -H "Content-Type: application/json" \
  -d "{
    \"vector_store_id\": \"$VS_ID\",
    \"chunks\": [
      {
        \"content\": \"Python is a high-level programming language known for its readability.\",
        \"metadata\": {\"source\": \"doc1\", \"page\": 1}
      },
      {
        \"content\": \"Machine learning enables computers to learn from data without explicit programming.\",
        \"metadata\": {\"source\": \"doc2\", \"page\": 1}
      },
      {
        \"content\": \"Neural networks are inspired by biological neurons in the brain.\",
        \"metadata\": {\"source\": \"doc3\", \"page\": 1}
      }
    ]
  }"
```
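
The escaped quotes in the `curl` body above are easy to get wrong. As an alternative sketch, the same request can be built with Python's standard library. The helper name `build_insert_request` is hypothetical, and the endpoint path and payload shape are copied from the `curl` example rather than verified against the API.

```python
import json
import urllib.request


def build_insert_request(base_url: str, vector_store_id: str, chunks: list) -> urllib.request.Request:
    """Build (but do not send) the POST request from step 3.1."""
    body = json.dumps({"vector_store_id": vector_store_id, "chunks": chunks}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/vector-io/insert",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_insert_request(
    "http://localhost:5000",
    "vs_a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
    [
        {
            "content": "Python is a high-level programming language known for its readability.",
            "metadata": {"source": "doc1", "page": 1},
        }
    ],
)
# To send it against a running server: urllib.request.urlopen(req)
```

Because `json.dumps` handles the quoting, the vector store ID and chunk text can come from variables without any manual escaping.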

### 3.2 Expected Response

Status: **200 OK**  
Response: *Empty or success confirmation*

---

## Step 4: Query Data (Before Restart – Baseline)

### 4.1 Query the vector store

```bash
curl -X POST http://localhost:5000/vector-io/query \
  -H "Content-Type: application/json" \
  -d "{
    \"vector_store_id\": \"$VS_ID\",
    \"query\": \"What is machine learning?\"
  }" | jq
```

### 4.2 Expected Response

```json
{
  "chunks": [
    {
      "content": "Machine learning enables computers to learn from data without explicit programming.",
      "metadata": {"source": "doc2", "page": 1}
    },
    {
      "content": "Neural networks are inspired by biological neurons in the brain.",
      "metadata": {"source": "doc3", "page": 1}
    }
  ],
  "scores": [0.85, 0.72]
}
```

**Checkpoint:** Works correctly before restart.
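
The checkpoint can also be verified programmatically instead of by eye. The helper below is hypothetical (`top_chunk_matches` is not part of LlamaStack) and assumes the response shape shown in 4.2, with parallel `chunks` and `scores` lists. Exact scores depend on the embedding model, so it deliberately does not compare them.

```python
import json


def top_chunk_matches(response_text: str, expected_substring: str) -> bool:
    """Return True if the highest-scoring chunk contains the expected text."""
    response = json.loads(response_text)
    chunks, scores = response["chunks"], response["scores"]
    if not chunks or len(chunks) != len(scores):
        return False
    # Pick the chunk with the highest score rather than assuming sort order.
    best_index = max(range(len(scores)), key=scores.__getitem__)
    return expected_substring in chunks[best_index]["content"]


sample = """{"chunks": [
  {"content": "Machine learning enables computers to learn from data without explicit programming.",
   "metadata": {"source": "doc2", "page": 1}},
  {"content": "Neural networks are inspired by biological neurons in the brain.",
   "metadata": {"source": "doc3", "page": 1}}
], "scores": [0.85, 0.72]}"""

print(top_chunk_matches(sample, "Machine learning"))
```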

---

## Step 5: Restart the Server (Critical Test)

### 5.1 Stop the server

In the terminal where it’s running:

```
Ctrl + C
```

Wait for:

```
Shutting down...
```

### 5.2 Restart the server

```bash
llama stack run run.yaml --port 5000
```

Wait for:

```
INFO: Started server process
INFO: Application startup complete
```

The vector store cache is now empty, but data should persist.

---

## Step 6: Verify Vector Store Exists (After Restart)

### 6.1 List vector stores

```bash
curl http://localhost:5000/v1/vector_stores | jq
```

### 6.2 Expected Response

```json
{
  "object": "list",
  "data": [
    {
      "id": "vs_a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
      "name": "test-persistence-store",
      "status": "completed"
    }
  ]
}
```

**Checkpoint:** Vector store should be listed.

---

## Step 7: Insert Data (After Restart – THE BUG TEST)

### 7.1 Insert new chunks

```bash
curl -X POST http://localhost:5000/vector-io/insert \
  -H "Content-Type: application/json" \
  -d "{
    \"vector_store_id\": \"$VS_ID\",
    \"chunks\": [
      {
        \"content\": \"This chunk was inserted AFTER the server restart.\",
        \"metadata\": {\"source\": \"post-restart\", \"test\": true}
      }
    ]
  }"
```

### 7.2 Expected Results

**With Fix (Correct):**
```
Status: 200 OK
Response: Success
```

**Without Fix (Bug):**
```json
{
  "detail": "VectorStoreNotFoundError: Vector Store 'vs_a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d' not found."
}
```

**Critical Test:** If insertion succeeds, the fix works.

---

## Step 8: Query Data (After Restart – Verification)

### 8.1 Query all data

```bash
curl -X POST http://localhost:5000/vector-io/query \
  -H "Content-Type: application/json" \
  -d "{
    \"vector_store_id\": \"$VS_ID\",
    \"query\": \"restart\"
  }" | jq
```

### 8.2 Expected Response

```json
{
  "chunks": [
    {
      "content": "This chunk was inserted AFTER the server restart.",
      "metadata": {"source": "post-restart", "test": true}
    }
  ],
  "scores": [0.95]
}
```

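**Checkpoint:** The chunk inserted after the restart is queryable. To confirm that the pre-restart chunks are also still queryable, repeat the Step 4 query.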

---

## Step 9: Multiple Restart Test (Extra Verification)

### 9.1 Restart again

```bash
# Press Ctrl + C in the server terminal to stop it, then start it again:
llama stack run run.yaml --port 5000
```

### 9.2 Query after restart

```bash
curl -X POST http://localhost:5000/vector-io/query \
  -H "Content-Type: application/json" \
  -d "{
    \"vector_store_id\": \"$VS_ID\",
    \"query\": \"programming\"
  }" | jq
```

**Expected:** Works correctly across multiple restarts.



---

This is an automatic backport of pull request #3977 done by
[Mergify](https://mergify.com).

Signed-off-by: Charlie Doern <cdoern@redhat.com>
Co-authored-by: Juan Pérez de Algaba <124347725+jperezdealgaba@users.noreply.github.com>
Co-authored-by: Francisco Arceo <arceofrancisco@gmail.com>
mergify[bot] 2025-11-24 11:30:21 -08:00 committed by GitHub
parent f216eb99be
commit 46bd95e453
8 changed files with 203 additions and 33 deletions

@@ -223,7 +223,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
             return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")

     async def register_vector_store(self, vector_store: VectorStore) -> None:
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")

         key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
         await self.kvstore.set(key=key, value=vector_store.model_dump_json())
@@ -239,7 +240,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
         return [i.vector_store for i in self.cache.values()]

     async def unregister_vector_store(self, vector_store_id: str) -> None:
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")

         if vector_store_id not in self.cache:
             return
@@ -248,6 +250,27 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
         del self.cache[vector_store_id]
         await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")

+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
+
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
+            raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
+            inference_api=self.inference_api,
+        )
+        self.cache[vector_store_id] = index
+        return index
+
     async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
         index = self.cache.get(vector_db_id)
         if index is None:

@@ -412,6 +412,14 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
         return [v.vector_store for v in self.cache.values()]

     async def register_vector_store(self, vector_store: VectorStore) -> None:
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
+
+        # Save to kvstore for persistence
+        key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
+        await self.kvstore.set(key=key, value=vector_store.model_dump_json())
+
+        # Create and cache the index
         index = await SQLiteVecIndex.create(
             vector_store.embedding_dimension, self.config.db_path, vector_store.identifier
         )
@@ -421,13 +429,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]

-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)

+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = VectorStoreWithIndex(
             vector_store=vector_store,
             index=SQLiteVecIndex(

@@ -131,7 +131,6 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc

     async def initialize(self) -> None:
         self.kvstore = await kvstore_impl(self.config.persistence)
-        self.vector_store_table = self.kvstore

         if isinstance(self.config, RemoteChromaVectorIOConfig):
             log.info(f"Connecting to Chroma server at: {self.config.url}")
@@ -190,9 +189,16 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]

-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack")
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         collection = await maybe_await(self.client.get_collection(vector_store_id))
         if not collection:
             raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")

@@ -328,13 +328,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]

-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)

+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = VectorStoreWithIndex(
             vector_store=vector_store,
             index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore),

@@ -368,6 +368,22 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
             log.exception("Could not connect to PGVector database server")
             raise RuntimeError("Could not connect to PGVector database server") from e

+        # Load existing vector stores from KV store into cache
+        start_key = VECTOR_DBS_PREFIX
+        end_key = f"{VECTOR_DBS_PREFIX}\xff"
+        stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
+        for vector_store_data in stored_vector_stores:
+            vector_store = VectorStore.model_validate_json(vector_store_data)
+            pgvector_index = PGVectorIndex(
+                vector_store=vector_store,
+                dimension=vector_store.embedding_dimension,
+                conn=self.conn,
+                kvstore=self.kvstore,
+            )
+            await pgvector_index.initialize()
+            index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api)
+            self.cache[vector_store.identifier] = index
+
     async def shutdown(self) -> None:
         if self.conn is not None:
             self.conn.close()
@@ -377,7 +393,13 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt

     async def register_vector_store(self, vector_store: VectorStore) -> None:
         # Persist vector DB metadata in the KV store
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
+
+        # Save to kvstore for persistence
+        key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
+        await self.kvstore.set(key=key, value=vector_store.model_dump_json())
+
         # Upsert model metadata in Postgres
         upsert_models(self.conn, [(vector_store.identifier, vector_store)])

@@ -396,7 +418,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
             del self.cache[vector_store_id]

         # Delete vector DB metadata from KV store
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")
         await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}")

     async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
@@ -413,13 +436,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]

-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)

+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn)
         await index.initialize()
         self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api)

@@ -183,7 +183,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         await super().shutdown()

     async def register_vector_store(self, vector_store: VectorStore) -> None:
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")

         key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
         await self.kvstore.set(key=key, value=vector_store.model_dump_json())
@@ -200,20 +201,24 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
             await self.cache[vector_store_id].index.delete()
             del self.cache[vector_store_id]

-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
         await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")

     async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]

-        if self.vector_store_table is None:
-            raise ValueError(f"Vector DB not found {vector_store_id}")
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")

-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)

+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = VectorStoreWithIndex(
             vector_store=vector_store,
             index=QdrantIndex(client=self.client, collection_name=vector_store.identifier),

@@ -346,13 +346,16 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]

-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)

+        vector_store = VectorStore.model_validate_json(vector_store_data)
         client = self._get_client()
         sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
         if not client.collections.exists(sanitized_collection_name):

@@ -92,6 +92,99 @@ async def test_persistence_across_adapter_restarts(vector_io_adapter):
     await vector_io_adapter.shutdown()


+async def test_vector_store_lazy_loading_from_kvstore(vector_io_adapter):
+    """
+    Test that vector stores can be lazy-loaded from KV store when not in cache.
+
+    Verifies that clearing the cache doesn't break vector store access - they
+    can be loaded on-demand from persistent storage.
+    """
+    await vector_io_adapter.initialize()
+
+    vector_store_id = f"lazy_load_test_{np.random.randint(1e6)}"
+    vector_store = VectorStore(
+        identifier=vector_store_id,
+        provider_id="test_provider",
+        embedding_model="test_model",
+        embedding_dimension=128,
+    )
+    await vector_io_adapter.register_vector_store(vector_store)
+    assert vector_store_id in vector_io_adapter.cache
+
+    vector_io_adapter.cache.clear()
+    assert vector_store_id not in vector_io_adapter.cache
+
+    loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
+    assert loaded_index is not None
+    assert loaded_index.vector_store.identifier == vector_store_id
+    assert vector_store_id in vector_io_adapter.cache
+
+    cached_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
+    assert cached_index is loaded_index
+
+    await vector_io_adapter.shutdown()
+
+
+async def test_vector_store_preloading_on_initialization(vector_io_adapter):
+    """
+    Test that vector stores are preloaded from KV store during initialization.
+
+    Verifies that after restart, all vector stores are automatically loaded into
+    cache and immediately accessible without requiring lazy loading.
+    """
+    await vector_io_adapter.initialize()
+
+    vector_store_ids = [f"preload_test_{i}_{np.random.randint(1e6)}" for i in range(3)]
+    for vs_id in vector_store_ids:
+        vector_store = VectorStore(
+            identifier=vs_id,
+            provider_id="test_provider",
+            embedding_model="test_model",
+            embedding_dimension=128,
+        )
+        await vector_io_adapter.register_vector_store(vector_store)
+
+    for vs_id in vector_store_ids:
+        assert vs_id in vector_io_adapter.cache
+
+    await vector_io_adapter.shutdown()
+    await vector_io_adapter.initialize()
+
+    for vs_id in vector_store_ids:
+        assert vs_id in vector_io_adapter.cache
+
+    for vs_id in vector_store_ids:
+        loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vs_id)
+        assert loaded_index is not None
+        assert loaded_index.vector_store.identifier == vs_id
+
+    await vector_io_adapter.shutdown()
+
+
+async def test_kvstore_none_raises_runtime_error(vector_io_adapter):
+    """
+    Test that accessing vector stores with uninitialized kvstore raises RuntimeError.
+
+    Verifies proper RuntimeError is raised instead of assertions when kvstore is None.
+    """
+    await vector_io_adapter.initialize()
+
+    vector_store_id = f"kvstore_none_test_{np.random.randint(1e6)}"
+    vector_store = VectorStore(
+        identifier=vector_store_id,
+        provider_id="test_provider",
+        embedding_model="test_model",
+        embedding_dimension=128,
+    )
+    await vector_io_adapter.register_vector_store(vector_store)
+
+    vector_io_adapter.cache.clear()
+    vector_io_adapter.kvstore = None
+
+    with pytest.raises(RuntimeError, match="KVStore not initialized"):
+        await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
+
+
 async def test_register_and_unregister_vector_store(vector_io_adapter):
     unique_id = f"foo_db_{np.random.randint(1e6)}"
     dummy = VectorStore(