Changes to access rule conditions:

* change from access_attributes to owner on dynamically created resources
 * define simpler string based conditions for more intuitive restriction
This commit is contained in:
Gordon Sim 2025-05-29 20:21:20 +01:00
parent 01ad876012
commit 96cd51a0c8
20 changed files with 427 additions and 431 deletions

View file

@ -4,16 +4,18 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Protocol
from typing import Any
from llama_stack.distribution.request_headers import User
from llama_stack.distribution.datatypes import User
from .conditions import (
Condition,
ProtectedResource,
parse_conditions,
)
from .datatypes import (
AccessAttributes,
AccessRule,
Action,
AttributeReference,
Condition,
Scope,
)
@ -37,43 +39,6 @@ def matches_scope(
return action in scope.actions
def user_in_literal(
literal: str,
user_attributes: dict[str, list[str]] | None,
) -> bool:
for qualifier in ["role::", "team::", "project::", "namespace::"]:
if literal.startswith(qualifier):
if not user_attributes:
return False
ref = qualifier.replace("::", "s")
if ref in user_attributes:
value = literal.removeprefix(qualifier)
return value in user_attributes[ref]
else:
return False
return False
def user_in(
ref: AttributeReference | str,
resource_attributes: AccessAttributes | None,
user_attributes: dict[str, list[str]] | None,
) -> bool:
if not ref.startswith("resource."):
return user_in_literal(ref, user_attributes)
name = ref.removeprefix("resource.")
required = resource_attributes and getattr(resource_attributes, name)
if not required:
return True
if not user_attributes or name not in user_attributes:
return False
actual = user_attributes[name]
for value in required:
if value in actual:
return True
return False
def as_list(obj: Any) -> list[Any]:
if isinstance(obj, list):
return obj
@ -82,55 +47,27 @@ def as_list(obj: Any) -> list[Any]:
def matches_conditions(
conditions: list[Condition],
resource_attributes: AccessAttributes | None,
user_attributes: dict[str, list[str]] | None,
resource: ProtectedResource,
user: User,
) -> bool:
for condition in conditions:
# must match all conditions
if not matches_condition(condition, resource_attributes, user_attributes):
if not condition.matches(resource, user):
return False
return True
def matches_condition(
condition: Condition | list[Condition],
resource_attributes: AccessAttributes | None,
user_attributes: dict[str, list[str]] | None,
) -> bool:
if isinstance(condition, list):
return matches_conditions(as_list(condition), resource_attributes, user_attributes)
if condition.user_in:
for ref in as_list(condition.user_in):
# if multiple references are specified, all must match
if not user_in(ref, resource_attributes, user_attributes):
return False
return True
if condition.user_not_in:
for ref in as_list(condition.user_not_in):
# if multiple references are specified, none must match
if user_in(ref, resource_attributes, user_attributes):
return False
return True
return True
def default_policy() -> list[AccessRule]:
# for backwards compatibility, if no rules are provided , assume
# full access to all subject to attribute matching rules
# for backwards compatibility, if no rules are provided, assume
# full access subject to previous attribute matching rules
return [
AccessRule(
permit=Scope(actions=list(Action)),
when=Condition(user_in=list(AttributeReference)),
)
when=["user in owners " + name for name in ["roles", "teams", "projects", "namespaces"]],
),
]
class ProtectedResource(Protocol):
type: str
identifier: str
access_attributes: AccessAttributes
def is_action_allowed(
policy: list[AccessRule],
action: Action,
@ -144,26 +81,23 @@ def is_action_allowed(
if not len(policy):
policy = default_policy()
resource_attributes = AccessAttributes()
if hasattr(resource, "access_attributes"):
resource_attributes = resource.access_attributes
qualified_resource_id = resource.type + "::" + resource.identifier
for rule in policy:
if rule.forbid and matches_scope(rule.forbid, action, qualified_resource_id, user.principal):
if rule.when:
if matches_condition(rule.when, resource_attributes, user.attributes):
if matches_conditions(parse_conditions(as_list(rule.when)), resource, user):
return False
elif rule.unless:
if not matches_condition(rule.unless, resource_attributes, user.attributes):
if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user):
return False
else:
return False
elif rule.permit and matches_scope(rule.permit, action, qualified_resource_id, user.principal):
if rule.when:
if matches_condition(rule.when, resource_attributes, user.attributes):
if matches_conditions(parse_conditions(as_list(rule.when)), resource, user):
return True
elif rule.unless:
if not matches_condition(rule.unless, resource_attributes, user.attributes):
if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user):
return True
else:
return True