fix: set SqlRecord owner to None when owner_principal is empty (#4284)

Changes SqlRecord creation in AuthorizedSqlStore.fetch_all to use
owner=None when owner_principal is empty/missing, matching the
ResourceWithOwner pattern used in routing tables. This fixes an
inconsistency where SQL store was creating User(principal="") while
routing tables use owner=None for public resources.

Changes:
o Update ProtectedResource Protocol to allow owner: User | None 
o Update SqlRecord.__init__ to accept owner: User | None 
o Update fetch_all to create owner=None for records without
owner_principal

Signed-off-by: Derek Higgins <derekh@redhat.com>
This commit is contained in:
Derek Higgins 2025-12-03 18:28:33 +00:00 committed by GitHub
parent aa3898f486
commit fcd6370b34
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 64 additions and 11 deletions

View file

@ -15,7 +15,7 @@ class User(Protocol):
class ProtectedResource(Protocol):
type: str
identifier: str
owner: User
owner: User | None
class Condition(Protocol):

View file

@ -56,7 +56,7 @@ def _enhance_item_with_access_control(item: Mapping[str, Any], current_user: Use
class SqlRecord(ProtectedResource):
def __init__(self, record_id: str, table_name: str, owner: User):
def __init__(self, record_id: str, table_name: str, owner: User | None):
self.type = f"sql_record::{table_name}"
self.identifier = record_id
self.owner = owner
@ -171,12 +171,16 @@ class AuthorizedSqlStore:
for row in rows.data:
stored_access_attrs = row.get("access_attributes")
stored_owner_principal = row.get("owner_principal") or ""
stored_owner_principal = row.get("owner_principal")
record_id = row.get("id", "unknown")
sql_record = SqlRecord(
str(record_id), table, User(principal=stored_owner_principal, attributes=stored_access_attrs)
# Create owner as None if owner_principal is empty/missing, matching ResourceWithOwner behavior
owner = (
User(principal=stored_owner_principal, attributes=stored_access_attrs)
if stored_owner_principal
else None
)
sql_record = SqlRecord(str(record_id), table, owner)
if is_action_allowed(self.policy, action, sql_record, current_user):
filtered_rows.append(row)

View file

@ -247,3 +247,36 @@ async def test_user_ownership_policy(mock_get_authenticated_user, authorized_sto
finally:
# Clean up records
await cleanup_records(authorized_store.sql_store, table_name, ["1", "2"])
@pytest.mark.parametrize("backend_config", BACKEND_CONFIGS)
@patch("llama_stack.core.storage.sqlstore.authorized_sqlstore.get_authenticated_user")
async def test_sqlrecord_created_with_no_owner(mock_get_authenticated_user, authorized_store, request):
"""Test that SqlRecord is created with no owner == None when owner_principal is empty/missing"""
backend_name = request.node.callspec.id
# Create test table
table_name = f"test_sqlrecord_created_with_no_owner_{backend_name}"
await create_test_table(authorized_store, table_name)
try:
# Test with no authenticated user (should handle JSON null comparison)
mock_get_authenticated_user.return_value = None
# Insert some test data
await authorized_store.insert(table_name, {"id": "1", "data": "public_data"})
# Test fetching with no user - should create SqlRecord with no owner
with patch(
"llama_stack.core.storage.sqlstore.authorized_sqlstore.is_action_allowed", return_value=True
) as mock_is_action_allowed:
result = await authorized_store.fetch_all(table_name)
mock_is_action_allowed.assert_called_once()
args = mock_is_action_allowed.call_args
assert args[0][2].type == f"sql_record::{table_name}"
assert args[0][2].owner is None
assert len(result.data) == 1
finally:
# Clean up records
await cleanup_records(authorized_store.sql_store, table_name, ["1"])

View file

@ -101,22 +101,32 @@ async def test_sql_policy_consistency(mock_get_authenticated_user):
# Test scenarios with different access control patterns
test_scenarios = [
# Scenario 1: Public record (no access control - represents None user insert)
{"id": "1", "name": "public", "access_attributes": None},
{"id": "1", "name": "public", "owner_principal": "", "access_attributes": None},
# Scenario 2: Record with roles requirement
{"id": "2", "name": "admin-only", "access_attributes": {"roles": ["admin"]}},
{"id": "2", "name": "admin-only", "owner_principal": "owner1", "access_attributes": {"roles": ["admin"]}},
# Scenario 3: Record with multiple attribute categories
{"id": "3", "name": "admin-ml-team", "access_attributes": {"roles": ["admin"], "teams": ["ml-team"]}},
{
"id": "3",
"name": "admin-ml-team",
"owner_principal": "owner2",
"access_attributes": {"roles": ["admin"], "teams": ["ml-team"]},
},
# Scenario 4: Record with teams only (missing roles category)
{"id": "4", "name": "ml-team-only", "access_attributes": {"teams": ["ml-team"]}},
{
"id": "4",
"name": "ml-team-only",
"owner_principal": "owner3",
"access_attributes": {"teams": ["ml-team"]},
},
# Scenario 5: Record with roles and projects
{
"id": "5",
"name": "admin-project-x",
"owner_principal": "owner4",
"access_attributes": {"roles": ["admin"], "projects": ["project-x"]},
},
]
mock_get_authenticated_user.return_value = User("test-user", {"roles": ["admin"]})
for scenario in test_scenarios:
await base_sqlstore.insert("resources", scenario)
@ -148,10 +158,16 @@ async def test_sql_policy_consistency(mock_get_authenticated_user):
sql_ids = {row["id"] for row in sql_results.data}
policy_ids = set()
for scenario in test_scenarios:
# Create owner matching what was stored (None for public records)
owner = (
User(principal=scenario["owner_principal"], attributes=scenario["access_attributes"])
if scenario["owner_principal"]
else None
)
sql_record = SqlRecord(
record_id=scenario["id"],
table_name="resources",
owner=User(principal="test-user", attributes=scenario["access_attributes"]),
owner=owner,
)
if is_action_allowed(policy, Action.READ, sql_record, user):