feat: Add Kubernetes auth provider to use SelfSubjectReview and kubernetes api server (#2559)

# What does this PR do?
Add Kubernetes authentication provider support
- Add KubernetesAuthProvider class for token validation using Kubernetes
SelfSubjectReview API
- Add KubernetesAuthProviderConfig with configurable API server URL, TLS
settings, and claims mapping
- Implement authentication via POST requests to
/apis/authentication.k8s.io/v1/selfsubjectreviews endpoint
- Add support for parsing Kubernetes SelfSubjectReview response format
to extract user information
- Add KUBERNETES provider type to AuthProviderType enum
- Update create_auth_provider factory function to handle 'kubernetes'
provider type
- Add comprehensive unit tests for KubernetesAuthProvider functionality
- Add documentation with configuration examples and usage instructions

The provider validates tokens by sending SelfSubjectReview requests to
the Kubernetes API server and extracts user information from the
userInfo structure in the response.


<!-- If resolving an issue, uncomment and update the line below -->
<!-- Closes #[issue-number] -->

## Test Plan
<!-- Describe the tests you ran to verify your changes with result
summaries. *Provide clear instructions so the plan can be easily
re-executed.* -->
What This Verifies:
Authentication header validation
Token validation with Kubernetes SelfSubjectReview and kubernetes server
API endpoint
Error handling for invalid tokens and HTTP errors
Request payload structure and headers

```
python -m pytest tests/unit/server/test_auth.py -k "kubernetes" -v
```

Signed-off-by: Akram Ben Aissi <akram.benaissi@gmail.com>
This commit is contained in:
Akram Ben Aissi 2025-09-08 11:25:10 +02:00 committed by GitHub
parent 44e1a40595
commit 072dca0609
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 311 additions and 4 deletions

View file

@ -8,16 +8,18 @@ import ssl
import time
from abc import ABC, abstractmethod
from asyncio import Lock
from urllib.parse import parse_qs, urlparse
from urllib.parse import parse_qs, urljoin, urlparse
import httpx
from jose import jwt
from pydantic import BaseModel, Field
from llama_stack.apis.common.errors import TokenValidationError
from llama_stack.core.datatypes import (
AuthenticationConfig,
CustomAuthConfig,
GitHubTokenAuthConfig,
KubernetesAuthProviderConfig,
OAuth2TokenAuthConfig,
User,
)
@ -162,7 +164,7 @@ class OAuth2TokenAuthProvider(AuthProvider):
auth=auth,
timeout=10.0, # Add a reasonable timeout
)
if response.status_code != 200:
if response.status_code != httpx.codes.OK:
logger.warning(f"Token introspection failed with status code: {response.status_code}")
raise ValueError(f"Token introspection failed: {response.status_code}")
@ -272,7 +274,7 @@ class CustomAuthProvider(AuthProvider):
json=auth_request.model_dump(),
timeout=10.0, # Add a reasonable timeout
)
if response.status_code != 200:
if response.status_code != httpx.codes.OK:
logger.warning(f"Authentication failed with status code: {response.status_code}")
raise ValueError(f"Authentication failed: {response.status_code}")
@ -374,6 +376,89 @@ async def _get_github_user_info(access_token: str, github_api_base_url: str) ->
}
class KubernetesAuthProvider(AuthProvider):
"""
Kubernetes authentication provider that validates tokens using the Kubernetes SelfSubjectReview API.
This provider integrates with Kubernetes API server by using the
/apis/authentication.k8s.io/v1/selfsubjectreviews endpoint to validate tokens and extract user information.
"""
def __init__(self, config: KubernetesAuthProviderConfig):
self.config = config
def _httpx_verify_value(self) -> bool | str:
"""
Build the value for httpx's `verify` parameter.
- False disables verification.
- Path string points to a CA bundle.
- True uses system defaults.
"""
if not self.config.verify_tls:
return False
if self.config.tls_cafile:
return self.config.tls_cafile.as_posix()
return True
async def validate_token(self, token: str, scope: dict | None = None) -> User:
"""Validate a token using Kubernetes SelfSubjectReview API endpoint."""
# Build the Kubernetes SelfSubjectReview API endpoint URL
review_api_url = urljoin(self.config.api_server_url, "/apis/authentication.k8s.io/v1/selfsubjectreviews")
# Create SelfSubjectReview request body
review_request = {"apiVersion": "authentication.k8s.io/v1", "kind": "SelfSubjectReview"}
verify = self._httpx_verify_value()
try:
async with httpx.AsyncClient(verify=verify, timeout=10.0) as client:
response = await client.post(
review_api_url,
json=review_request,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
)
if response.status_code == httpx.codes.UNAUTHORIZED:
raise TokenValidationError("Invalid token")
if response.status_code != httpx.codes.CREATED:
logger.warning(f"Kubernetes SelfSubjectReview API failed with status code: {response.status_code}")
raise TokenValidationError(f"Token validation failed: {response.status_code}")
review_response = response.json()
# Extract user information from SelfSubjectReview response
status = review_response.get("status", {})
if not status:
raise ValueError("No status found in SelfSubjectReview response")
user_info = status.get("userInfo", {})
if not user_info:
raise ValueError("No userInfo found in SelfSubjectReview response")
username = user_info.get("username")
if not username:
raise ValueError("No username found in SelfSubjectReview response")
# Build user attributes from Kubernetes user info
user_attributes = get_attributes_from_claims(user_info, self.config.claims_mapping)
return User(
principal=username,
attributes=user_attributes,
)
except httpx.TimeoutException:
logger.warning("Kubernetes SelfSubjectReview API request timed out")
raise ValueError("Token validation timeout") from None
except Exception as e:
logger.warning(f"Error during token validation: {str(e)}")
raise ValueError(f"Token validation error: {str(e)}") from e
async def close(self):
"""Close any resources."""
pass
def create_auth_provider(config: AuthenticationConfig) -> AuthProvider:
"""Factory function to create the appropriate auth provider."""
provider_config = config.provider_config
@ -384,5 +469,7 @@ def create_auth_provider(config: AuthenticationConfig) -> AuthProvider:
return OAuth2TokenAuthProvider(provider_config)
elif isinstance(provider_config, GitHubTokenAuthConfig):
return GitHubTokenAuthProvider(provider_config)
elif isinstance(provider_config, KubernetesAuthProviderConfig):
return KubernetesAuthProvider(provider_config)
else:
raise ValueError(f"Unknown authentication provider config type: {type(provider_config)}")