mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-03 01:48:05 +00:00
feat: Add debug logging for RBAC access control decisions
Refactor is_action_allowed() to track decision outcome, matched rule index, and reason. Add structured debug log output for troubleshooting access control. Signed-off-by: Derek Higgins <derekh@redhat.com>
This commit is contained in:
parent
e243892ef0
commit
61022245f0
1 changed files with 56 additions and 25 deletions
|
|
@ -7,6 +7,7 @@
|
|||
from typing import Any
|
||||
|
||||
from llama_stack.core.datatypes import User
|
||||
from llama_stack.log import get_logger
|
||||
|
||||
from .conditions import (
|
||||
Condition,
|
||||
|
|
@ -19,6 +20,8 @@ from .datatypes import (
|
|||
Scope,
|
||||
)
|
||||
|
||||
logger = get_logger(name=__name__, category="core::auth")
|
||||
|
||||
|
||||
def matches_resource(resource_scope: str, actual_resource: str) -> bool:
|
||||
if resource_scope == actual_resource:
|
||||
|
|
@ -74,35 +77,63 @@ def is_action_allowed(
|
|||
resource: ProtectedResource,
|
||||
user: User | None,
|
||||
) -> bool:
|
||||
qualified_resource_id = f"{resource.type}::{resource.identifier}"
|
||||
decision = False
|
||||
reason = ""
|
||||
index = -1
|
||||
|
||||
# If user is not set, assume authentication is not enabled
|
||||
if not user:
|
||||
return True
|
||||
|
||||
decision = True
|
||||
reason = "no auth"
|
||||
else:
|
||||
if not len(policy):
|
||||
policy = default_policy()
|
||||
|
||||
qualified_resource_id = f"{resource.type}::{resource.identifier}"
|
||||
for rule in policy:
|
||||
for index, rule in enumerate(policy): # noqa: B007
|
||||
if rule.forbid and matches_scope(rule.forbid, action, qualified_resource_id, user.principal):
|
||||
if rule.when:
|
||||
if matches_conditions(parse_conditions(as_list(rule.when)), resource, user):
|
||||
return False
|
||||
decision = False
|
||||
reason = rule.description or ""
|
||||
break
|
||||
elif rule.unless:
|
||||
if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user):
|
||||
return False
|
||||
decision = False
|
||||
reason = rule.description or ""
|
||||
break
|
||||
else:
|
||||
return False
|
||||
decision = False
|
||||
reason = rule.description or ""
|
||||
break
|
||||
elif rule.permit and matches_scope(rule.permit, action, qualified_resource_id, user.principal):
|
||||
if rule.when:
|
||||
if matches_conditions(parse_conditions(as_list(rule.when)), resource, user):
|
||||
return True
|
||||
decision = True
|
||||
reason = rule.description or ""
|
||||
break
|
||||
elif rule.unless:
|
||||
if not matches_conditions(parse_conditions(as_list(rule.unless)), resource, user):
|
||||
return True
|
||||
decision = True
|
||||
reason = rule.description or ""
|
||||
break
|
||||
else:
|
||||
return True
|
||||
# assume access is denied unless we find a rule that permits access
|
||||
return False
|
||||
decision = True
|
||||
reason = rule.description or ""
|
||||
break
|
||||
else:
|
||||
reason = "no matching rule"
|
||||
index = -1
|
||||
|
||||
# print apprived or denied
|
||||
decision_str = "APPROVED" if decision else "DENIED"
|
||||
user_str = user.principal if user else "none"
|
||||
logger.debug(
|
||||
f"AUTHZ,decision={decision_str},user={user_str},"
|
||||
f"resource_id={qualified_resource_id},action={action},"
|
||||
f"rule_index={index},reason={reason!r}"
|
||||
)
|
||||
return decision
|
||||
|
||||
|
||||
class AccessDeniedError(RuntimeError):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue