mirror of
				https://github.com/meta-llama/llama-stack.git
				synced 2025-10-23 00:27:26 +00:00 
			
		
		
		
	
		
			Some checks failed
		
		
	
	SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 1s
				
			Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 1s
				
			SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 0s
				
			Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
				
			Python Package Build Test / build (3.12) (push) Failing after 1s
				
			Python Package Build Test / build (3.13) (push) Failing after 1s
				
			Integration Tests (Replay) / Integration Tests (, , , client=, ) (push) Failing after 2s
				
			Unit Tests / unit-tests (3.13) (push) Failing after 3s
				
			Update ReadTheDocs / update-readthedocs (push) Failing after 3s
				
			Test External API and Providers / test-external (venv) (push) Failing after 4s
				
			Vector IO Integration Tests / test-matrix (push) Failing after 4s
				
			UI Tests / ui-tests (22) (push) Successful in 35s
				
			API Conformance Tests / check-schema-compatibility (push) Successful in 6s
				
			Unit Tests / unit-tests (3.12) (push) Failing after 3s
				
			Pre-commit / pre-commit (push) Successful in 1m19s
				
			# What does this PR do? This PR is generated with AI and reviewed by me. Refactors the AuthorizedSqlStore class to store the access policy as an instance variable rather than passing it as a parameter to each method call. This simplifies the API. # Test Plan existing tests
		
			
				
	
	
		
			213 lines
		
	
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			213 lines
		
	
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) Meta Platforms, Inc. and affiliates.
 | |
| # All rights reserved.
 | |
| #
 | |
| # This source code is licensed under the terms described in the LICENSE file in
 | |
| # the root directory of this source tree.
 | |
| 
 | |
| from tempfile import TemporaryDirectory
 | |
| from unittest.mock import patch
 | |
| 
 | |
| from llama_stack.core.access_control.access_control import default_policy, is_action_allowed
 | |
| from llama_stack.core.access_control.datatypes import Action
 | |
| from llama_stack.core.datatypes import User
 | |
| from llama_stack.providers.utils.sqlstore.api import ColumnType
 | |
| from llama_stack.providers.utils.sqlstore.authorized_sqlstore import AuthorizedSqlStore, SqlRecord
 | |
| from llama_stack.providers.utils.sqlstore.sqlalchemy_sqlstore import SqlAlchemySqlStoreImpl
 | |
| from llama_stack.providers.utils.sqlstore.sqlstore import SqliteSqlStoreConfig
 | |
| 
 | |
| 
 | |
| @patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user")
 | |
| async def test_authorized_fetch_with_where_sql_access_control(mock_get_authenticated_user):
 | |
|     """Test that fetch_all works correctly with where_sql for access control"""
 | |
|     with TemporaryDirectory() as tmp_dir:
 | |
|         db_name = "test_access_control.db"
 | |
|         base_sqlstore = SqlAlchemySqlStoreImpl(
 | |
|             SqliteSqlStoreConfig(
 | |
|                 db_path=tmp_dir + "/" + db_name,
 | |
|             )
 | |
|         )
 | |
|         sqlstore = AuthorizedSqlStore(base_sqlstore, default_policy())
 | |
| 
 | |
|         # Create table with access control
 | |
