forked from phoenix-oss/llama-stack-mirror
# What does this PR do? The chroma provider maintains a cache but does not sync up with chroma on a cold start. this change adds a fallback to read from chroma on a cache miss. ## Test Plan ```bash #start stack llama stack run /Users/dineshyv/.llama/distributions/llamastack-together/together-run.yaml # Add documents PYTHONPATH=. python -m examples.agents.rag_with_memory_bank localhost 5000 No available shields. Disable safety. Using model: Llama3.1-8B-Instruct Created session_id=b951b14f-a9d2-43a3-8b80-d80114d58322 for Agent(0687a251-6906-4081-8d4c-f52e19db9dd7) memory_retrieval> Retrieved context from banks: ['test_bank']. ==== Here are the retrieved documents for relevant context: === START-RETRIEVED-CONTEXT === id:num-1; content:_ the template from Llama2 to better support multiturn conversations. The same text in the Lla... > inference> Based on the retrieved documentation, the top 5 topics that were explained are: ............... # Kill stack # Bootup stack llama stack run /Users/dineshyv/.llama/distributions/llamastack-together/together-run.yaml # Run a RAG app with just the agent flow. it discovers the previously added documents No available shields. Disable safety. Using model: Llama3.1-8B-Instruct Created session_id=7a30c1a7-c87e-4787-936c-d0306589fe5d for Agent(b30420f3-c928-498a-887b-d084f0f3806c) memory_retrieval> Retrieved context from banks: ['test_bank']. ==== Here are the retrieved documents for relevant context: === START-RETRIEVED-CONTEXT === id:num-1; content:_ the template from Llama2 to better support multiturn conversations. The same text in the Lla... > inference> Based on the provided documentation, the top 5 topics that were explained are: ..... ```
176 lines
5.7 KiB
Python
176 lines
5.7 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
import json
|
|
from typing import List
|
|
from urllib.parse import urlparse
|
|
|
|
import chromadb
|
|
from numpy.typing import NDArray
|
|
|
|
from pydantic import parse_obj_as
|
|
|
|
from llama_stack.apis.memory import * # noqa: F403
|
|
|
|
from llama_stack.providers.datatypes import MemoryBanksProtocolPrivate
|
|
from llama_stack.providers.utils.memory.vector_store import (
|
|
BankWithIndex,
|
|
EmbeddingIndex,
|
|
)
|
|
|
|
|
|
class ChromaIndex(EmbeddingIndex):
|
|
def __init__(self, client: chromadb.AsyncHttpClient, collection):
|
|
self.client = client
|
|
self.collection = collection
|
|
|
|
async def add_chunks(self, chunks: List[Chunk], embeddings: NDArray):
|
|
assert len(chunks) == len(
|
|
embeddings
|
|
), f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
|
|
|
|
await self.collection.add(
|
|
documents=[chunk.json() for chunk in chunks],
|
|
embeddings=embeddings,
|
|
ids=[f"{c.document_id}:chunk-{i}" for i, c in enumerate(chunks)],
|
|
)
|
|
|
|
async def query(
|
|
self, embedding: NDArray, k: int, score_threshold: float
|
|
) -> QueryDocumentsResponse:
|
|
results = await self.collection.query(
|
|
query_embeddings=[embedding.tolist()],
|
|
n_results=k,
|
|
include=["documents", "distances"],
|
|
)
|
|
distances = results["distances"][0]
|
|
documents = results["documents"][0]
|
|
|
|
chunks = []
|
|
scores = []
|
|
for dist, doc in zip(distances, documents):
|
|
try:
|
|
doc = json.loads(doc)
|
|
chunk = Chunk(**doc)
|
|
except Exception:
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
print(f"Failed to parse document: {doc}")
|
|
continue
|
|
|
|
chunks.append(chunk)
|
|
scores.append(1.0 / float(dist))
|
|
|
|
return QueryDocumentsResponse(chunks=chunks, scores=scores)
|
|
|
|
async def delete(self):
|
|
await self.client.delete_collection(self.collection.name)
|
|
|
|
|
|
class ChromaMemoryAdapter(Memory, MemoryBanksProtocolPrivate):
|
|
def __init__(self, url: str) -> None:
|
|
print(f"Initializing ChromaMemoryAdapter with url: {url}")
|
|
url = url.rstrip("/")
|
|
parsed = urlparse(url)
|
|
|
|
if parsed.path and parsed.path != "/":
|
|
raise ValueError("URL should not contain a path")
|
|
|
|
self.host = parsed.hostname
|
|
self.port = parsed.port
|
|
|
|
self.client = None
|
|
self.cache = {}
|
|
|
|
async def initialize(self) -> None:
|
|
try:
|
|
print(f"Connecting to Chroma server at: {self.host}:{self.port}")
|
|
self.client = await chromadb.AsyncHttpClient(host=self.host, port=self.port)
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
raise RuntimeError("Could not connect to Chroma server") from e
|
|
|
|
async def shutdown(self) -> None:
|
|
pass
|
|
|
|
async def register_memory_bank(
|
|
self,
|
|
memory_bank: MemoryBank,
|
|
) -> None:
|
|
assert (
|
|
memory_bank.memory_bank_type == MemoryBankType.vector.value
|
|
), f"Only vector banks are supported {memory_bank.memory_bank_type}"
|
|
|
|
collection = await self.client.get_or_create_collection(
|
|
name=memory_bank.identifier,
|
|
metadata={"bank": memory_bank.json()},
|
|
)
|
|
bank_index = BankWithIndex(
|
|
bank=memory_bank, index=ChromaIndex(self.client, collection)
|
|
)
|
|
self.cache[memory_bank.identifier] = bank_index
|
|
|
|
async def list_memory_banks(self) -> List[MemoryBank]:
|
|
collections = await self.client.list_collections()
|
|
for collection in collections:
|
|
try:
|
|
data = json.loads(collection.metadata["bank"])
|
|
bank = parse_obj_as(VectorMemoryBank, data)
|
|
except Exception:
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
print(f"Failed to parse bank: {collection.metadata}")
|
|
continue
|
|
|
|
index = BankWithIndex(
|
|
bank=bank,
|
|
index=ChromaIndex(self.client, collection),
|
|
)
|
|
self.cache[bank.identifier] = index
|
|
|
|
return [i.bank for i in self.cache.values()]
|
|
|
|
async def unregister_memory_bank(self, memory_bank_id: str) -> None:
|
|
await self.cache[memory_bank_id].index.delete()
|
|
del self.cache[memory_bank_id]
|
|
|
|
async def insert_documents(
|
|
self,
|
|
bank_id: str,
|
|
documents: List[MemoryBankDocument],
|
|
ttl_seconds: Optional[int] = None,
|
|
) -> None:
|
|
index = await self._get_and_cache_bank_index(bank_id)
|
|
|
|
await index.insert_documents(documents)
|
|
|
|
async def query_documents(
|
|
self,
|
|
bank_id: str,
|
|
query: InterleavedTextMedia,
|
|
params: Optional[Dict[str, Any]] = None,
|
|
) -> QueryDocumentsResponse:
|
|
index = await self._get_and_cache_bank_index(bank_id)
|
|
|
|
return await index.query_documents(query, params)
|
|
|
|
async def _get_and_cache_bank_index(self, bank_id: str) -> BankWithIndex:
|
|
if bank_id in self.cache:
|
|
return self.cache[bank_id]
|
|
|
|
bank = await self.memory_bank_store.get_memory_bank(bank_id)
|
|
if not bank:
|
|
raise ValueError(f"Bank {bank_id} not found in Llama Stack")
|
|
collection = await self.client.get_collection(bank_id)
|
|
if not collection:
|
|
raise ValueError(f"Bank {bank_id} not found in Chroma")
|
|
index = BankWithIndex(bank=bank, index=ChromaIndex(self.client, collection))
|
|
self.cache[bank_id] = index
|
|
return index
|