diff --git a/src/llama_stack/core/access_control/access_control.py b/src/llama_stack/core/access_control/access_control.py index bde5cfd76..944913272 100644 --- a/src/llama_stack/core/access_control/access_control.py +++ b/src/llama_stack/core/access_control/access_control.py @@ -7,6 +7,7 @@ from typing import Any from llama_stack.core.datatypes import User +from llama_stack.log import get_logger from .conditions import ( Condition, @@ -19,6 +20,8 @@ from .datatypes import ( Scope, ) +logger = get_logger(name=__name__, category="core::auth") + def matches_resource(resource_scope: str, actual_resource: str) -> bool: if resource_scope == actual_resource: @@ -74,35 +77,63 @@ def is_action_allowed( resource: ProtectedResource, user: User | None, ) -> bool: + qualified_resource_id = f"{resource.type}::{resource.identifier}" + decision = False + reason = "" + index = -1 + # If user is not set, assume authentication is not enabled if not user: - return True + decision = True + reason = "no auth" + else: + if not len(policy): + policy = default_policy() - if not len(policy): - policy = default_policy() + for index, rule in enumerate(policy): # noqa: B007 + if rule.forbid and matches_scope(rule.forbid, action, qualified_resource_id, user.principal): + if rule.when: + if matches_conditions(parse_conditions(as_list(rule.when)), resource, user): + decision = False + reason = rule.description or "" + break + elif rule.unless: + if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user): + decision = False + reason = rule.description or "" + break + else: + decision = False + reason = rule.description or "" + break + elif rule.permit and matches_scope(rule.permit, action, qualified_resource_id, user.principal): + if rule.when: + if matches_conditions(parse_conditions(as_list(rule.when)), resource, user): + decision = True + reason = rule.description or "" + break + elif rule.unless: + if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user): + decision = True + reason = rule.description or "" + break + else: + decision = True + reason = rule.description or "" + break + else: + reason = "no matching rule" + index = -1 - qualified_resource_id = f"{resource.type}::{resource.identifier}" - for rule in policy: - if rule.forbid and matches_scope(rule.forbid, action, qualified_resource_id, user.principal): - if rule.when: - if matches_conditions(parse_conditions(as_list(rule.when)), resource, user): - return False - elif rule.unless: - if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user): - return False - else: - return False - elif rule.permit and matches_scope(rule.permit, action, qualified_resource_id, user.principal): - if rule.when: - if matches_conditions(parse_conditions(as_list(rule.when)), resource, user): - return True - elif rule.unless: - if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user): - return True - else: - return True - # assume access is denied unless we find a rule that permits access - return False + # print apprived or denied + decision_str = "APPROVED" if decision else "DENIED" + user_str = user.principal if user else "none" + logger.debug( + f"AUTHZ,decision={decision_str},user={user_str}," + f"resource_id={qualified_resource_id},action={action}," + f"rule_index={index},reason={reason!r}" + ) + return decision class AccessDeniedError(RuntimeError):