[memory refactor][5/n] Migrate all vector_io providers (#835)

See https://github.com/meta-llama/llama-stack/issues/827 for the broader
design.

This PR finishes off all the stragglers and migrates everything to the
new naming.
This commit is contained in:
Ashwin Bharambe 2025-01-22 10:17:59 -08:00 committed by GitHub
parent 63f37f9b7c
commit c9e5578151
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
78 changed files with 504 additions and 623 deletions

View file

@ -12,21 +12,16 @@ from numpy.typing import NDArray
from psycopg2 import sql
from psycopg2.extras import execute_values, Json
from pydantic import BaseModel, parse_obj_as
from pydantic import BaseModel, TypeAdapter
from llama_stack.apis.inference import InterleavedContent
from llama_stack.apis.memory import (
Chunk,
Memory,
MemoryBankDocument,
QueryDocumentsResponse,
)
from llama_stack.apis.memory_banks import MemoryBank, MemoryBankType, VectorMemoryBank
from llama_stack.providers.datatypes import Api, MemoryBanksProtocolPrivate
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.memory.vector_store import (
BankWithIndex,
EmbeddingIndex,
VectorDBWithIndex,
)
from .config import PGVectorConfig
@ -50,20 +45,20 @@ def upsert_models(cur, keys_models: List[Tuple[str, BaseModel]]):
"""
)
values = [(key, Json(model.dict())) for key, model in keys_models]
values = [(key, Json(model.model_dump())) for key, model in keys_models]
execute_values(cur, query, values, template="(%s, %s)")
def load_models(cur, cls):
cur.execute("SELECT key, data FROM metadata_store")
rows = cur.fetchall()
return [parse_obj_as(cls, row["data"]) for row in rows]
return [TypeAdapter(cls).validate_python(row["data"]) for row in rows]
class PGVectorIndex(EmbeddingIndex):
def __init__(self, bank: VectorMemoryBank, dimension: int, cursor):
def __init__(self, vector_db: VectorDB, dimension: int, cursor):
self.cursor = cursor
self.table_name = f"vector_store_{bank.identifier}"
self.table_name = f"vector_store_{vector_db.identifier}"
self.cursor.execute(
f"""
@ -85,7 +80,7 @@ class PGVectorIndex(EmbeddingIndex):
values.append(
(
f"{chunk.document_id}:chunk-{i}",
Json(chunk.dict()),
Json(chunk.model_dump()),
embeddings[i].tolist(),
)
)
@ -101,7 +96,7 @@ class PGVectorIndex(EmbeddingIndex):
async def query(
self, embedding: NDArray, k: int, score_threshold: float
) -> QueryDocumentsResponse:
) -> QueryChunksResponse:
self.cursor.execute(
f"""
SELECT document, embedding <-> %s::vector AS distance
@ -119,13 +114,13 @@ class PGVectorIndex(EmbeddingIndex):
chunks.append(Chunk(**doc))
scores.append(1.0 / float(dist))
return QueryDocumentsResponse(chunks=chunks, scores=scores)
return QueryChunksResponse(chunks=chunks, scores=scores)
async def delete(self):
self.cursor.execute(f"DROP TABLE IF EXISTS {self.table_name}")
class PGVectorMemoryAdapter(Memory, MemoryBanksProtocolPrivate):
class PGVectorVectorDBAdapter(VectorIO, VectorDBsProtocolPrivate):
def __init__(self, config: PGVectorConfig, inference_api: Api.inference) -> None:
self.config = config
self.inference_api = inference_api
@ -167,46 +162,45 @@ class PGVectorMemoryAdapter(Memory, MemoryBanksProtocolPrivate):
async def shutdown(self) -> None:
pass
async def register_memory_bank(self, memory_bank: MemoryBank) -> None:
assert (
memory_bank.memory_bank_type == MemoryBankType.vector.value
), f"Only vector banks are supported {memory_bank.memory_bank_type}"
async def register_vector_db(self, vector_db: VectorDB) -> None:
upsert_models(self.cursor, [(vector_db.identifier, vector_db)])
upsert_models(self.cursor, [(memory_bank.identifier, memory_bank)])
index = PGVectorIndex(memory_bank, memory_bank.embedding_dimension, self.cursor)
self.cache[memory_bank.identifier] = BankWithIndex(
memory_bank, index, self.inference_api
index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.cursor)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db, index, self.inference_api
)
async def unregister_memory_bank(self, memory_bank_id: str) -> None:
await self.cache[memory_bank_id].index.delete()
del self.cache[memory_bank_id]
async def unregister_vector_db(self, vector_db_id: str) -> None:
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
async def insert_documents(
async def insert_chunks(
self,
bank_id: str,
documents: List[MemoryBankDocument],
vector_db_id: str,
chunks: List[Chunk],
ttl_seconds: Optional[int] = None,
) -> None:
index = await self._get_and_cache_bank_index(bank_id)
await index.insert_documents(documents)
index = await self._get_and_cache_vector_db_index(vector_db_id)
await index.insert_chunks(chunks)
async def query_documents(
async def query_chunks(
self,
bank_id: str,
vector_db_id: str,
query: InterleavedContent,
params: Optional[Dict[str, Any]] = None,
) -> QueryDocumentsResponse:
index = await self._get_and_cache_bank_index(bank_id)
return await index.query_documents(query, params)
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
return await index.query_chunks(query, params)
self.inference_api = inference_api
async def _get_and_cache_vector_db_index(
self, vector_db_id: str
) -> VectorDBWithIndex:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def _get_and_cache_bank_index(self, bank_id: str) -> BankWithIndex:
if bank_id in self.cache:
return self.cache[bank_id]
bank = await self.memory_bank_store.get_memory_bank(bank_id)
index = PGVectorIndex(bank, bank.embedding_dimension, self.cursor)
self.cache[bank_id] = BankWithIndex(bank, index, self.inference_api)
return self.cache[bank_id]
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.cursor)
self.cache[vector_db_id] = VectorDBWithIndex(
vector_db, index, self.inference_api
)
return self.cache[vector_db_id]