feat(auth): allow token to be provided for use against jwks endpoint

Though the jwks endpoint does not usually require authentication, it does by
default in a kubernetes cluster. The cluster can be configured to allow anonymous
access to that endpoint, but by allowing a token to be presented that cluster
configuration is not necessary.
This commit is contained in:
Gordon Sim 2025-06-04 19:12:15 +01:00
parent c8c742ba45
commit 74d891db72
3 changed files with 57 additions and 24 deletions

View file

@ -345,6 +345,56 @@ def test_invalid_oauth2_authentication(oauth2_client, invalid_token):
assert "Invalid JWT token" in response.json()["error"]["message"]
async def mock_auth_jwks_response(*args, **kwargs):
if "headers" not in kwargs or "Authorization" not in kwargs["headers"]:
return MockResponse(401, {})
authz = kwargs["headers"]["Authorization"]
if authz != "Bearer abcdefg":
return MockResponse(401, {})
return await mock_jwks_response(args, kwargs)
@pytest.fixture
def oauth2_app_with_jwks_token():
app = FastAPI()
auth_config = AuthenticationConfig(
provider_type=AuthProviderType.OAUTH2_TOKEN,
config={
"jwks": {
"uri": "http://mock-authz-service/token/introspect",
"key_recheck_period": "3600",
"token": "abcdefg",
},
"audience": "llama-stack",
},
)
app.add_middleware(AuthenticationMiddleware, auth_config=auth_config)
@app.get("/test")
def test_endpoint():
return {"message": "Authentication successful"}
return app
@pytest.fixture
def oauth2_client_with_jwks_token(oauth2_app_with_jwks_token):
return TestClient(oauth2_app_with_jwks_token)
@patch("httpx.AsyncClient.get", new=mock_auth_jwks_response)
def test_oauth2_with_jwks_token_expected(oauth2_client, jwt_token_valid):
response = oauth2_client.get("/test", headers={"Authorization": f"Bearer {jwt_token_valid}"})
assert response.status_code == 401
@patch("httpx.AsyncClient.get", new=mock_auth_jwks_response)
def test_oauth2_with_jwks_token_configured(oauth2_client_with_jwks_token, jwt_token_valid):
response = oauth2_client_with_jwks_token.get("/test", headers={"Authorization": f"Bearer {jwt_token_valid}"})
assert response.status_code == 200
assert response.json() == {"message": "Authentication successful"}
def test_get_attributes_from_claims():
claims = {
"sub": "my-user",