add observability to the recording system

Ashwin Bharambe 2025-10-16 19:27:17 -07:00
parent 1fc0fd5935
commit 483ae9e918
3 changed files with 462 additions and 11 deletions


@@ -42,3 +42,8 @@ def sync_test_context_from_provider_data():
return TEST_CONTEXT.set(provider_data["__test_id"])
return None
def is_debug_mode() -> bool:
"""Check if test recording debug mode is enabled via LLAMA_STACK_TEST_DEBUG env var."""
return os.environ.get("LLAMA_STACK_TEST_DEBUG", "").lower() in ("1", "true", "yes")
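For reference, a minimal usage sketch (not part of the commit): only the LLAMA_STACK_TEST_DEBUG name and its accepted values come from the diff, the rest is illustrative.

import os

# any of "1", "true", or "yes" (case-insensitive) enables debug output
os.environ["LLAMA_STACK_TEST_DEBUG"] = "true"

from llama_stack.core.testing_context import is_debug_mode

assert is_debug_mode()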


@@ -37,7 +37,7 @@ _id_counters: dict[str, dict[str, int]] = {}
# Test context uses ContextVar since it changes per-test and needs async isolation
from openai.types.completion_choice import CompletionChoice
from llama_stack.core.testing_context import get_test_context
from llama_stack.core.testing_context import get_test_context, is_debug_mode
# update the "finish_reason" field, since its type definition is wrong (None is not accepted)
CompletionChoice.model_fields["finish_reason"].annotation = Literal["stop", "length", "content_filter"] | None
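The annotation override above is a general pydantic v2 trick worth spelling out; a hedged sketch on a toy model follows (the toy Choice model and the model_rebuild step are illustrative assumptions, not lines from this commit):

from typing import Literal

from pydantic import BaseModel


class Choice(BaseModel):  # illustrative stand-in for CompletionChoice
    finish_reason: Literal["stop", "length"]


# widen the field's annotation after class creation, then force a schema rebuild
Choice.model_fields["finish_reason"].annotation = Literal["stop", "length"] | None
Choice.model_fields["finish_reason"].default = None
Choice.model_rebuild(force=True)

Choice(finish_reason=None)  # now validates instead of raising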
@@ -146,6 +146,7 @@ def normalize_inference_request(method: str, url: str, headers: dict[str, Any],
body_for_hash = _normalize_body_for_hash(body)
test_id = get_test_context()
normalized: dict[str, Any] = {
"method": method.upper(),
"endpoint": parsed.path,
@@ -154,10 +155,20 @@
# Include test_id for isolation, except for shared infrastructure endpoints
if parsed.path not in ("/api/tags", "/v1/models"):
normalized["test_id"] = get_test_context()
normalized["test_id"] = test_id
normalized_json = json.dumps(normalized, sort_keys=True)
return hashlib.sha256(normalized_json.encode()).hexdigest()
request_hash = hashlib.sha256(normalized_json.encode()).hexdigest()
if is_debug_mode():
logger.info("[RECORDING DEBUG] Hash computation:")
logger.info(f" Test ID: {test_id}")
logger.info(f" Method: {method.upper()}")
logger.info(f" Endpoint: {parsed.path}")
logger.info(f" Model: {body.get('model', 'N/A')}")
logger.info(f" Computed hash: {request_hash}")
return request_hash
def normalize_tool_request(provider_name: str, tool_name: str, kwargs: dict[str, Any]) -> str:
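Taken together, the hunk above amounts to the following standalone recipe; a sketch under the assumption that _normalize_body_for_hash reduces to a sorted-keys JSON dump (the real normalization is not shown in this diff):

import hashlib
import json
from urllib.parse import urlparse


def compute_request_hash(method: str, url: str, body: dict, test_id: str | None) -> str:
    parsed = urlparse(url)
    normalized = {
        "method": method.upper(),
        "endpoint": parsed.path,
        "body": json.dumps(body, sort_keys=True),  # stand-in for _normalize_body_for_hash
    }
    # shared infrastructure endpoints are recorded once, not per test
    if parsed.path not in ("/api/tags", "/v1/models"):
        normalized["test_id"] = test_id
    return hashlib.sha256(json.dumps(normalized, sort_keys=True).encode()).hexdigest()

Every field that feeds this hash (test ID, method, endpoint, model) is exactly what the debug log dumps, which is what makes record/replay mismatches diffable.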
@@ -212,6 +223,11 @@ def patch_httpx_for_test_id():
provider_data["__test_id"] = test_id
request.headers["X-LlamaStack-Provider-Data"] = json.dumps(provider_data)
if is_debug_mode():
logger.info("[RECORDING DEBUG] Injected test ID into request header:")
logger.info(f" Test ID: {test_id}")
logger.info(f" URL: {request.url}")
return None
LlamaStackClient._prepare_request = patched_prepare_request
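The round trip this sets up, sketched end to end (the test ID string is illustrative; the header name and the __test_id key are from the diff and from sync_test_context_from_provider_data above):

import json

test_id = "tests/integration/inference/test_chat.py::test_basic"  # illustrative

# client side: the patched _prepare_request merges the test ID into provider data
provider_data = {"__test_id": test_id}
headers = {"X-LlamaStack-Provider-Data": json.dumps(provider_data)}

# server side: sync_test_context_from_provider_data pulls it back out
recovered = json.loads(headers["X-LlamaStack-Provider-Data"]).get("__test_id")
assert recovered == test_id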
@@ -355,18 +371,35 @@ class ResponseStorage:
test_file = test_id.split("::")[0] # Remove test function part
test_dir = Path(test_file).parent # Get parent directory
# Make it absolute by resolving against base_dir
# If base_dir is absolute, use it as the root, otherwise resolve relative to cwd
if self.base_dir.is_absolute():
# base_dir is something like /app/llama-stack-source/tests/integration/common
# We need to go up to the repo root and then back down to the test dir
repo_root = self.base_dir.parent.parent.parent # go up from common -> integration -> tests -> repo
return repo_root / test_dir / "recordings"
repo_root = self.base_dir.parent.parent.parent
result = repo_root / test_dir / "recordings"
if is_debug_mode():
logger.info("[RECORDING DEBUG] Path resolution (absolute base_dir):")
logger.info(f" Test ID: {test_id}")
logger.info(f" Base dir: {self.base_dir}")
logger.info(f" Repo root: {repo_root}")
logger.info(f" Test file: {test_file}")
logger.info(f" Test dir: {test_dir}")
logger.info(f" Recordings dir: {result}")
return result
else:
return test_dir / "recordings"
result = test_dir / "recordings"
if is_debug_mode():
logger.info("[RECORDING DEBUG] Path resolution (relative base_dir):")
logger.info(f" Test ID: {test_id}")
logger.info(f" Base dir: {self.base_dir}")
logger.info(f" Test dir: {test_dir}")
logger.info(f" Recordings dir: {result}")
return result
else:
# Fallback for non-test contexts
return self.base_dir / "recordings"
result = self.base_dir / "recordings"
if is_debug_mode():
logger.info("[RECORDING DEBUG] Path resolution (no test context):")
logger.info(f" Base dir: {self.base_dir}")
logger.info(f" Recordings dir: {result}")
return result
def _ensure_directory(self):
"""Ensure test-specific directories exist."""
@@ -401,6 +434,13 @@ class ResponseStorage:
response_path = responses_dir / response_file
if is_debug_mode():
logger.info("[RECORDING DEBUG] Storing recording:")
logger.info(f" Request hash: {request_hash}")
logger.info(f" File: {response_path}")
logger.info(f" Test ID: {get_test_context()}")
logger.info(f" Endpoint: {endpoint}")
# Save response to JSON file with metadata
with open(response_path, "w") as f:
json.dump(
@@ -429,16 +469,33 @@
test_dir = self._get_test_dir()
response_path = test_dir / response_file
if is_debug_mode():
logger.info("[RECORDING DEBUG] Looking up recording:")
logger.info(f" Request hash: {request_hash}")
logger.info(f" Primary path: {response_path}")
logger.info(f" Primary exists: {response_path.exists()}")
if response_path.exists():
if is_debug_mode():
logger.info(" Found in primary location")
return _recording_from_file(response_path)
# Fallback to base recordings directory (for session-level recordings)
fallback_dir = self.base_dir / "recordings"
fallback_path = fallback_dir / response_file
if is_debug_mode():
logger.info(f" Fallback path: {fallback_path}")
logger.info(f" Fallback exists: {fallback_path.exists()}")
if fallback_path.exists():
if is_debug_mode():
logger.info(" Found in fallback location")
return _recording_from_file(fallback_path)
if is_debug_mode():
logger.info(" Recording not found in either location")
return None
def _model_list_responses(self, request_hash: str) -> list[dict[str, Any]]:
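The lookup above boils down to a two-tier search: the per-test directory first, then the session-level recordings under base_dir. A hedged sketch of that pattern (find_recording is a hypothetical helper, not in the diff):

from pathlib import Path


def find_recording(response_file: str, test_dir: Path, base_dir: Path) -> Path | None:
    # mirrors the primary-then-fallback order above
    for candidate in (test_dir / response_file, base_dir / "recordings" / response_file):
        if candidate.exists():
            return candidate
    return None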
@@ -594,6 +651,13 @@ async def _patched_inference_method(original_method, self, client_type, endpoint
mode = _current_mode
storage = _current_storage
if is_debug_mode():
logger.info("[RECORDING DEBUG] Entering inference method:")
logger.info(f" Mode: {mode}")
logger.info(f" Client type: {client_type}")
logger.info(f" Endpoint: {endpoint}")
logger.info(f" Test context: {get_test_context()}")
if mode == APIRecordingMode.LIVE or storage is None:
if endpoint == "/v1/models":
return original_method(self, *args, **kwargs)
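For orientation, the mode guard restated as a tiny sketch; the APIRecordingMode member names LIVE and REPLAY appear in the diff, but RECORD and all the string values are assumptions:

from enum import Enum


class APIRecordingMode(str, Enum):
    LIVE = "live"      # from the diff
    REPLAY = "replay"  # from the diff
    RECORD = "record"  # assumed third mode; string values are also assumed


def bypasses_recording(mode: APIRecordingMode, storage: object | None) -> bool:
    # mirrors `if mode == APIRecordingMode.LIVE or storage is None` above
    return mode == APIRecordingMode.LIVE or storage is None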
@@ -649,6 +713,18 @@ async def _patched_inference_method(original_method, self, client_type, endpoint
return response_body
elif mode == APIRecordingMode.REPLAY:
# REPLAY mode requires recording to exist
if is_debug_mode():
logger.error("[RECORDING DEBUG] Recording not found!")
logger.error(f" Mode: {mode}")
logger.error(f" Request hash: {request_hash}")
logger.error(f" Method: {method}")
logger.error(f" URL: {url}")
logger.error(f" Endpoint: {endpoint}")
logger.error(f" Model: {body.get('model', 'unknown')}")
logger.error(f" Test context: {get_test_context()}")
logger.error(
f" Stack config type: {os.environ.get('LLAMA_STACK_TEST_STACK_CONFIG_TYPE', 'library_client')}"
)
raise RuntimeError(
f"Recording not found for request hash: {request_hash}\n"
f"Model: {body.get('model', 'unknown')} | Request: {method} {url}\n"