mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-27 23:22:01 +00:00
feat: Extend the oauth_token provider to allow for token introspection
This may be desired in order to reject revoked tokens or where opaque tokens are used. Signed-off-by: Gordon Sim <gsim@redhat.com>
This commit is contained in:
parent
87a4b9cb28
commit
7ee1ae0a3d
3 changed files with 251 additions and 13 deletions
|
|
@ -396,8 +396,10 @@ def oauth2_app():
|
|||
auth_config = AuthProviderConfig(
|
||||
provider_type=AuthProviderType.OAUTH2_TOKEN,
|
||||
config={
|
||||
"jwks_uri": "http://mock-authz-service/token/introspect",
|
||||
"cache_ttl": "3600",
|
||||
"jwks": {
|
||||
"uri": "http://mock-authz-service/token/introspect",
|
||||
"cache_ttl": "3600",
|
||||
},
|
||||
"audience": "llama-stack",
|
||||
},
|
||||
)
|
||||
|
|
@ -517,3 +519,159 @@ def test_get_attributes_from_claims():
|
|||
|
||||
|
||||
# TODO: add more tests for oauth2 token provider
|
||||
|
||||
|
||||
# oauth token introspection tests
|
||||
@pytest.fixture
|
||||
def mock_introspection_endpoint():
|
||||
return "http://mock-authz-service/token/introspect"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def introspection_app(mock_introspection_endpoint):
|
||||
app = FastAPI()
|
||||
auth_config = AuthProviderConfig(
|
||||
provider_type=AuthProviderType.OAUTH2_TOKEN,
|
||||
config={
|
||||
"jwks": None,
|
||||
"introspection": {"url": mock_introspection_endpoint, "client_id": "myclient", "client_secret": "abcdefg"},
|
||||
},
|
||||
)
|
||||
app.add_middleware(AuthenticationMiddleware, auth_config=auth_config)
|
||||
|
||||
@app.get("/test")
|
||||
def test_endpoint():
|
||||
return {"message": "Authentication successful"}
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def introspection_app_with_custom_mapping(mock_introspection_endpoint):
|
||||
app = FastAPI()
|
||||
auth_config = AuthProviderConfig(
|
||||
provider_type=AuthProviderType.OAUTH2_TOKEN,
|
||||
config={
|
||||
"jwks": None,
|
||||
"introspection": {
|
||||
"url": mock_introspection_endpoint,
|
||||
"client_id": "myclient",
|
||||
"client_secret": "abcdefg",
|
||||
"send_secret_in_body": "true",
|
||||
},
|
||||
"claims_mapping": {
|
||||
"sub": "roles",
|
||||
"scope": "roles",
|
||||
"groups": "teams",
|
||||
"aud": "namespaces",
|
||||
},
|
||||
},
|
||||
)
|
||||
app.add_middleware(AuthenticationMiddleware, auth_config=auth_config)
|
||||
|
||||
@app.get("/test")
|
||||
def test_endpoint():
|
||||
return {"message": "Authentication successful"}
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def introspection_client(introspection_app):
|
||||
return TestClient(introspection_app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def introspection_client_with_custom_mapping(introspection_app_with_custom_mapping):
|
||||
return TestClient(introspection_app_with_custom_mapping)
|
||||
|
||||
|
||||
def test_missing_auth_header_introspection(introspection_client):
|
||||
response = introspection_client.get("/test")
|
||||
assert response.status_code == 401
|
||||
assert "Missing or invalid Authorization header" in response.json()["error"]["message"]
|
||||
|
||||
|
||||
def test_invalid_auth_header_format_introspection(introspection_client):
|
||||
response = introspection_client.get("/test", headers={"Authorization": "InvalidFormat token123"})
|
||||
assert response.status_code == 401
|
||||
assert "Missing or invalid Authorization header" in response.json()["error"]["message"]
|
||||
|
||||
|
||||
async def mock_introspection_active(*args, **kwargs):
|
||||
return MockResponse(
|
||||
200,
|
||||
{
|
||||
"active": True,
|
||||
"sub": "my-user",
|
||||
"groups": ["group1", "group2"],
|
||||
"scope": "foo bar",
|
||||
"aud": ["set1", "set2"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def mock_introspection_inactive(*args, **kwargs):
|
||||
return MockResponse(
|
||||
200,
|
||||
{
|
||||
"active": False,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def mock_introspection_invalid(*args, **kwargs):
|
||||
class InvalidResponse:
|
||||
def __init__(self, status_code):
|
||||
self.status_code = status_code
|
||||
|
||||
def json(self):
|
||||
raise ValueError("Not JSON")
|
||||
|
||||
return InvalidResponse(200)
|
||||
|
||||
|
||||
async def mock_introspection_failed(*args, **kwargs):
|
||||
return MockResponse(
|
||||
500,
|
||||
{},
|
||||
)
|
||||
|
||||
|
||||
@patch("httpx.AsyncClient.post", new=mock_introspection_active)
|
||||
def test_valid_introspection_authentication(introspection_client, valid_api_key):
|
||||
response = introspection_client.get("/test", headers={"Authorization": f"Bearer {valid_api_key}"})
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"message": "Authentication successful"}
|
||||
|
||||
|
||||
@patch("httpx.AsyncClient.post", new=mock_introspection_inactive)
|
||||
def test_inactive_introspection_authentication(introspection_client, invalid_api_key):
|
||||
response = introspection_client.get("/test", headers={"Authorization": f"Bearer {invalid_api_key}"})
|
||||
assert response.status_code == 401
|
||||
assert "Token not active" in response.json()["error"]["message"]
|
||||
|
||||
|
||||
@patch("httpx.AsyncClient.post", new=mock_introspection_invalid)
|
||||
def test_invalid_introspection_authentication(introspection_client, invalid_api_key):
|
||||
response = introspection_client.get("/test", headers={"Authorization": f"Bearer {invalid_api_key}"})
|
||||
assert response.status_code == 401
|
||||
assert "Not JSON" in response.json()["error"]["message"]
|
||||
|
||||
|
||||
@patch("httpx.AsyncClient.post", new=mock_introspection_failed)
|
||||
def test_failed_introspection_authentication(introspection_client, invalid_api_key):
|
||||
response = introspection_client.get("/test", headers={"Authorization": f"Bearer {invalid_api_key}"})
|
||||
assert response.status_code == 401
|
||||
assert "Token introspection failed: 500" in response.json()["error"]["message"]
|
||||
|
||||
|
||||
@patch("httpx.AsyncClient.post", new=mock_introspection_active)
|
||||
def test_valid_introspection_with_custom_mapping_authentication(
|
||||
introspection_client_with_custom_mapping, valid_api_key
|
||||
):
|
||||
response = introspection_client_with_custom_mapping.get(
|
||||
"/test", headers={"Authorization": f"Bearer {valid_api_key}"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"message": "Authentication successful"}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue