mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-07-27 06:28:50 +00:00
make refreshing happen for all routing tables, naming changes, ollama fixes
This commit is contained in:
parent
487e073378
commit
0fe110d94a
6 changed files with 67 additions and 63 deletions
|
@ -37,8 +37,8 @@ RoutingKey = str | list[str]
|
||||||
|
|
||||||
|
|
||||||
class RegistryEntrySource(StrEnum):
|
class RegistryEntrySource(StrEnum):
|
||||||
default = "default"
|
via_register_api = "via_register_api"
|
||||||
provider = "provider"
|
listed_from_provider = "listed_from_provider"
|
||||||
|
|
||||||
|
|
||||||
class User(BaseModel):
|
class User(BaseModel):
|
||||||
|
@ -55,7 +55,7 @@ class ResourceWithOwner(Resource):
|
||||||
resource. This can be used to constrain access to the resource."""
|
resource. This can be used to constrain access to the resource."""
|
||||||
|
|
||||||
owner: User | None = None
|
owner: User | None = None
|
||||||
source: RegistryEntrySource = RegistryEntrySource.default
|
source: RegistryEntrySource = RegistryEntrySource.via_register_api
|
||||||
|
|
||||||
|
|
||||||
# Use the extended Resource for all routable objects
|
# Use the extended Resource for all routable objects
|
||||||
|
|
|
@ -117,6 +117,9 @@ class CommonRoutingTableImpl(RoutingTable):
|
||||||
for p in self.impls_by_provider_id.values():
|
for p in self.impls_by_provider_id.values():
|
||||||
await p.shutdown()
|
await p.shutdown()
|
||||||
|
|
||||||
|
async def refresh(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
async def get_provider_impl(self, routing_key: str, provider_id: str | None = None) -> Any:
|
async def get_provider_impl(self, routing_key: str, provider_id: str | None = None) -> Any:
|
||||||
from .benchmarks import BenchmarksRoutingTable
|
from .benchmarks import BenchmarksRoutingTable
|
||||||
from .datasets import DatasetsRoutingTable
|
from .datasets import DatasetsRoutingTable
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
# This source code is licensed under the terms described in the LICENSE file in
|
||||||
# the root directory of this source tree.
|
# the root directory of this source tree.
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
@ -22,35 +21,8 @@ logger = get_logger(name=__name__, category="core")
|
||||||
|
|
||||||
class ModelsRoutingTable(CommonRoutingTableImpl, Models):
|
class ModelsRoutingTable(CommonRoutingTableImpl, Models):
|
||||||
listed_providers: set[str] = set()
|
listed_providers: set[str] = set()
|
||||||
model_refresh_interval_seconds: int = 300
|
|
||||||
_refresh_task: asyncio.Task | None = None
|
|
||||||
|
|
||||||
async def initialize(self) -> None:
|
async def refresh(self) -> None:
|
||||||
await super().initialize()
|
|
||||||
task = asyncio.create_task(self._refresh_models())
|
|
||||||
self._refresh_task = task
|
|
||||||
|
|
||||||
def cb(task):
|
|
||||||
import traceback
|
|
||||||
|
|
||||||
if task.cancelled():
|
|
||||||
logger.error("Model refresh task cancelled")
|
|
||||||
elif task.exception():
|
|
||||||
logger.error(f"Model refresh task failed: {task.exception()}")
|
|
||||||
traceback.print_exception(task.exception())
|
|
||||||
else:
|
|
||||||
logger.debug("Model refresh task completed")
|
|
||||||
|
|
||||||
task.add_done_callback(cb)
|
|
||||||
|
|
||||||
async def shutdown(self) -> None:
|
|
||||||
await super().shutdown()
|
|
||||||
if self._refresh_task:
|
|
||||||
self._refresh_task.cancel()
|
|
||||||
self._refresh_task = None
|
|
||||||
|
|
||||||
async def _refresh_models(self) -> None:
|
|
||||||
while True:
|
|
||||||
for provider_id, provider in self.impls_by_provider_id.items():
|
for provider_id, provider in self.impls_by_provider_id.items():
|
||||||
refresh = await provider.should_refresh_models()
|
refresh = await provider.should_refresh_models()
|
||||||
if not (refresh or provider_id in self.listed_providers):
|
if not (refresh or provider_id in self.listed_providers):
|
||||||
|
@ -68,8 +40,6 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models):
|
||||||
|
|
||||||
await self.update_registered_models(provider_id, models)
|
await self.update_registered_models(provider_id, models)
|
||||||
|
|
||||||
await asyncio.sleep(self.model_refresh_interval_seconds)
|
|
||||||
|
|
||||||
async def list_models(self) -> ListModelsResponse:
|
async def list_models(self) -> ListModelsResponse:
|
||||||
return ListModelsResponse(data=await self.get_all_with_type("model"))
|
return ListModelsResponse(data=await self.get_all_with_type("model"))
|
||||||
|
|
||||||
|
@ -132,7 +102,7 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models):
|
||||||
provider_id=provider_id,
|
provider_id=provider_id,
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
model_type=model_type,
|
model_type=model_type,
|
||||||
source=RegistryEntrySource.default,
|
source=RegistryEntrySource.via_register_api,
|
||||||
)
|
)
|
||||||
registered_model = await self.register_object(model)
|
registered_model = await self.register_object(model)
|
||||||
return registered_model
|
return registered_model
|
||||||
|
@ -156,7 +126,7 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models):
|
||||||
for model in existing_models:
|
for model in existing_models:
|
||||||
if model.provider_id != provider_id:
|
if model.provider_id != provider_id:
|
||||||
continue
|
continue
|
||||||
if model.source == RegistryEntrySource.default:
|
if model.source == RegistryEntrySource.via_register_api:
|
||||||
model_ids[model.provider_resource_id] = model.identifier
|
model_ids[model.provider_resource_id] = model.identifier
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -176,6 +146,6 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models):
|
||||||
provider_id=provider_id,
|
provider_id=provider_id,
|
||||||
metadata=model.metadata,
|
metadata=model.metadata,
|
||||||
model_type=model.model_type,
|
model_type=model.model_type,
|
||||||
source=RegistryEntrySource.provider,
|
source=RegistryEntrySource.listed_from_provider,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
# This source code is licensed under the terms described in the LICENSE file in
|
||||||
# the root directory of this source tree.
|
# the root directory of this source tree.
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import importlib.resources
|
import importlib.resources
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
@ -38,6 +39,7 @@ from llama_stack.distribution.distribution import get_provider_registry
|
||||||
from llama_stack.distribution.inspect import DistributionInspectConfig, DistributionInspectImpl
|
from llama_stack.distribution.inspect import DistributionInspectConfig, DistributionInspectImpl
|
||||||
from llama_stack.distribution.providers import ProviderImpl, ProviderImplConfig
|
from llama_stack.distribution.providers import ProviderImpl, ProviderImplConfig
|
||||||
from llama_stack.distribution.resolver import ProviderRegistry, resolve_impls
|
from llama_stack.distribution.resolver import ProviderRegistry, resolve_impls
|
||||||
|
from llama_stack.distribution.routing_tables.common import CommonRoutingTableImpl
|
||||||
from llama_stack.distribution.store.registry import create_dist_registry
|
from llama_stack.distribution.store.registry import create_dist_registry
|
||||||
from llama_stack.distribution.utils.dynamic import instantiate_class_type
|
from llama_stack.distribution.utils.dynamic import instantiate_class_type
|
||||||
from llama_stack.log import get_logger
|
from llama_stack.log import get_logger
|
||||||
|
@ -90,6 +92,9 @@ RESOURCES = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
REGISTRY_REFRESH_INTERVAL_SECONDS = 300
|
||||||
|
|
||||||
|
|
||||||
async def register_resources(run_config: StackRunConfig, impls: dict[Api, Any]):
|
async def register_resources(run_config: StackRunConfig, impls: dict[Api, Any]):
|
||||||
for rsrc, api, register_method, list_method in RESOURCES:
|
for rsrc, api, register_method, list_method in RESOURCES:
|
||||||
objects = getattr(run_config, rsrc)
|
objects = getattr(run_config, rsrc)
|
||||||
|
@ -324,9 +329,33 @@ async def construct_stack(
|
||||||
add_internal_implementations(impls, run_config)
|
add_internal_implementations(impls, run_config)
|
||||||
|
|
||||||
await register_resources(run_config, impls)
|
await register_resources(run_config, impls)
|
||||||
|
|
||||||
|
task = asyncio.create_task(refresh_registry(impls))
|
||||||
|
|
||||||
|
def cb(task):
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
if task.cancelled():
|
||||||
|
logger.error("Model refresh task cancelled")
|
||||||
|
elif task.exception():
|
||||||
|
logger.error(f"Model refresh task failed: {task.exception()}")
|
||||||
|
traceback.print_exception(task.exception())
|
||||||
|
else:
|
||||||
|
logger.debug("Model refresh task completed")
|
||||||
|
|
||||||
|
task.add_done_callback(cb)
|
||||||
return impls
|
return impls
|
||||||
|
|
||||||
|
|
||||||
|
async def refresh_registry(impls: dict[Api, Any]):
|
||||||
|
routing_tables = [v for v in impls.values() if isinstance(v, CommonRoutingTableImpl)]
|
||||||
|
while True:
|
||||||
|
for routing_table in routing_tables:
|
||||||
|
await routing_table.refresh()
|
||||||
|
|
||||||
|
await asyncio.sleep(REGISTRY_REFRESH_INTERVAL_SECONDS)
|
||||||
|
|
||||||
|
|
||||||
def get_stack_run_config_from_template(template: str) -> StackRunConfig:
|
def get_stack_run_config_from_template(template: str) -> StackRunConfig:
|
||||||
template_path = importlib.resources.files("llama_stack") / f"templates/{template}/run.yaml"
|
template_path = importlib.resources.files("llama_stack") / f"templates/{template}/run.yaml"
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
# the root directory of this source tree.
|
# the root directory of this source tree.
|
||||||
|
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import base64
|
import base64
|
||||||
import uuid
|
import uuid
|
||||||
from collections.abc import AsyncGenerator, AsyncIterator
|
from collections.abc import AsyncGenerator, AsyncIterator
|
||||||
|
@ -97,14 +98,16 @@ class OllamaInferenceAdapter(
|
||||||
def __init__(self, config: OllamaImplConfig) -> None:
|
def __init__(self, config: OllamaImplConfig) -> None:
|
||||||
self.register_helper = ModelRegistryHelper(MODEL_ENTRIES)
|
self.register_helper = ModelRegistryHelper(MODEL_ENTRIES)
|
||||||
self.config = config
|
self.config = config
|
||||||
self._client = None
|
self._clients: dict[asyncio.AbstractEventLoop, AsyncClient] = {}
|
||||||
self._openai_client = None
|
self._openai_client = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def client(self) -> AsyncClient:
|
def client(self) -> AsyncClient:
|
||||||
if self._client is None:
|
# ollama client attaches itself to the current event loop (sadly?)
|
||||||
self._client = AsyncClient(host=self.config.url)
|
loop = asyncio.get_running_loop()
|
||||||
return self._client
|
if loop not in self._clients:
|
||||||
|
self._clients[loop] = AsyncClient(host=self.config.url)
|
||||||
|
return self._clients[loop]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def openai_client(self) -> AsyncOpenAI:
|
def openai_client(self) -> AsyncOpenAI:
|
||||||
|
@ -191,8 +194,7 @@ class OllamaInferenceAdapter(
|
||||||
return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")
|
return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")
|
||||||
|
|
||||||
async def shutdown(self) -> None:
|
async def shutdown(self) -> None:
|
||||||
self._client = None
|
self._clients.clear()
|
||||||
self._openai_client = None
|
|
||||||
|
|
||||||
async def unregister_model(self, model_id: str) -> None:
|
async def unregister_model(self, model_id: str) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -416,7 +416,7 @@ async def test_models_source_tracking_default(cached_disk_dist_registry):
|
||||||
models = await table.list_models()
|
models = await table.list_models()
|
||||||
assert len(models.data) == 1
|
assert len(models.data) == 1
|
||||||
model = models.data[0]
|
model = models.data[0]
|
||||||
assert model.source == RegistryEntrySource.default
|
assert model.source == RegistryEntrySource.via_register_api
|
||||||
assert model.identifier == "test_provider/user-model"
|
assert model.identifier == "test_provider/user-model"
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
|
@ -452,7 +452,7 @@ async def test_models_source_tracking_provider(cached_disk_dist_registry):
|
||||||
|
|
||||||
# All models should have provider source
|
# All models should have provider source
|
||||||
for model in models.data:
|
for model in models.data:
|
||||||
assert model.source == RegistryEntrySource.provider
|
assert model.source == RegistryEntrySource.listed_from_provider
|
||||||
assert model.provider_id == "test_provider"
|
assert model.provider_id == "test_provider"
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
|
@ -473,7 +473,7 @@ async def test_models_source_interaction_preserves_default(cached_disk_dist_regi
|
||||||
models = await table.list_models()
|
models = await table.list_models()
|
||||||
assert len(models.data) == 1
|
assert len(models.data) == 1
|
||||||
user_model = models.data[0]
|
user_model = models.data[0]
|
||||||
assert user_model.source == RegistryEntrySource.default
|
assert user_model.source == RegistryEntrySource.via_register_api
|
||||||
assert user_model.identifier == "my-custom-alias"
|
assert user_model.identifier == "my-custom-alias"
|
||||||
assert user_model.provider_resource_id == "provider-model-1"
|
assert user_model.provider_resource_id == "provider-model-1"
|
||||||
|
|
||||||
|
@ -505,11 +505,11 @@ async def test_models_source_interaction_preserves_default(cached_disk_dist_regi
|
||||||
provider_model = next((m for m in models.data if m.identifier == "different-model"), None)
|
provider_model = next((m for m in models.data if m.identifier == "different-model"), None)
|
||||||
|
|
||||||
assert user_model is not None
|
assert user_model is not None
|
||||||
assert user_model.source == RegistryEntrySource.default
|
assert user_model.source == RegistryEntrySource.via_register_api
|
||||||
assert user_model.provider_resource_id == "provider-model-1"
|
assert user_model.provider_resource_id == "provider-model-1"
|
||||||
|
|
||||||
assert provider_model is not None
|
assert provider_model is not None
|
||||||
assert provider_model.source == RegistryEntrySource.provider
|
assert provider_model.source == RegistryEntrySource.listed_from_provider
|
||||||
assert provider_model.provider_resource_id == "different-model"
|
assert provider_model.provider_resource_id == "different-model"
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
|
@ -565,8 +565,8 @@ async def test_models_source_interaction_cleanup_provider_models(cached_disk_dis
|
||||||
user_model = next((m for m in models.data if m.identifier == "test_provider/user-model"), None)
|
user_model = next((m for m in models.data if m.identifier == "test_provider/user-model"), None)
|
||||||
provider_model = next((m for m in models.data if m.identifier == "provider-model-new"), None)
|
provider_model = next((m for m in models.data if m.identifier == "provider-model-new"), None)
|
||||||
|
|
||||||
assert user_model.source == RegistryEntrySource.default
|
assert user_model.source == RegistryEntrySource.via_register_api
|
||||||
assert provider_model.source == RegistryEntrySource.provider
|
assert provider_model.source == RegistryEntrySource.listed_from_provider
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
await table.shutdown()
|
await table.shutdown()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue