diff --git a/src/llama_stack/core/access_control/access_control.py b/src/llama_stack/core/access_control/access_control.py index bde5cfd76..8baa58f7f 100644 --- a/src/llama_stack/core/access_control/access_control.py +++ b/src/llama_stack/core/access_control/access_control.py @@ -63,7 +63,13 @@ def default_policy() -> list[AccessRule]: return [ AccessRule( permit=Scope(actions=list(Action)), - when=["user in owners " + name for name in ["roles", "teams", "projects", "namespaces"]], + when=["user in owners " + name], + ) + for name in ["roles", "teams", "projects", "namespaces"] + ] + [ + AccessRule( + permit=Scope(actions=list(Action)), + when=["user is owner"], ), ] diff --git a/src/llama_stack/core/access_control/conditions.py b/src/llama_stack/core/access_control/conditions.py index 25a267124..0bb199993 100644 --- a/src/llama_stack/core/access_control/conditions.py +++ b/src/llama_stack/core/access_control/conditions.py @@ -40,7 +40,7 @@ class UserInOwnersList: def matches(self, resource: ProtectedResource, user: User) -> bool: required = self.owners_values(resource) if not required: - return True + return False if not user.attributes or self.name not in user.attributes or not user.attributes[self.name]: return False user_values = user.attributes[self.name] @@ -92,7 +92,7 @@ class UserWithValueNotInList(UserWithValueInList): class UserIsOwner: def matches(self, resource: ProtectedResource, user: User) -> bool: - return resource.owner.principal == user.principal if resource.owner else False + return resource.owner.principal == user.principal if resource.owner else True def __repr__(self): return "user is owner" diff --git a/tests/unit/server/test_access_control.py b/tests/unit/server/test_access_control.py index 23a9636d5..7c1a4b9b2 100644 --- a/tests/unit/server/test_access_control.py +++ b/tests/unit/server/test_access_control.py @@ -77,7 +77,7 @@ async def test_access_control_with_cache(mock_get_authenticated_user, test_setup with pytest.raises(ValueError): await routing_table.get_model("model-data-scientist") - mock_get_authenticated_user.return_value = User("test-user", {"roles": ["data-scientist"], "teams": ["other-team"]}) + mock_get_authenticated_user.return_value = User("test-user", {"roles": ["user"], "teams": ["other-team"]}) all_models = await routing_table.list_models() assert len(all_models.data) == 1 assert all_models.data[0].identifier == "model-public" @@ -158,11 +158,12 @@ async def test_access_control_empty_attributes(mock_get_authenticated_user, test "roles": [], }, ) - result = await routing_table.get_model("model-empty-attrs") - assert result.identifier == "model-empty-attrs" + # With the security fix, resources with empty owner attributes should deny access + with pytest.raises(ValueError): + await routing_table.get_model("model-empty-attrs") all_models = await routing_table.list_models() model_ids = [m.identifier for m in all_models.data] - assert "model-empty-attrs" in model_ids + assert "model-empty-attrs" not in model_ids @patch("llama_stack.core.routing_tables.common.get_authenticated_user") @@ -210,6 +211,7 @@ async def test_automatic_access_attributes(mock_get_authenticated_user, test_set provider_id="test_provider", provider_resource_id="auto-access-model", model_type=ModelType.llm, + owner=User("test-user", {}), ) await routing_table.register_object(model) @@ -222,17 +224,15 @@ async def test_automatic_access_attributes(mock_get_authenticated_user, test_set assert registered_model.owner.attributes["projects"] == ["llama-3"] # Verify another user without matching attributes can't access it - mock_get_authenticated_user.return_value = User("test-user", {"roles": ["engineer"], "teams": ["infra-team"]}) + mock_get_authenticated_user.return_value = User("test-user2", {"roles": ["engineer"], "teams": ["infra-team"]}) with pytest.raises(ValueError): await routing_table.get_model("auto-access-model") # But a user with matching attributes can mock_get_authenticated_user.return_value = User( - "test-user", + "test-user3 ", { - "roles": ["data-scientist", "engineer"], "teams": ["ml-team", "platform-team"], - "projects": ["llama-3"], }, ) model = await routing_table.get_model("auto-access-model") @@ -376,10 +376,10 @@ def test_permit_unless(): identifier="mymodel", provider_id="myprovider", model_type=ModelType.llm, - owner=User("testuser", {"namespaces": ["foo"]}), + owner=User("testuser", {"namespaces": ["foo"], "teams": ["ml-team"]}), ) assert is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["foo"]})) - assert not is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["bar"]})) + assert not is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["bar"], "teams": ["ml-team"]})) assert not is_action_allowed(policy, "read", model, User("user-2", {"namespaces": ["foo"]}))