Mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-12-03 18:00:36 +00:00)

Commit dac1ff1f57: Merge remote-tracking branch 'upstream/main' into models-dep

9 changed files with 342 additions and 111 deletions
@@ -10,7 +10,7 @@ import TabItem from '@theme/TabItem';
 
 # Kubernetes Deployment Guide
 
-Deploy Llama Stack and vLLM servers in a Kubernetes cluster instead of running them locally. This guide covers both local development with Kind and production deployment on AWS EKS.
+Deploy Llama Stack and vLLM servers in a Kubernetes cluster instead of running them locally. This guide covers deployment using the Kubernetes operator to manage the Llama Stack server with Kind. The vLLM inference server is deployed manually.
 
 ## Prerequisites
@@ -110,115 +110,176 @@ spec:
 EOF
 ```
 
-### Step 3: Configure Llama Stack
+### Step 3: Install Kubernetes Operator
 
-Update your run configuration:
-
-```yaml
-providers:
-  inference:
-  - provider_id: vllm
-    provider_type: remote::vllm
-    config:
-      url: http://vllm-server.default.svc.cluster.local:8000/v1
-      max_tokens: 4096
-      api_token: fake
-```
-
-Build container image:
+Install the Llama Stack Kubernetes operator to manage Llama Stack deployments:
 
 ```bash
-tmp_dir=$(mktemp -d) && cat >$tmp_dir/Containerfile.llama-stack-run-k8s <<EOF
-FROM distribution-myenv:dev
-RUN apt-get update && apt-get install -y git
-RUN git clone https://github.com/meta-llama/llama-stack.git /app/llama-stack-source
-ADD ./vllm-llama-stack-run-k8s.yaml /app/config.yaml
-EOF
-podman build -f $tmp_dir/Containerfile.llama-stack-run-k8s -t llama-stack-run-k8s $tmp_dir
+# Install from the latest main branch
+kubectl apply -f https://raw.githubusercontent.com/llamastack/llama-stack-k8s-operator/main/release/operator.yaml
+
+# Or install a specific version (e.g., v0.4.0)
+# kubectl apply -f https://raw.githubusercontent.com/llamastack/llama-stack-k8s-operator/v0.4.0/release/operator.yaml
 ```
 
-### Step 4: Deploy Llama Stack Server
+Verify the operator is running:
+
+```bash
+kubectl get pods -n llama-stack-operator-system
+```
+
+For more information about the operator, see the [llama-stack-k8s-operator repository](https://github.com/llamastack/llama-stack-k8s-operator).
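Before creating any resources, it can also help to confirm that the operator's custom resource type is registered. A quick check; the `llamastack.io` API group here is inferred from the `apiVersion` used in Step 4, so adjust it if your operator version differs:

```bash
# Should list the LlamaStackDistribution resource once the operator is installed
kubectl api-resources --api-group=llamastack.io
```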
+### Step 4: Deploy Llama Stack Server using Operator
+
+Create a `LlamaStackDistribution` custom resource to deploy the Llama Stack server. The operator will automatically create the necessary Deployment, Service, and other resources.
+You can optionally override the default `run.yaml` using `spec.server.userConfig` with a ConfigMap (see [userConfig spec](https://github.com/llamastack/llama-stack-k8s-operator/blob/main/docs/api-overview.md#userconfigspec)).
 
 ```yaml
 cat <<EOF | kubectl apply -f -
-apiVersion: v1
-kind: PersistentVolumeClaim
+apiVersion: llamastack.io/v1alpha1
+kind: LlamaStackDistribution
 metadata:
-  name: llama-pvc
-spec:
-  accessModes:
-    - ReadWriteOnce
-  resources:
-    requests:
-      storage: 1Gi
----
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: llama-stack-server
+  name: llamastack-vllm
 spec:
   replicas: 1
-  selector:
-    matchLabels:
-      app.kubernetes.io/name: llama-stack
-  template:
-    metadata:
-      labels:
-        app.kubernetes.io/name: llama-stack
-    spec:
-      containers:
-      - name: llama-stack
-        image: localhost/llama-stack-run-k8s:latest
-        imagePullPolicy: IfNotPresent
-        command: ["llama", "stack", "run", "/app/config.yaml"]
-        ports:
-        - containerPort: 5000
-        volumeMounts:
-        - name: llama-storage
-          mountPath: /root/.llama
-      volumes:
-      - name: llama-storage
-        persistentVolumeClaim:
-          claimName: llama-pvc
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: llama-stack-service
-spec:
-  selector:
-    app.kubernetes.io/name: llama-stack
-  ports:
-    - protocol: TCP
-      port: 5000
-      targetPort: 5000
-  type: ClusterIP
+  server:
+    distribution:
+      name: starter
+    containerSpec:
+      port: 8321
+      env:
+        - name: VLLM_URL
+          value: "http://vllm-server.default.svc.cluster.local:8000/v1"
+        - name: VLLM_MAX_TOKENS
+          value: "4096"
+        - name: VLLM_API_TOKEN
+          value: "fake"
+    # Optional: override run.yaml from a ConfigMap using userConfig
+    userConfig:
+      configMap:
+        name: llama-stack-config
+    storage:
+      size: "20Gi"
+      mountPath: "/home/lls/.lls"
 EOF
 ```
 
+**Configuration Options:**
+
+- `replicas`: Number of Llama Stack server instances to run
+- `server.distribution.name`: The distribution to use (e.g., `starter` for the starter distribution). See the [list of supported distributions](https://github.com/llamastack/llama-stack-k8s-operator/blob/main/distributions.json) in the operator repository.
+- `server.distribution.image`: (Optional) Custom container image for non-supported distributions. Use this field when deploying a distribution that is not in the supported list. If specified, this takes precedence over `name`.
+- `server.containerSpec.port`: Port on which the Llama Stack server listens (default: 8321)
+- `server.containerSpec.env`: Environment variables to configure providers
+- `server.userConfig`: (Optional) Override the default `run.yaml` using a ConfigMap. See [userConfig spec](https://github.com/llamastack/llama-stack-k8s-operator/blob/main/docs/api-overview.md#userconfigspec).
+- `server.storage.size`: Size of the persistent volume for model and data storage
+- `server.storage.mountPath`: Where to mount the storage in the container
+
+**Note:** For a complete list of supported distributions, see [distributions.json](https://github.com/llamastack/llama-stack-k8s-operator/blob/main/distributions.json) in the operator repository. To use a custom or non-supported distribution, set the `server.distribution.image` field with your container image instead of `server.distribution.name`.
+
+The operator automatically creates:
+
+- A Deployment for the Llama Stack server
+- A Service to access the server
+- A PersistentVolumeClaim for storage
+- All necessary RBAC resources
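The custom resource above points `userConfig` at a ConfigMap named `llama-stack-config`, which this guide does not define. A minimal sketch of what that ConfigMap could look like; the `run.yaml` data key is an assumption based on the userConfig spec linked above, so verify the exact contract there:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: llama-stack-config
data:
  run.yaml: |
    # Your custom run.yaml contents (providers, models, etc.) go here.
```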
+
+Check the status of your deployment:
+
+```bash
+kubectl get llamastackdistribution
+kubectl describe llamastackdistribution llamastack-vllm
+```
 
 ### Step 5: Test Deployment
 
+Wait for the Llama Stack server pod to be ready:
+
 ```bash
-# Port forward and test
-kubectl port-forward service/llama-stack-service 5000:5000
-llama-stack-client --endpoint http://localhost:5000 inference chat-completion --message "hello, what model are you?"
+# Check the status of the LlamaStackDistribution
+kubectl get llamastackdistribution llamastack-vllm
+
+# Check the pods created by the operator
+kubectl get pods -l app.kubernetes.io/name=llama-stack
+
+# Wait for the pod to be ready
+kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=llama-stack --timeout=300s
 ```
+
+Get the service name created by the operator (it typically follows the pattern `<llamastackdistribution-name>-service`):
+
+```bash
+# List services to find the service name
+kubectl get services | grep llamastack
+
+# Port forward and test (replace SERVICE_NAME with the actual service name)
+kubectl port-forward service/llamastack-vllm-service 8321:8321
+```
+
+In another terminal, test the deployment:
+
+```bash
+llama-stack-client --endpoint http://localhost:8321 inference chat-completion --message "hello, what model are you?"
+```
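The same smoke test can be run from Python. A rough sketch, assuming the `llama-stack-client` SDK is installed (`pip install llama-stack-client`); method names can vary between client versions, so treat this as illustrative rather than authoritative:

```python
from llama_stack_client import LlamaStackClient

# Talks to the port-forwarded service from the previous step.
client = LlamaStackClient(base_url="http://localhost:8321")

# Listing models verifies end-to-end connectivity; the vLLM-backed model
# configured via VLLM_URL should appear in the output.
for model in client.models.list():
    print(model.identifier)
```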
 
 ## Troubleshooting
 
-**Check pod status:**
+### vLLM Server Issues
+
+**Check vLLM pod status:**
 
 ```bash
 kubectl get pods -l app.kubernetes.io/name=vllm
 kubectl logs -l app.kubernetes.io/name=vllm
 ```
 
-**Test service connectivity:**
+**Test vLLM service connectivity:**
 
 ```bash
 kubectl run -it --rm debug --image=curlimages/curl --restart=Never -- curl http://vllm-server:8000/v1/models
 ```
 
+### Llama Stack Server Issues
+
+**Check LlamaStackDistribution status:**
+
+```bash
+# Get detailed status
+kubectl describe llamastackdistribution llamastack-vllm
+
+# Check for events
+kubectl get events --sort-by='.lastTimestamp' | grep llamastack-vllm
+```
+
+**Check operator-managed pods:**
+
+```bash
+# List all pods managed by the operator
+kubectl get pods -l app.kubernetes.io/name=llama-stack
+
+# Check pod logs (replace POD_NAME with actual pod name)
+kubectl logs -l app.kubernetes.io/name=llama-stack
+```
+
+**Check operator status:**
+
+```bash
+# Verify the operator is running
+kubectl get pods -n llama-stack-operator-system
+
+# Check operator logs if issues persist
+kubectl logs -n llama-stack-operator-system -l control-plane=controller-manager
+```
+
+**Verify service connectivity:**
+
+```bash
+# Get the service endpoint
+kubectl get svc llamastack-vllm-service
+
+# Test connectivity from within the cluster
+kubectl run -it --rm debug --image=curlimages/curl --restart=Never -- curl http://llamastack-vllm-service:8321/health
+```
 
 ## Related Resources
 
 - **[Deployment Overview](/docs/deploying/)** - Overview of deployment options
 - **[Distributions](/docs/distributions)** - Understanding Llama Stack distributions
 - **[Configuration](/docs/distributions/configuration)** - Detailed configuration options
+- **[LlamaStack Operator](https://github.com/llamastack/llama-stack-k8s-operator)** - Overview of the llama-stack Kubernetes operator
+- **[LlamaStackDistribution](https://github.com/llamastack/llama-stack-k8s-operator/blob/main/docs/api-overview.md)** - API spec of the llama-stack operator custom resource
@@ -223,7 +223,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
         return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")
 
     async def register_vector_store(self, vector_store: VectorStore) -> None:
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
         key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
         await self.kvstore.set(key=key, value=vector_store.model_dump_json())

@@ -239,7 +240,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
         return [i.vector_store for i in self.cache.values()]
 
     async def unregister_vector_store(self, vector_store_id: str) -> None:
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")
         if vector_store_id not in self.cache:
             return

@@ -248,6 +250,27 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
         del self.cache[vector_store_id]
         await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
 
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
+
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
+            raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
+            inference_api=self.inference_api,
+        )
+        self.cache[vector_store_id] = index
+        return index
+
     async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
         index = self.cache.get(vector_store_id)
         if index is None:
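A recurring change in this diff replaces `assert self.kvstore is not None` with an explicit `raise RuntimeError(...)`. The difference matters at runtime: `assert` statements are stripped when Python runs with `-O`, so the guard would silently vanish in optimized deployments. A minimal standalone sketch (illustration only, not repo code):

```python
class Adapter:
    """Mirrors the shape of the adapters in this diff; illustration only."""

    def __init__(self) -> None:
        self.kvstore = None  # initialize() has not been called yet

    def guard_with_assert(self) -> None:
        # Under `python -O` this line is removed entirely, and a later
        # self.kvstore.set(...) would fail with an opaque AttributeError.
        assert self.kvstore is not None

    def guard_with_raise(self) -> None:
        # Enforced on every run, regardless of interpreter flags, with a
        # message that tells the caller exactly how to recover.
        if self.kvstore is None:
            raise RuntimeError("KVStore not initialized. Call initialize() first.")
```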
@@ -412,6 +412,14 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
         return [v.vector_store for v in self.cache.values()]
 
     async def register_vector_store(self, vector_store: VectorStore) -> None:
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
+
+        # Save to kvstore for persistence
+        key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
+        await self.kvstore.set(key=key, value=vector_store.model_dump_json())
+
+        # Create and cache the index
         index = await SQLiteVecIndex.create(
             vector_store.embedding_dimension, self.config.db_path, vector_store.identifier
         )

@@ -421,13 +429,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]
 
-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = VectorStoreWithIndex(
             vector_store=vector_store,
             index=SQLiteVecIndex(
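The lazy-load pattern above repeats nearly verbatim in the chroma, milvus, pgvector, qdrant, and weaviate adapters below: serve from the in-memory cache, otherwise rehydrate the `VectorStore` from its persisted JSON in the KV store and cache the result. A self-contained sketch of the pattern with stand-in types (the real `VectorStore` is a pydantic model and the real kvstore is an async interface, as in the diff):

```python
import asyncio

VECTOR_DBS_PREFIX = "vector_dbs:"  # key prefix, as used throughout the diff


class VectorStore:
    """Stand-in for the real pydantic model; only the JSON round-trip matters here."""

    def __init__(self, identifier: str) -> None:
        self.identifier = identifier

    def model_dump_json(self) -> str:
        return self.identifier

    @classmethod
    def model_validate_json(cls, data: str) -> "VectorStore":
        return cls(data)


class DictKVStore:
    """Stand-in for the real async kvstore interface."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def set(self, key: str, value: str) -> None:
        self._data[key] = value

    async def get(self, key: str) -> str | None:
        return self._data.get(key)


class Adapter:
    def __init__(self, kvstore: DictKVStore | None) -> None:
        self.kvstore = kvstore
        self.cache: dict[str, VectorStore] = {}

    async def get_and_cache(self, vector_store_id: str) -> VectorStore:
        # Fast path: already materialized in memory.
        if vector_store_id in self.cache:
            return self.cache[vector_store_id]
        # Explicit raise instead of assert, matching the diff.
        if self.kvstore is None:
            raise RuntimeError("KVStore not initialized. Call initialize() first.")
        # Miss: rehydrate from persistent storage, then cache.
        data = await self.kvstore.get(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
        if not data:
            raise KeyError(vector_store_id)
        store = VectorStore.model_validate_json(data)
        self.cache[vector_store_id] = store
        return store


async def demo() -> None:
    kv = DictKVStore()
    await kv.set(f"{VECTOR_DBS_PREFIX}my-db", VectorStore("my-db").model_dump_json())
    adapter = Adapter(kv)
    store = await adapter.get_and_cache("my-db")  # loaded from the kvstore
    assert store is await adapter.get_and_cache("my-db")  # now served from cache


if __name__ == "__main__":
    asyncio.run(demo())
```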
@@ -131,7 +131,6 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
 
     async def initialize(self) -> None:
         self.kvstore = await kvstore_impl(self.config.persistence)
-        self.vector_store_table = self.kvstore
 
         if isinstance(self.config, RemoteChromaVectorIOConfig):
             log.info(f"Connecting to Chroma server at: {self.config.url}")

@@ -190,9 +189,16 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]
 
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack")
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         collection = await maybe_await(self.client.get_collection(vector_store_id))
         if not collection:
             raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")
@@ -328,13 +328,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]
 
-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = VectorStoreWithIndex(
             vector_store=vector_store,
             index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore),
@@ -368,6 +368,22 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
             log.exception("Could not connect to PGVector database server")
             raise RuntimeError("Could not connect to PGVector database server") from e
 
+        # Load existing vector stores from KV store into cache
+        start_key = VECTOR_DBS_PREFIX
+        end_key = f"{VECTOR_DBS_PREFIX}\xff"
+        stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
+        for vector_store_data in stored_vector_stores:
+            vector_store = VectorStore.model_validate_json(vector_store_data)
+            pgvector_index = PGVectorIndex(
+                vector_store=vector_store,
+                dimension=vector_store.embedding_dimension,
+                conn=self.conn,
+                kvstore=self.kvstore,
+            )
+            await pgvector_index.initialize()
+            index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api)
+            self.cache[vector_store.identifier] = index
+
     async def shutdown(self) -> None:
         if self.conn is not None:
             self.conn.close()

@@ -377,7 +393,13 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
 
     async def register_vector_store(self, vector_store: VectorStore) -> None:
         # Persist vector DB metadata in the KV store
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
+
+        # Save to kvstore for persistence
+        key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
+        await self.kvstore.set(key=key, value=vector_store.model_dump_json())
+
         # Upsert model metadata in Postgres
         upsert_models(self.conn, [(vector_store.identifier, vector_store)])

@@ -396,7 +418,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
         del self.cache[vector_store_id]
 
         # Delete vector DB metadata from KV store
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")
         await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}")
 
     async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:

@@ -413,13 +436,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]
 
-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn)
         await index.initialize()
         self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api)
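The pgvector `initialize()` preload above scans the KV store with `values_in_range(start_key, end_key)` using a `"\xff"`-suffixed end key. A short demonstration of why that end key works, assuming plain lexicographic comparison and key characters below U+00FF:

```python
# Every key starting with the prefix sorts between the prefix itself and
# prefix + "\xff", so the [start_key, end_key] range covers exactly those keys.
prefix = "vector_dbs:"
keys = ["vector_dbs:alpha", "vector_dbs:zz", "other:key", "vector_dbt:nope"]

in_range = sorted(k for k in keys if prefix <= k <= prefix + "\xff")
print(in_range)  # ['vector_dbs:alpha', 'vector_dbs:zz']
```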
@@ -183,7 +183,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         await super().shutdown()
 
     async def register_vector_store(self, vector_store: VectorStore) -> None:
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
         key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
         await self.kvstore.set(key=key, value=vector_store.model_dump_json())

@@ -200,20 +201,24 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
         await self.cache[vector_store_id].index.delete()
         del self.cache[vector_store_id]
 
-        assert self.kvstore is not None
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
         await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
 
     async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]
 
-        if self.vector_store_table is None:
-            raise ValueError(f"Vector DB not found {vector_store_id}")
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         index = VectorStoreWithIndex(
             vector_store=vector_store,
             index=QdrantIndex(client=self.client, collection_name=vector_store.identifier),
@@ -346,13 +346,16 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
         if vector_store_id in self.cache:
             return self.cache[vector_store_id]
 
-        if self.vector_store_table is None:
-            raise VectorStoreNotFoundError(vector_store_id)
-
-        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
-        if not vector_store:
+        # Try to load from kvstore
+        if self.kvstore is None:
+            raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
+
+        key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
+        vector_store_data = await self.kvstore.get(key)
+        if not vector_store_data:
             raise VectorStoreNotFoundError(vector_store_id)
+
+        vector_store = VectorStore.model_validate_json(vector_store_data)
         client = self._get_client()
         sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
         if not client.collections.exists(sanitized_collection_name):
@@ -92,6 +92,99 @@ async def test_persistence_across_adapter_restarts(vector_io_adapter):
     await vector_io_adapter.shutdown()
 
 
+async def test_vector_store_lazy_loading_from_kvstore(vector_io_adapter):
+    """
+    Test that vector stores can be lazy-loaded from KV store when not in cache.
+
+    Verifies that clearing the cache doesn't break vector store access - they
+    can be loaded on-demand from persistent storage.
+    """
+    await vector_io_adapter.initialize()
+
+    vector_store_id = f"lazy_load_test_{np.random.randint(1e6)}"
+    vector_store = VectorStore(
+        identifier=vector_store_id,
+        provider_id="test_provider",
+        embedding_model="test_model",
+        embedding_dimension=128,
+    )
+    await vector_io_adapter.register_vector_store(vector_store)
+    assert vector_store_id in vector_io_adapter.cache
+
+    vector_io_adapter.cache.clear()
+    assert vector_store_id not in vector_io_adapter.cache
+
+    loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
+    assert loaded_index is not None
+    assert loaded_index.vector_store.identifier == vector_store_id
+    assert vector_store_id in vector_io_adapter.cache
+
+    cached_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
+    assert cached_index is loaded_index
+
+    await vector_io_adapter.shutdown()
+
+
+async def test_vector_store_preloading_on_initialization(vector_io_adapter):
+    """
+    Test that vector stores are preloaded from KV store during initialization.
+
+    Verifies that after restart, all vector stores are automatically loaded into
+    cache and immediately accessible without requiring lazy loading.
+    """
+    await vector_io_adapter.initialize()
+
+    vector_store_ids = [f"preload_test_{i}_{np.random.randint(1e6)}" for i in range(3)]
+    for vs_id in vector_store_ids:
+        vector_store = VectorStore(
+            identifier=vs_id,
+            provider_id="test_provider",
+            embedding_model="test_model",
+            embedding_dimension=128,
+        )
+        await vector_io_adapter.register_vector_store(vector_store)
+
+    for vs_id in vector_store_ids:
+        assert vs_id in vector_io_adapter.cache
+
+    await vector_io_adapter.shutdown()
+    await vector_io_adapter.initialize()
+
+    for vs_id in vector_store_ids:
+        assert vs_id in vector_io_adapter.cache
+
+    for vs_id in vector_store_ids:
+        loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vs_id)
+        assert loaded_index is not None
+        assert loaded_index.vector_store.identifier == vs_id
+
+    await vector_io_adapter.shutdown()
+
+
+async def test_kvstore_none_raises_runtime_error(vector_io_adapter):
+    """
+    Test that accessing vector stores with uninitialized kvstore raises RuntimeError.
+
+    Verifies proper RuntimeError is raised instead of assertions when kvstore is None.
+    """
+    await vector_io_adapter.initialize()
+
+    vector_store_id = f"kvstore_none_test_{np.random.randint(1e6)}"
+    vector_store = VectorStore(
+        identifier=vector_store_id,
+        provider_id="test_provider",
+        embedding_model="test_model",
+        embedding_dimension=128,
+    )
+    await vector_io_adapter.register_vector_store(vector_store)
+
+    vector_io_adapter.cache.clear()
+    vector_io_adapter.kvstore = None
+
+    with pytest.raises(RuntimeError, match="KVStore not initialized"):
+        await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
+
+
 async def test_register_and_unregister_vector_store(vector_io_adapter):
     unique_id = f"foo_db_{np.random.randint(1e6)}"
     dummy = VectorStore(
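To exercise just the new tests, a `pytest -k` filter over the three new test names works; the directory below is an assumption about where this suite lives, so adjust it to your checkout:

```bash
pytest tests/unit/providers/vector_io -k "lazy_loading or preloading or kvstore_none"
```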