llama-stack-mirror/tests/unit/server/test_auth.py
Ashwin Bharambe 5b39d5a76a
feat(auth, rfc): Add support for Bearer (api_key) Authentication (#1626)
This PR adds support (or is a proposal for) for supporting API KEY
authentication on the Llama Stack server end. `llama-stack-client`
already supports accepting an api_key parameter and passes it down
through every request as an `Authentication: ` header.

Currently, Llama Stack does not propose APIs for handling authentication
or authorization for resources of any kind. Given that, and the fact
that any deployment will typically have _some_ authentication system
present, we simply adopt a delegation mechanism: delegate to an HTTPS
endpoint performing key management / authentication.

It is configured via: 
```yaml
server: 
   auth:
     endpoint: <...>
```

in the run.yaml configuration.


## How It Works

When authentication is enabled:

1. Every API request must include an `Authorization: Bearer <token>`
header
2. The server will send a _POST_ validation request to the configured
endpoint with the following payload:
   ```json
   {
     "api_key": "<token>",
     "request": {
       "path": "/api/path",
       "headers": { "header1": "value1", ... },
       "params": { "param1": "value1", ... }
     }
   }
   ```
3. If the authentication endpoint returns a 200 status code, the request
is allowed to proceed
4. If the authentication endpoint returns any other status code, a 401
Unauthorized response is returned

## Test Plan

Unit tests
2025-03-18 16:24:18 -07:00

124 lines
3.8 KiB
Python

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from llama_stack.distribution.server.auth import AuthenticationMiddleware
@pytest.fixture
def mock_auth_endpoint():
return "http://mock-auth-service/validate"
@pytest.fixture
def valid_api_key():
return "valid_api_key_12345"
@pytest.fixture
def invalid_api_key():
return "invalid_api_key_67890"
@pytest.fixture
def app(mock_auth_endpoint):
app = FastAPI()
app.add_middleware(AuthenticationMiddleware, auth_endpoint=mock_auth_endpoint)
@app.get("/test")
def test_endpoint():
return {"message": "Authentication successful"}
return app
@pytest.fixture
def client(app):
return TestClient(app)
async def mock_post_success(*args, **kwargs):
mock_response = AsyncMock()
mock_response.status_code = 200
return mock_response
async def mock_post_failure(*args, **kwargs):
mock_response = AsyncMock()
mock_response.status_code = 401
return mock_response
async def mock_post_exception(*args, **kwargs):
raise Exception("Connection error")
def test_missing_auth_header(client):
response = client.get("/test")
assert response.status_code == 401
assert "Missing or invalid Authorization header" in response.json()["error"]["message"]
def test_invalid_auth_header_format(client):
response = client.get("/test", headers={"Authorization": "InvalidFormat token123"})
assert response.status_code == 401
assert "Missing or invalid Authorization header" in response.json()["error"]["message"]
@patch("httpx.AsyncClient.post", new=mock_post_success)
def test_valid_authentication(client, valid_api_key):
response = client.get("/test", headers={"Authorization": f"Bearer {valid_api_key}"})
assert response.status_code == 200
assert response.json() == {"message": "Authentication successful"}
@patch("httpx.AsyncClient.post", new=mock_post_failure)
def test_invalid_authentication(client, invalid_api_key):
response = client.get("/test", headers={"Authorization": f"Bearer {invalid_api_key}"})
assert response.status_code == 401
assert "Authentication failed" in response.json()["error"]["message"]
@patch("httpx.AsyncClient.post", new=mock_post_exception)
def test_auth_service_error(client, valid_api_key):
response = client.get("/test", headers={"Authorization": f"Bearer {valid_api_key}"})
assert response.status_code == 401
assert "Authentication service error" in response.json()["error"]["message"]
def test_auth_request_payload(client, valid_api_key, mock_auth_endpoint):
with patch("httpx.AsyncClient.post") as mock_post:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_post.return_value = mock_response
client.get(
"/test?param1=value1&param2=value2",
headers={
"Authorization": f"Bearer {valid_api_key}",
"User-Agent": "TestClient",
"Content-Type": "application/json",
},
)
# Check that the auth endpoint was called with the correct payload
call_args = mock_post.call_args
assert call_args is not None
url, kwargs = call_args[0][0], call_args[1]
assert url == mock_auth_endpoint
payload = kwargs["json"]
assert payload["api_key"] == valid_api_key
assert payload["request"]["path"] == "/test"
assert "authorization" in payload["request"]["headers"]
assert "param1" in payload["request"]["params"]
assert "param2" in payload["request"]["params"]