diff --git a/llama_stack/distribution/datatypes.py b/llama_stack/distribution/datatypes.py index 2a565c93c..90b269452 100644 --- a/llama_stack/distribution/datatypes.py +++ b/llama_stack/distribution/datatypes.py @@ -37,8 +37,8 @@ RoutingKey = str | list[str] class RegistryEntrySource(StrEnum): - default = "default" - provider = "provider" + via_register_api = "via_register_api" + listed_from_provider = "listed_from_provider" class User(BaseModel): @@ -55,7 +55,7 @@ class ResourceWithOwner(Resource): resource. This can be used to constrain access to the resource.""" owner: User | None = None - source: RegistryEntrySource = RegistryEntrySource.default + source: RegistryEntrySource = RegistryEntrySource.via_register_api # Use the extended Resource for all routable objects diff --git a/llama_stack/distribution/routing_tables/common.py b/llama_stack/distribution/routing_tables/common.py index 421e4162b..caf0780fd 100644 --- a/llama_stack/distribution/routing_tables/common.py +++ b/llama_stack/distribution/routing_tables/common.py @@ -117,6 +117,9 @@ class CommonRoutingTableImpl(RoutingTable): for p in self.impls_by_provider_id.values(): await p.shutdown() + async def refresh(self) -> None: + pass + async def get_provider_impl(self, routing_key: str, provider_id: str | None = None) -> Any: from .benchmarks import BenchmarksRoutingTable from .datasets import DatasetsRoutingTable diff --git a/llama_stack/distribution/routing_tables/models.py b/llama_stack/distribution/routing_tables/models.py index 7bbc1697f..022c3dd40 100644 --- a/llama_stack/distribution/routing_tables/models.py +++ b/llama_stack/distribution/routing_tables/models.py @@ -4,7 +4,6 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -import asyncio import time from typing import Any @@ -22,53 +21,24 @@ logger = get_logger(name=__name__, category="core") class ModelsRoutingTable(CommonRoutingTableImpl, Models): listed_providers: set[str] = set() - model_refresh_interval_seconds: int = 300 - _refresh_task: asyncio.Task | None = None - async def initialize(self) -> None: - await super().initialize() - task = asyncio.create_task(self._refresh_models()) - self._refresh_task = task + async def refresh(self) -> None: + for provider_id, provider in self.impls_by_provider_id.items(): + refresh = await provider.should_refresh_models() + if not (refresh or provider_id in self.listed_providers): + continue - def cb(task): - import traceback + try: + models = await provider.list_models() + except Exception as e: + logger.exception(f"Model refresh failed for provider {provider_id}: {e}") + continue - if task.cancelled(): - logger.error("Model refresh task cancelled") - elif task.exception(): - logger.error(f"Model refresh task failed: {task.exception()}") - traceback.print_exception(task.exception()) - else: - logger.debug("Model refresh task completed") + self.listed_providers.add(provider_id) + if models is None: + continue - task.add_done_callback(cb) - - async def shutdown(self) -> None: - await super().shutdown() - if self._refresh_task: - self._refresh_task.cancel() - self._refresh_task = None - - async def _refresh_models(self) -> None: - while True: - for provider_id, provider in self.impls_by_provider_id.items(): - refresh = await provider.should_refresh_models() - if not (refresh or provider_id in self.listed_providers): - continue - - try: - models = await provider.list_models() - except Exception as e: - logger.exception(f"Model refresh failed for provider {provider_id}: {e}") - continue - - self.listed_providers.add(provider_id) - if models is None: - continue - - await self.update_registered_models(provider_id, models) - - await asyncio.sleep(self.model_refresh_interval_seconds) + await self.update_registered_models(provider_id, models) async def list_models(self) -> ListModelsResponse: return ListModelsResponse(data=await self.get_all_with_type("model")) @@ -132,7 +102,7 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models): provider_id=provider_id, metadata=metadata, model_type=model_type, - source=RegistryEntrySource.default, + source=RegistryEntrySource.via_register_api, ) registered_model = await self.register_object(model) return registered_model @@ -156,7 +126,7 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models): for model in existing_models: if model.provider_id != provider_id: continue - if model.source == RegistryEntrySource.default: + if model.source == RegistryEntrySource.via_register_api: model_ids[model.provider_resource_id] = model.identifier continue @@ -176,6 +146,6 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models): provider_id=provider_id, metadata=model.metadata, model_type=model.model_type, - source=RegistryEntrySource.provider, + source=RegistryEntrySource.listed_from_provider, ) ) diff --git a/llama_stack/distribution/stack.py b/llama_stack/distribution/stack.py index d7270156a..57bc4cd5f 100644 --- a/llama_stack/distribution/stack.py +++ b/llama_stack/distribution/stack.py @@ -4,6 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import asyncio import importlib.resources import os import re @@ -38,6 +39,7 @@ from llama_stack.distribution.distribution import get_provider_registry from llama_stack.distribution.inspect import DistributionInspectConfig, DistributionInspectImpl from llama_stack.distribution.providers import ProviderImpl, ProviderImplConfig from llama_stack.distribution.resolver import ProviderRegistry, resolve_impls +from llama_stack.distribution.routing_tables.common import CommonRoutingTableImpl from llama_stack.distribution.store.registry import create_dist_registry from llama_stack.distribution.utils.dynamic import instantiate_class_type from llama_stack.log import get_logger @@ -90,6 +92,9 @@ RESOURCES = [ ] +REGISTRY_REFRESH_INTERVAL_SECONDS = 300 + + async def register_resources(run_config: StackRunConfig, impls: dict[Api, Any]): for rsrc, api, register_method, list_method in RESOURCES: objects = getattr(run_config, rsrc) @@ -324,9 +329,33 @@ async def construct_stack( add_internal_implementations(impls, run_config) await register_resources(run_config, impls) + + task = asyncio.create_task(refresh_registry(impls)) + + def cb(task): + import traceback + + if task.cancelled(): + logger.error("Model refresh task cancelled") + elif task.exception(): + logger.error(f"Model refresh task failed: {task.exception()}") + traceback.print_exception(task.exception()) + else: + logger.debug("Model refresh task completed") + + task.add_done_callback(cb) return impls +async def refresh_registry(impls: dict[Api, Any]): + routing_tables = [v for v in impls.values() if isinstance(v, CommonRoutingTableImpl)] + while True: + for routing_table in routing_tables: + await routing_table.refresh() + + await asyncio.sleep(REGISTRY_REFRESH_INTERVAL_SECONDS) + + def get_stack_run_config_from_template(template: str) -> StackRunConfig: template_path = importlib.resources.files("llama_stack") / f"templates/{template}/run.yaml" diff --git a/llama_stack/providers/remote/inference/ollama/ollama.py b/llama_stack/providers/remote/inference/ollama/ollama.py index ee0049b2e..ba20185d3 100644 --- a/llama_stack/providers/remote/inference/ollama/ollama.py +++ b/llama_stack/providers/remote/inference/ollama/ollama.py @@ -5,6 +5,7 @@ # the root directory of this source tree. +import asyncio import base64 import uuid from collections.abc import AsyncGenerator, AsyncIterator @@ -97,14 +98,16 @@ class OllamaInferenceAdapter( def __init__(self, config: OllamaImplConfig) -> None: self.register_helper = ModelRegistryHelper(MODEL_ENTRIES) self.config = config - self._client = None + self._clients: dict[asyncio.AbstractEventLoop, AsyncClient] = {} self._openai_client = None @property def client(self) -> AsyncClient: - if self._client is None: - self._client = AsyncClient(host=self.config.url) - return self._client + # ollama client attaches itself to the current event loop (sadly?) + loop = asyncio.get_running_loop() + if loop not in self._clients: + self._clients[loop] = AsyncClient(host=self.config.url) + return self._clients[loop] @property def openai_client(self) -> AsyncOpenAI: @@ -191,8 +194,7 @@ class OllamaInferenceAdapter( return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}") async def shutdown(self) -> None: - self._client = None - self._openai_client = None + self._clients.clear() async def unregister_model(self, model_id: str) -> None: pass diff --git a/tests/unit/distribution/routers/test_routing_tables.py b/tests/unit/distribution/routers/test_routing_tables.py index fd1a7462f..c1b57cb4f 100644 --- a/tests/unit/distribution/routers/test_routing_tables.py +++ b/tests/unit/distribution/routers/test_routing_tables.py @@ -416,7 +416,7 @@ async def test_models_source_tracking_default(cached_disk_dist_registry): models = await table.list_models() assert len(models.data) == 1 model = models.data[0] - assert model.source == RegistryEntrySource.default + assert model.source == RegistryEntrySource.via_register_api assert model.identifier == "test_provider/user-model" # Cleanup @@ -452,7 +452,7 @@ async def test_models_source_tracking_provider(cached_disk_dist_registry): # All models should have provider source for model in models.data: - assert model.source == RegistryEntrySource.provider + assert model.source == RegistryEntrySource.listed_from_provider assert model.provider_id == "test_provider" # Cleanup @@ -473,7 +473,7 @@ async def test_models_source_interaction_preserves_default(cached_disk_dist_regi models = await table.list_models() assert len(models.data) == 1 user_model = models.data[0] - assert user_model.source == RegistryEntrySource.default + assert user_model.source == RegistryEntrySource.via_register_api assert user_model.identifier == "my-custom-alias" assert user_model.provider_resource_id == "provider-model-1" @@ -505,11 +505,11 @@ async def test_models_source_interaction_preserves_default(cached_disk_dist_regi provider_model = next((m for m in models.data if m.identifier == "different-model"), None) assert user_model is not None - assert user_model.source == RegistryEntrySource.default + assert user_model.source == RegistryEntrySource.via_register_api assert user_model.provider_resource_id == "provider-model-1" assert provider_model is not None - assert provider_model.source == RegistryEntrySource.provider + assert provider_model.source == RegistryEntrySource.listed_from_provider assert provider_model.provider_resource_id == "different-model" # Cleanup @@ -565,8 +565,8 @@ async def test_models_source_interaction_cleanup_provider_models(cached_disk_dis user_model = next((m for m in models.data if m.identifier == "test_provider/user-model"), None) provider_model = next((m for m in models.data if m.identifier == "provider-model-new"), None) - assert user_model.source == RegistryEntrySource.default - assert provider_model.source == RegistryEntrySource.provider + assert user_model.source == RegistryEntrySource.via_register_api + assert provider_model.source == RegistryEntrySource.listed_from_provider # Cleanup await table.shutdown()