fix: access control to fail-closed when owner attributes are missing

Changed UserInOwnersList.matches() to return False instead of True when
a resource's owner attributes are None. This prevents unintended access
when resource when owner attributes arn't present.

For example, checking "user in owners teams" now returns False if the
resource has no teams attribute, rather than defaulting to True.

Changed UserIsOwner.matches() to return True when a resource has no
owner attribute set. This allows access to resources that don't use the
owner attribute.

Updated default_policy to use multiple separate "user in owners"
AccessRules instead of a single rule with multiple when clauses. With
the new fail-closed behavior, only one rule needs to match. Added a
"user is owner" rule to handle resources without attribute-based access.

Closes: #4272

Signed-off-by: Derek Higgins <derekh@redhat.com>
This commit is contained in:
Derek Higgins 2025-11-13 16:44:42 +00:00
parent ee107aadd6
commit a29253e9bf
3 changed files with 19 additions and 13 deletions

View file

@ -63,7 +63,13 @@ def default_policy() -> list[AccessRule]:
return [ return [
AccessRule( AccessRule(
permit=Scope(actions=list(Action)), permit=Scope(actions=list(Action)),
when=["user in owners " + name for name in ["roles", "teams", "projects", "namespaces"]], when=["user in owners " + name],
)
for name in ["roles", "teams", "projects", "namespaces"]
] + [
AccessRule(
permit=Scope(actions=list(Action)),
when=["user is owner"],
), ),
] ]

View file

@ -40,7 +40,7 @@ class UserInOwnersList:
def matches(self, resource: ProtectedResource, user: User) -> bool: def matches(self, resource: ProtectedResource, user: User) -> bool:
required = self.owners_values(resource) required = self.owners_values(resource)
if not required: if not required:
return True return False
if not user.attributes or self.name not in user.attributes or not user.attributes[self.name]: if not user.attributes or self.name not in user.attributes or not user.attributes[self.name]:
return False return False
user_values = user.attributes[self.name] user_values = user.attributes[self.name]
@ -92,7 +92,7 @@ class UserWithValueNotInList(UserWithValueInList):
class UserIsOwner: class UserIsOwner:
def matches(self, resource: ProtectedResource, user: User) -> bool: def matches(self, resource: ProtectedResource, user: User) -> bool:
return resource.owner.principal == user.principal if resource.owner else False return resource.owner.principal == user.principal if resource.owner else True
def __repr__(self): def __repr__(self):
return "user is owner" return "user is owner"

View file

@ -77,7 +77,7 @@ async def test_access_control_with_cache(mock_get_authenticated_user, test_setup
with pytest.raises(ValueError): with pytest.raises(ValueError):
await routing_table.get_model("model-data-scientist") await routing_table.get_model("model-data-scientist")
mock_get_authenticated_user.return_value = User("test-user", {"roles": ["data-scientist"], "teams": ["other-team"]}) mock_get_authenticated_user.return_value = User("test-user", {"roles": ["user"], "teams": ["other-team"]})
all_models = await routing_table.list_models() all_models = await routing_table.list_models()
assert len(all_models.data) == 1 assert len(all_models.data) == 1
assert all_models.data[0].identifier == "model-public" assert all_models.data[0].identifier == "model-public"
@ -158,11 +158,12 @@ async def test_access_control_empty_attributes(mock_get_authenticated_user, test
"roles": [], "roles": [],
}, },
) )
result = await routing_table.get_model("model-empty-attrs") # With the security fix, resources with empty owner attributes should deny access
assert result.identifier == "model-empty-attrs" with pytest.raises(ValueError):
await routing_table.get_model("model-empty-attrs")
all_models = await routing_table.list_models() all_models = await routing_table.list_models()
model_ids = [m.identifier for m in all_models.data] model_ids = [m.identifier for m in all_models.data]
assert "model-empty-attrs" in model_ids assert "model-empty-attrs" not in model_ids
@patch("llama_stack.core.routing_tables.common.get_authenticated_user") @patch("llama_stack.core.routing_tables.common.get_authenticated_user")
@ -210,6 +211,7 @@ async def test_automatic_access_attributes(mock_get_authenticated_user, test_set
provider_id="test_provider", provider_id="test_provider",
provider_resource_id="auto-access-model", provider_resource_id="auto-access-model",
model_type=ModelType.llm, model_type=ModelType.llm,
owner=User("test-user", {}),
) )
await routing_table.register_object(model) await routing_table.register_object(model)
@ -222,17 +224,15 @@ async def test_automatic_access_attributes(mock_get_authenticated_user, test_set
assert registered_model.owner.attributes["projects"] == ["llama-3"] assert registered_model.owner.attributes["projects"] == ["llama-3"]
# Verify another user without matching attributes can't access it # Verify another user without matching attributes can't access it
mock_get_authenticated_user.return_value = User("test-user", {"roles": ["engineer"], "teams": ["infra-team"]}) mock_get_authenticated_user.return_value = User("test-user2", {"roles": ["engineer"], "teams": ["infra-team"]})
with pytest.raises(ValueError): with pytest.raises(ValueError):
await routing_table.get_model("auto-access-model") await routing_table.get_model("auto-access-model")
# But a user with matching attributes can # But a user with matching attributes can
mock_get_authenticated_user.return_value = User( mock_get_authenticated_user.return_value = User(
"test-user", "test-user3 ",
{ {
"roles": ["data-scientist", "engineer"],
"teams": ["ml-team", "platform-team"], "teams": ["ml-team", "platform-team"],
"projects": ["llama-3"],
}, },
) )
model = await routing_table.get_model("auto-access-model") model = await routing_table.get_model("auto-access-model")
@ -376,10 +376,10 @@ def test_permit_unless():
identifier="mymodel", identifier="mymodel",
provider_id="myprovider", provider_id="myprovider",
model_type=ModelType.llm, model_type=ModelType.llm,
owner=User("testuser", {"namespaces": ["foo"]}), owner=User("testuser", {"namespaces": ["foo"], "teams": ["ml-team"]}),
) )
assert is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["foo"]})) assert is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["foo"]}))
assert not is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["bar"]})) assert not is_action_allowed(policy, "read", model, User("user-1", {"namespaces": ["bar"], "teams": ["ml-team"]}))
assert not is_action_allowed(policy, "read", model, User("user-2", {"namespaces": ["foo"]})) assert not is_action_allowed(policy, "read", model, User("user-2", {"namespaces": ["foo"]}))