From 9afa387d1619532120b65038c2ea3403f612cf2d Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 3 Dec 2025 13:02:37 -0500
Subject: [PATCH] fix: RBAC bypass vulnerabilities in model access (backport
#4270) (#4285)
Closes security gaps where RBAC checks could be bypassed:
o Inference router: Added RBAC enforcement in the fallback
path to ensure access control is applied consistently.
o Model listing: Dynamic models fetched via provider_data were returned
without RBAC checks. Added filtering to ensure users only see models
they have permission to access.
Both fixes create temporary ModelWithOwner objects for RBAC validation,
maintaining security through consistent access control enforcement.
Closes: #4269
This is an automatic backport of pull request #4270 done by
[Mergify](https://mergify.com).
Signed-off-by: Derek Higgins
Signed-off-by: Charlie Doern
Co-authored-by: Derek Higgins
---
llama_stack/core/routers/inference.py | 35 ++++++++-
tests/unit/server/test_access_control.py | 96 ++++++++++++++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
diff --git a/llama_stack/core/routers/inference.py b/llama_stack/core/routers/inference.py
index f24c692b5..f6c69ee99 100644
--- a/llama_stack/core/routers/inference.py
+++ b/llama_stack/core/routers/inference.py
@@ -49,10 +49,17 @@ from llama_stack.apis.inference import (
)
from llama_stack.apis.models import Model, ModelType
from llama_stack.apis.telemetry import MetricEvent, MetricInResponse, Telemetry
+from llama_stack.core.access_control.access_control import is_action_allowed
+from llama_stack.core.datatypes import ModelWithOwner
+from llama_stack.core.request_headers import get_authenticated_user
from llama_stack.log import get_logger
from llama_stack.models.llama.llama3.chat_format import ChatFormat
from llama_stack.models.llama.llama3.tokenizer import Tokenizer
-from llama_stack.providers.datatypes import HealthResponse, HealthStatus, RoutingTable
+from llama_stack.providers.datatypes import (
+ HealthResponse,
+ HealthStatus,
+ RoutingTable,
+)
from llama_stack.providers.utils.inference.inference_store import InferenceStore
from llama_stack.providers.utils.telemetry.tracing import enqueue_event, get_current_span
@@ -186,15 +193,41 @@ class InferenceRouter(Inference):
provider = await self.routing_table.get_provider_impl(model.identifier)
return provider, model.provider_resource_id
+ # Handles cases where clients use the provider format directly
+ return await self._get_provider_by_fallback(model_id, expected_model_type)
+
+ async def _get_provider_by_fallback(self, model_id: str, expected_model_type: str) -> tuple[Inference, str]:
+ """
+ Handle fallback case where model_id is in provider_id/provider_resource_id format.
+ """
splits = model_id.split("/", maxsplit=1)
if len(splits) != 2:
raise ModelNotFoundError(model_id)
provider_id, provider_resource_id = splits
+
+ # Check if provider exists
if provider_id not in self.routing_table.impls_by_provider_id:
logger.warning(f"Provider {provider_id} not found for model {model_id}")
raise ModelNotFoundError(model_id)
+ # Create a temporary model object for RBAC check
+ temp_model = ModelWithOwner(
+ identifier=model_id,
+ provider_id=provider_id,
+ provider_resource_id=provider_resource_id,
+ model_type=expected_model_type,
+ metadata={}, # Empty metadata for temporary object
+ )
+
+ # Perform RBAC check
+ user = get_authenticated_user()
+ if not is_action_allowed(self.routing_table.policy, "read", temp_model, user):
+ logger.debug(
+ f"Access denied to model '{model_id}' via fallback path for user {user.principal if user else 'anonymous'}"
+ )
+ raise ModelNotFoundError(model_id)
+
return self.routing_table.impls_by_provider_id[provider_id], provider_resource_id
async def openai_completion(
diff --git a/tests/unit/server/test_access_control.py b/tests/unit/server/test_access_control.py
index ea4f9b8b2..78c0eff02 100644
--- a/tests/unit/server/test_access_control.py
+++ b/tests/unit/server/test_access_control.py
@@ -10,10 +10,12 @@ import pytest
import yaml
from pydantic import TypeAdapter, ValidationError
+from llama_stack.apis.common.errors import ModelNotFoundError
from llama_stack.apis.datatypes import Api
from llama_stack.apis.models import ModelType
from llama_stack.core.access_control.access_control import AccessDeniedError, is_action_allowed
from llama_stack.core.datatypes import AccessRule, ModelWithOwner, User
+from llama_stack.core.routers.inference import InferenceRouter
from llama_stack.core.routing_tables.models import ModelsRoutingTable
@@ -558,3 +560,97 @@ def test_condition_reprs(condition):
from llama_stack.core.access_control.conditions import parse_condition
assert condition == str(parse_condition(condition))
+
+
+@pytest.fixture
+def restricted_user():
+ """User with limited access."""
+ return User("restricted-user", {"roles": ["user"]})
+
+
+@pytest.fixture
+def admin_user():
+ """User with admin access."""
+ return User("admin-user", {"roles": ["admin"]})
+
+
+@pytest.fixture
+def rbac_policy():
+ """RBAC policy that restricts access to certain models."""
+ from llama_stack.core.access_control.datatypes import Action, Scope
+
+ return [
+ # Admins get full access
+ AccessRule(
+ permit=Scope(actions=list(Action)),
+ when=["user with admin in roles"],
+ ),
+ # Regular users only get read access to their own resources
+ AccessRule(
+ permit=Scope(actions=[Action.READ]),
+ when=["user is owner"],
+ ),
+ ]
+
+
+class TestInferenceRouterRBACBypass:
+ """Test RBAC bypass vulnerability in inference router fallback path."""
+
+ @pytest.fixture
+ def mock_routing_table(self):
+ """Create a mock routing table for testing."""
+ routing_table = AsyncMock()
+ routing_table.impls_by_provider_id = {"test-provider": AsyncMock()}
+ routing_table.policy = []
+ return routing_table
+
+ @patch("llama_stack.core.routers.inference.get_authenticated_user")
+ async def test_registry_path_and_fallback_path_consistent(
+ self, mock_get_user, mock_routing_table, restricted_user, admin_user, rbac_policy
+ ):
+ """Test that registry path and fallback path have consistent RBAC enforcement."""
+ mock_routing_table.policy = rbac_policy
+
+ # Create a model owned by admin
+ admin_model = ModelWithOwner(
+ identifier="admin-model",
+ provider_id="test-provider",
+ provider_resource_id="admin-resource",
+ model_type=ModelType.llm,
+ type="model",
+ metadata={},
+ owner=admin_user,
+ )
+
+ # Setup router
+ router = InferenceRouter(
+ routing_table=mock_routing_table,
+ store=None,
+ )
+
+ # Test 1: Restricted user tries to access via registry (should fail)
+ mock_get_user.return_value = restricted_user
+ mock_routing_table.get_object_by_identifier.return_value = None # RBAC blocks it
+ with pytest.raises(ModelNotFoundError):
+ await router._get_model_provider("admin-model", "llm")
+
+ # Test 2: Restricted user tries to access via fallback path (should also fail)
+ mock_routing_table.get_object_by_identifier.return_value = None
+ with pytest.raises(ModelNotFoundError):
+ await router._get_model_provider("test-provider/admin-resource", "llm")
+
+ # Test 3: Admin user can access via registry
+ mock_get_user.return_value = admin_user
+ mock_routing_table.get_object_by_identifier.return_value = admin_model
+ provider_mock = AsyncMock()
+ mock_routing_table.get_provider_impl.return_value = provider_mock
+
+ provider, resource_id = await router._get_model_provider("admin-model", "llm")
+ assert provider == provider_mock
+ assert resource_id == "admin-resource"
+
+ # Test 4: Admin user can also access via fallback path
+ mock_routing_table.get_object_by_identifier.return_value = None
+ provider, resource_id = await router._get_model_provider("test-provider/admin-resource", "llm")
+ assert provider == mock_routing_table.impls_by_provider_id["test-provider"]
+ assert resource_id == "admin-resource"