|         await sqlstore.create_table(
 | |
|             table="documents",
 | |
|             schema={
 | |
|                 "id": ColumnType.INTEGER,
 | |
|                 "title": ColumnType.STRING,
 | |
|                 "content": ColumnType.TEXT,
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         admin_user = User("admin-user", {"roles": ["admin"], "teams": ["engineering"]})
 | |
|         regular_user = User("regular-user", {"roles": ["user"], "teams": ["marketing"]})
 | |
| 
 | |
|         # Set user attributes for creating documents
 | |
|         mock_get_authenticated_user.return_value = admin_user
 | |
| 
 | |
|         # Insert documents with access attributes
 | |
|         await sqlstore.insert("documents", {"id": 1, "title": "Admin Document", "content": "This is admin content"})
 | |
| 
 | |
|         # Change user attributes
 | |
|         mock_get_authenticated_user.return_value = regular_user
 | |
| 
 | |
|         await sqlstore.insert("documents", {"id": 2, "title": "User Document", "content": "Public user content"})
 | |
| 
 | |
|         # Test that access control works with where parameter
 | |
|         mock_get_authenticated_user.return_value = admin_user
 | |
| 
 | |
|         # Admin should see both documents
 | |
|         result = await sqlstore.fetch_all("documents", where={"id": 1})
 | |
|         assert len(result.data) == 1
 | |
|         assert result.data[0]["title"] == "Admin Document"
 | |
| 
 | |
|         # User should only see their document
 | |
|         mock_get_authenticated_user.return_value = regular_user
 | |
| 
 | |
|         result = await sqlstore.fetch_all("documents", where={"id": 1})
 | |
|         assert len(result.data) == 0
 | |
| 
 | |
|         result = await sqlstore.fetch_all("documents", where={"id": 2})
 | |
|         assert len(result.data) == 1
 | |
|         assert result.data[0]["title"] == "User Document"
 | |
| 
 | |
|         row = await sqlstore.fetch_one("documents", where={"id": 1})
 | |
|         assert row is None
 | |
| 
 | |
|         row = await sqlstore.fetch_one("documents", where={"id": 2})
 | |
|         assert row is not None
 | |
|         assert row["title"] == "User Document"
 | |
| 
 | |
| 
 | |
| @patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user")
 | |
| async def test_sql_policy_consistency(mock_get_authenticated_user):
 | |
|     """Test that SQL WHERE clause logic exactly matches is_action_allowed policy logic"""
 | |
|     with TemporaryDirectory() as tmp_dir:
 | |
|         db_name = "test_consistency.db"
 | |
|         base_sqlstore = SqlAlchemySqlStoreImpl(
 | |
|             SqliteSqlStoreConfig(
 | |
|                 db_path=tmp_dir + "/" + db_name,
 | |
|             )
 | |
|         )
 | |
|         sqlstore = AuthorizedSqlStore(base_sqlstore, default_policy())
 | |
| 
 | |
|         await sqlstore.create_table(
 | |
|             table="resources",
 | |
|             schema={
 | |
|                 "id": ColumnType.STRING,
 | |
|                 "name": ColumnType.STRING,
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         # Test scenarios with different access control patterns
 | |
|         test_scenarios = [
 | |
|             # Scenario 1: Public record (no access control - represents None user insert)
 | |
|             {"id": "1", "name": "public", "access_attributes": None},
 | |
|             # Scenario 2: Record with roles requirement
 | |
|             {"id": "2", "name": "admin-only", "access_attributes": {"roles": ["admin"]}},
 | |
|             # Scenario 3: Record with multiple attribute categories
 | |
|             {"id": "3", "name": "admin-ml-team", "access_attributes": {"roles": ["admin"], "teams": ["ml-team"]}},
 | |
|             # Scenario 4: Record with teams only (missing roles category)
 | |
|             {"id": "4", "name": "ml-team-only", "access_attributes": {"teams": ["ml-team"]}},
 | |
|             # Scenario 5: Record with roles and projects
 | |
|             {
 | |
|                 "id": "5",
 | |
|                 "name": "admin-project-x",
 | |
|                 "access_attributes": {"roles": ["admin"], "projects": ["project-x"]},
 | |
|             },
 | |
|         ]
 | |
| 
 | |
|         mock_get_authenticated_user.return_value = User("test-user", {"roles": ["admin"]})
 | |
|         for scenario in test_scenarios:
 | |
|             await base_sqlstore.insert("resources", scenario)
 | |
| 
 | |
|         # Test with different user configurations
 | |
|         user_scenarios = [
 | |
|             # User 1: No attributes (should only see public records)
 | |
|             {"principal": "user1", "attributes": None},
 | |
|             # User 2: Empty attributes (should only see public records)
 | |
|             {"principal": "user2", "attributes": {}},
 | |
|             # User 3: Admin role only
 | |
|             {"principal": "user3", "attributes": {"roles": ["admin"]}},
 | |
|             # User 4: ML team only
 | |
|             {"principal": "user4", "attributes": {"teams": ["ml-team"]}},
 | |
|             # User 5: Admin + ML team
 | |
|             {"principal": "user5", "attributes": {"roles": ["admin"], "teams": ["ml-team"]}},
 | |
|             # User 6: Admin + Project X
 | |
|             {"principal": "user6", "attributes": {"roles": ["admin"], "projects": ["project-x"]}},
 | |
|             # User 7: Different role (should only see public)
 | |
|             {"principal": "user7", "attributes": {"roles": ["viewer"]}},
 | |
|         ]
 | |
| 
 | |
|         policy = default_policy()
 | |
| 
 | |
|         for user_data in user_scenarios:
 | |
|             user = User(principal=user_data["principal"], attributes=user_data["attributes"])
 | |
|             mock_get_authenticated_user.return_value = user
 | |
| 
 | |
|             sql_results = await sqlstore.fetch_all("resources")
 | |
|             sql_ids = {row["id"] for row in sql_results.data}
 | |
|             policy_ids = set()
 | |
|             for scenario in test_scenarios:
 | |
|                 sql_record = SqlRecord(
 | |
|                     record_id=scenario["id"],
 | |
|                     table_name="resources",
 | |
|                     owner=User(principal="test-user", attributes=scenario["access_attributes"]),
 | |
|                 )
 | |
| 
 | |
|                 if is_action_allowed(policy, Action.READ, sql_record, user):
 | |
|                     policy_ids.add(scenario["id"])
 | |
|             assert sql_ids == policy_ids, (
 | |
|                 f"Consistency failure for user {user.principal} with attributes {user.attributes}:\n"
 | |
|                 f"SQL returned: {sorted(sql_ids)}\n"
 | |
|                 f"Policy allows: {sorted(policy_ids)}\n"
 | |
|                 f"Difference: SQL only: {sql_ids - policy_ids}, Policy only: {policy_ids - sql_ids}"
 | |
|             )
 | |
| 
 | |
| 
 | |
| @patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user")
 | |
| async def test_authorized_store_user_attribute_capture(mock_get_authenticated_user):
 | |
|     """Test that user attributes are properly captured during insert"""
 | |
|     with TemporaryDirectory() as tmp_dir:
 | |
|         db_name = "test_attributes.db"
 | |
|         base_sqlstore = SqlAlchemySqlStoreImpl(
 | |
|             SqliteSqlStoreConfig(
 | |
|                 db_path=tmp_dir + "/" + db_name,
 | |
|             )
 | |
|         )
 | |
|         authorized_store = AuthorizedSqlStore(base_sqlstore, default_policy())
 | |
| 
 | |
|         await authorized_store.create_table(
 | |
|             table="user_data",
 | |
|             schema={
 | |
|                 "id": ColumnType.STRING,
 | |
|                 "content": ColumnType.STRING,
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         mock_get_authenticated_user.return_value = User(
 | |
|             "user-with-attrs", {"roles": ["editor"], "teams": ["content"], "projects": ["blog"]}
 | |
|         )
 | |
| 
 | |
|         await authorized_store.insert("user_data", {"id": "item1", "content": "User content"})
 | |
| 
 | |
|         mock_get_authenticated_user.return_value = User("user-no-attrs", None)
 | |
| 
 | |
|         await authorized_store.insert("user_data", {"id": "item2", "content": "Public content"})
 | |
| 
 | |
|         mock_get_authenticated_user.return_value = None
 | |
| 
 | |
|         await authorized_store.insert("user_data", {"id": "item3", "content": "Anonymous content"})
 | |
|         result = await base_sqlstore.fetch_all("user_data", order_by=[("id", "asc")])
 | |
|         assert len(result.data) == 3
 | |
| 
 | |
|         # First item should have full attributes
 | |
|         assert result.data[0]["id"] == "item1"
 | |
|         assert result.data[0]["access_attributes"] == {"roles": ["editor"], "teams": ["content"], "projects": ["blog"]}
 | |
| 
 | |
|         # Second item should have null attributes (user with no attributes)
 | |
|         assert result.data[1]["id"] == "item2"
 | |
|         assert result.data[1]["access_attributes"] is None
 | |
| 
 | |
|         # Third item should have null attributes (no authenticated user)
 | |
|         assert result.data[2]["id"] == "item3"
 | |
|         assert result.data[2]["access_attributes"] is None
 |