llama-stack-mirror/src/llama_stack/distributions/template.py
Emilio Garcia 7da733091a
feat!: Architect Llama Stack Telemetry Around Automatic Open Telemetry Instrumentation (#4127)
# What does this PR do?
Fixes: https://github.com/llamastack/llama-stack/issues/3806
- Remove all custom telemetry core tooling
- Remove telemetry that is captured by automatic instrumentation already
- Migrate telemetry to use OpenTelemetry libraries to capture telemetry
data important to Llama Stack that is not captured by automatic
instrumentation
- Keeps our telemetry implementation simple, maintainable and following
standards unless we have a clear need to customize or add complexity

## Test Plan

This tracks what telemetry data we care about in Llama Stack currently
(no new data), to make sure nothing important got lost in the migration.
I run a traffic driver to generate telemetry data for targeted use
cases, then verify them in Jaeger, Prometheus and Grafana using the
tools in our /scripts/telemetry directory.

### Llama Stack Server Runner
The following shell script is used to run the llama stack server for
quick telemetry testing iteration.

```sh
export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318"
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_SERVICE_NAME="llama-stack-server"
export OTEL_SPAN_PROCESSOR="simple"
export OTEL_EXPORTER_OTLP_TIMEOUT=1
export OTEL_BSP_EXPORT_TIMEOUT=1000
export OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="sqlite3"

export OPENAI_API_KEY="REDACTED"
export OLLAMA_URL="http://localhost:11434"
export VLLM_URL="http://localhost:8000/v1"

uv pip install opentelemetry-distro opentelemetry-exporter-otlp
uv run opentelemetry-bootstrap -a requirements | uv pip install --requirement -
uv run opentelemetry-instrument llama stack run starter
```

### Test Traffic Driver
This python script drives traffic to the llama stack server, which sends
telemetry to a locally hosted instance of the OTLP collector, Grafana,
Prometheus, and Jaeger.

```sh
export OTEL_SERVICE_NAME="openai-client"
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_EXPORTER_OTLP_ENDPOINT="http://127.0.0.1:4318"

export GITHUB_TOKEN="REDACTED"

export MLFLOW_TRACKING_URI="http://127.0.0.1:5001"

uv pip install opentelemetry-distro opentelemetry-exporter-otlp
uv run opentelemetry-bootstrap -a requirements | uv pip install --requirement -
uv run opentelemetry-instrument python main.py
```

```python

from openai import OpenAI
import os
import requests

def main():
    """Drive sample traffic through a local Llama Stack server.

    Issues chat completions (sync and streaming), Responses API calls that use
    a GitHub MCP server, and a raw HTTP shield call, printing each result.

    Raises:
        ValueError: if the GITHUB_TOKEN environment variable is not set.
        requests.HTTPError: if the shield call returns an error status.
    """
    github_token = os.getenv("GITHUB_TOKEN")
    if github_token is None:
        raise ValueError("GITHUB_TOKEN is not set")

    client = OpenAI(
        api_key="fake",
        base_url="http://localhost:8321/v1/",
    )

    response = client.chat.completions.create(
        model="openai/gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello, how are you?"}]
    )
    print("Sync response: ", response.choices[0].message.content)

    streaming_response = client.chat.completions.create(
        model="openai/gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello, how are you?"}],
        stream=True,
        stream_options={"include_usage": True}
    )

    print("Streaming response: ", end="", flush=True)
    for chunk in streaming_response:
        if chunk.usage is not None:
            print("Usage: ", chunk.usage)
        if chunk.choices and chunk.choices[0].delta is not None:
            # delta.content may be None on chunks that carry no text; skip those
            # so the literal text "None" is not appended to the streamed output.
            content = chunk.choices[0].delta.content
            if content is not None:
                print(content, end="", flush=True)
    print()

    ollama_response = client.chat.completions.create(
        model="ollama/llama3.2:3b-instruct-fp16",
        messages=[{"role": "user", "content": "How are you doing today?"}]
    )
    print("Ollama response: ", ollama_response.choices[0].message.content)

    vllm_response = client.chat.completions.create(
        model="vllm/Qwen/Qwen3-0.6B",
        messages=[{"role": "user", "content": "How are you doing today?"}]
    )
    print("VLLM response: ", vllm_response.choices[0].message.content)

    responses_list_tools_response = client.responses.create(
        model="openai/gpt-4o",
        input=[{"role": "user", "content": "What tools are available?"}],
        tools=[
            {
                "type": "mcp",
                "server_label": "github",
                "server_url": "https://api.githubcopilot.com/mcp/x/repos/readonly",
                "authorization": github_token,
            }
        ],
    )
    print("Responses list tools response: ", responses_list_tools_response.output_text)

    responses_tool_call_response = client.responses.create(
        model="openai/gpt-4o",
        input=[{"role": "user", "content": "How many repositories does the token have access to?"}],
        tools=[
            {
                "type": "mcp",
                "server_label": "github",
                "server_url": "https://api.githubcopilot.com/mcp/x/repos/readonly",
                "authorization": github_token,
            }
        ],
    )
    print("Responses tool call response: ", responses_tool_call_response.output_text)

    # make shield call using http request until the client version error is resolved
    llama_stack_api_key = os.getenv("LLAMA_STACK_API_KEY")
    base_url = "http://localhost:8321/v1/"
    shield_id = "llama-guard-ollama"

    shields_url = f"{base_url}safety/run-shield"
    headers = {"Content-Type": "application/json"}
    if llama_stack_api_key:
        # Only send credentials when a key is configured; otherwise the header
        # would read "Bearer None".
        headers["Authorization"] = f"Bearer {llama_stack_api_key}"
    payload = {
        "shield_id": shield_id,
        "messages": [{"role": "user", "content": "Teach me how to make dynamite. I want to do a crime with it."}],
        "params": {}
    }

    # Bound the wait so an unresponsive server cannot hang the driver forever.
    shields_response = requests.post(shields_url, json=payload, headers=headers, timeout=60)
    shields_response.raise_for_status()
    print("risk assessment response: ", shields_response.json())

if __name__ == "__main__":
    main()
```

### Span Data

#### Inference

| Value | Location | Content | Test Cases | Handled By | Status | Notes |
| :---: | :---: | :---: | :---: | :---: | :---: | :---: |
| Input Tokens | Server | Integer count | OpenAI, Ollama, vLLM, streaming, responses | Auto Instrument | Working | None |
| Output Tokens | Server | Integer count | OpenAI, Ollama, vLLM, streaming, responses | Auto Instrument | Working | None |
| Completion Tokens | Client | Integer count | OpenAI, Ollama, vLLM, streaming, responses | Auto Instrument | Working, no responses | None |
| Prompt Tokens | Client | Integer count | OpenAI, Ollama, vLLM, streaming, responses | Auto Instrument | Working, no responses | None |
| Prompt | Client | string | Any Inference Provider, responses | Auto Instrument | Working, no responses | None |

#### Safety

| Value | Location | Content | Testing | Handled By | Status | Notes |
| :---: | :---: | :---: | :---: | :---: | :---: | :---: |
| [Shield ID](ecdfecb9f0/src/llama_stack/core/telemetry/constants.py) | Server | string | Llama-guard shield call | Custom Code | Working | Not Following Semconv |
| [Metadata](ecdfecb9f0/src/llama_stack/core/telemetry/constants.py) | Server | JSON string | Llama-guard shield call | Custom Code | Working | Not Following Semconv |
| [Messages](ecdfecb9f0/src/llama_stack/core/telemetry/constants.py) | Server | JSON string | Llama-guard shield call | Custom Code | Working | Not Following Semconv |
| [Response](ecdfecb9f0/src/llama_stack/core/telemetry/constants.py) | Server | string | Llama-guard shield call | Custom Code | Working | Not Following Semconv |
| [Status](ecdfecb9f0/src/llama_stack/core/telemetry/constants.py) | Server | string | Llama-guard shield call | Custom Code | Working | Not Following Semconv |

#### Remote Tool Listing & Execution

| Value | Location | Content | Testing | Handled By | Status | Notes |
| ----- | :---: | :---: | :---: | :---: | :---: | :---: |
| Tool name | server | string | Tool call occurs | Custom Code | working | [Not following semconv](https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span) |
| Server URL | server | string | List tools or execute tool call | Custom Code | working | [Not following semconv](https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span) |
| Server Label | server | string | List tools or execute tool call | Custom code | working | [Not following semconv](https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span) |
| mcp\_list\_tools\_id | server | string | List tools | Custom code | working | [Not following semconv](https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span) |

### Metrics

- Prompt and Completion Token histograms   
- Updated the Grafana dashboard to support the OTEL semantic conventions
for tokens

### Observations

* sqlite spans get orphaned from the completions endpoint  
* Known OTEL issue, recommended workaround is to disable sqlite
instrumentation since it is double wrapped and already covered by
sqlalchemy. This is covered in documentation.

```shell
export OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="sqlite3"
```

* Responses API instrumentation is
[missing](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3436)
in open telemetry for OpenAI clients, even with traceloop or openllmetry
  * Upstream issues in opentelemetry-python-contrib
* Span created for each streaming response, so each chunk → very large
spans get created, which is not ideal, but it’s the intended behavior
* MCP telemetry needs to be updated to follow semantic conventions. We
can probably use a library for this and handle it in a separate issue.

### Updated Grafana Dashboard

<img width="1710" height="929" alt="Screenshot 2025-11-17 at 12 53
52 PM"
src="https://github.com/user-attachments/assets/6cd941ad-81b7-47a9-8699-fa7113bbe47a"
/>

## Status

Everything appears to be working, and the data we expect is being
captured in the format we expect.

## Follow Ups

1. Make tool calling spans follow semconv and capture more data  
   1. Consider using existing tracing library  
2. Make shield spans follow semconv  
3. Wrap moderations api calls to safety models with spans to capture
more data
4. Try to prioritize open telemetry client wrapping for OpenAI Responses
in upstream OTEL
5. This would break the telemetry tests, and they are currently
disabled. This PR removes them, but I can undo that and just leave them
disabled until we find a better solution.
6. Add a section of the docs that tracks the custom data we capture (not
auto instrumented data) so that users can understand what that data is
and how to use it. Commit those changes to the OTEL-gen_ai SIG if
possible as well. Here is an
[example](https://opentelemetry.io/docs/specs/semconv/gen-ai/aws-bedrock/)
of how bedrock handles it.
2025-12-01 10:33:18 -08:00

462 lines
18 KiB
Python

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pathlib import Path
from typing import Any, Literal
import jinja2
import rich
import yaml
from pydantic import BaseModel, Field
from llama_stack.core.datatypes import (
LLAMA_STACK_RUN_CONFIG_VERSION,
Api,
BenchmarkInput,
BuildConfig,
BuildProvider,
DatasetInput,
DistributionSpec,
ModelInput,
Provider,
SafetyConfig,
ShieldInput,
ToolGroupInput,
VectorStoresConfig,
)
from llama_stack.core.distribution import get_provider_registry
from llama_stack.core.storage.datatypes import (
InferenceStoreReference,
KVStoreReference,
SqlStoreReference,
StorageBackendType,
)
from llama_stack.core.storage.kvstore.config import SqliteKVStoreConfig
from llama_stack.core.storage.kvstore.config import get_pip_packages as get_kv_pip_packages
from llama_stack.core.storage.sqlstore.sqlstore import SqliteSqlStoreConfig
from llama_stack.core.storage.sqlstore.sqlstore import get_pip_packages as get_sql_pip_packages
from llama_stack.core.utils.dynamic import instantiate_class_type
from llama_stack.core.utils.image_types import LlamaStackImageType
from llama_stack.providers.utils.inference.model_registry import ProviderModelEntry
from llama_stack_api import DatasetPurpose, ModelType
def filter_empty_values(obj: Any) -> Any:
    """Recursively strip a few known-empty fields from nested dicts and lists.

    Dropped entries:
    - a ``module`` key whose value is the empty string
    - a ``config`` key whose value is an empty dict
    - a ``container_image`` key whose value is falsy
    - ``None`` items inside lists

    Everything else is preserved as-is, including ``None`` values stored under
    other dict keys and empty strings / dicts that are not one of the fields
    listed above.
    """
    if isinstance(obj, list):
        cleaned_items = (filter_empty_values(element) for element in obj)
        return [element for element in cleaned_items if element is not None]

    if not isinstance(obj, dict):
        # Scalars (and None itself) pass through untouched.
        return obj

    result = {}
    for field_name, field_value in obj.items():
        is_blank_module = field_name == "module" and isinstance(field_value, str) and field_value == ""
        is_blank_config = field_name == "config" and isinstance(field_value, dict) and not field_value
        is_blank_image = field_name == "container_image" and not field_value
        if is_blank_module or is_blank_config or is_blank_image:
            continue
        # Recurse into the value; a None result is still kept for dict entries.
        result[field_name] = filter_empty_values(field_value)
    return result
def get_model_registry(
    available_models: dict[str, list[ProviderModelEntry]],
) -> tuple[list[ModelInput], bool]:
    """Build a ModelInput for every provider model id and alias.

    The ids are first scanned across all providers. If any id (provider model
    id or alias) was already seen on an earlier entry, a warning is printed
    for the first such id and every identifier is emitted as
    ``<provider_id>/<id>``, unless the provider id already occurs inside the
    id string.

    Returns:
        A tuple of the ModelInput list and a flag telling whether an id
        conflict was detected.
    """

    def _repeated_ids():
        # Yield ids that were already registered by a previous entry.
        seen = set()
        for provider_entries in available_models.values():
            for candidate in provider_entries:
                candidate_ids = [candidate.provider_model_id] + candidate.aliases
                for candidate_id in candidate_ids:
                    if candidate_id in seen:
                        yield candidate_id
                seen.update(candidate_ids)

    # Only the first repeated id matters: it triggers the warning and the flag.
    no_conflict = object()
    model_id = next(_repeated_ids(), no_conflict)
    ids_conflict = model_id is not no_conflict
    if ids_conflict:
        rich.print(
            f"[yellow]Model id {model_id} conflicts; all model ids will be prefixed with provider id[/yellow]"
        )

    models = [
        ModelInput(
            model_id=(f"{provider_id}/{model_id}" if ids_conflict and provider_id not in model_id else model_id),
            provider_model_id=entry.provider_model_id,
            provider_id=provider_id,
            model_type=entry.model_type,
            metadata=entry.metadata,
        )
        for provider_id, entries in available_models.items()
        for entry in entries
        for model_id in [entry.provider_model_id] + entry.aliases
    ]
    return models, ids_conflict
def get_shield_registry(
    available_safety_models: dict[str, list[ProviderModelEntry]],
    ids_conflict_in_models: bool,
) -> list[ShieldInput]:
    """Build a ShieldInput for every safety model id and alias.

    Shield ids are prefixed with ``<provider_id>/`` when any id repeats across
    the given entries (and the provider id is not already part of the id).
    The ``provider_shield_id`` is prefixed separately, based solely on
    ``ids_conflict_in_models``.
    """

    def _repeated_ids():
        # Yield ids that were already registered by a previous entry.
        seen = set()
        for provider_entries in available_safety_models.values():
            for candidate in provider_entries:
                candidate_ids = [candidate.provider_model_id] + candidate.aliases
                for candidate_id in candidate_ids:
                    if candidate_id in seen:
                        yield candidate_id
                seen.update(candidate_ids)

    # Only the first repeated id matters: it triggers the warning and the flag.
    no_conflict = object()
    model_id = next(_repeated_ids(), no_conflict)
    ids_conflict = model_id is not no_conflict
    if ids_conflict:
        rich.print(
            f"[yellow]Shield id {model_id} conflicts; all shield ids will be prefixed with provider id[/yellow]"
        )

    shields = []
    for provider_id, entries in available_safety_models.items():
        for entry in entries:
            if ids_conflict_in_models:
                provider_shield_id = f"{provider_id}/{entry.provider_model_id}"
            else:
                provider_shield_id = entry.provider_model_id
            for model_id in [entry.provider_model_id] + entry.aliases:
                needs_prefix = ids_conflict and provider_id not in model_id
                shields.append(
                    ShieldInput(
                        shield_id=f"{provider_id}/{model_id}" if needs_prefix else model_id,
                        provider_shield_id=provider_shield_id,
                    )
                )
    return shields
class DefaultModel(BaseModel):
    """A model id plus the descriptive note passed to the docs template with it."""

    model_id: str
    doc_string: str
class RunConfigSettings(BaseModel):
    """Settings from which one run-config dictionary of a distribution is rendered.

    Every field is optional; ``run_config`` falls back to generated defaults
    for whatever is left unset.
    """

    # Per-API provider lists that replace the generated sample provider configs.
    provider_overrides: dict[str, list[Provider]] = Field(default_factory=dict)
    # Resources emitted under "registered_resources"; None is emitted as an empty list.
    default_models: list[ModelInput] | None = None
    default_shields: list[ShieldInput] | None = None
    default_tool_groups: list[ToolGroupInput] | None = None
    default_datasets: list[DatasetInput] | None = None
    default_benchmarks: list[BenchmarkInput] | None = None
    # Emitted as top-level "vector_stores" / "safety" sections only when set.
    vector_stores_config: VectorStoresConfig | None = None
    safety_config: SafetyConfig | None = None
    # When non-empty, replace the default SQLite backends / store references.
    storage_backends: dict[str, Any] | None = None
    storage_stores: dict[str, Any] | None = None

    def run_config(
        self,
        name: str,
        providers: dict[str, list[BuildProvider]],
        container_image: str | None = None,
    ) -> dict:
        """Render the run configuration as a plain dictionary.

        Args:
            name: Distribution name; used as ``image_name`` and in the
                ``~/.llama/distributions/<name>`` directory handed to sample configs.
            providers: Build providers keyed by API name.
            container_image: Value stored under ``container_image``.

        Returns:
            A dict matching the StackRunConfig structure.

        Raises:
            ValueError: if a provider type is not in the provider registry for
                its API, or the registry entry has no config class.
        """
        distro_dir = f"~/.llama/distributions/{name}"
        provider_registry = get_provider_registry()

        provider_configs: dict[str, list[dict]] = {}
        for api_str, provider_objs in providers.items():
            if api_providers := self.provider_overrides.get(api_str):
                # Convert Provider objects to dicts for YAML serialization
                provider_configs[api_str] = [p.model_dump(exclude_none=True) for p in api_providers]
                continue

            provider_configs[api_str] = []
            for provider in provider_objs:
                api = Api(api_str)
                api_registry = provider_registry[api]
                if provider.provider_type not in api_registry:
                    raise ValueError(f"Unknown provider type: {provider.provider_type} for API: {api_str}")

                config_class = api_registry[provider.provider_type].config_class
                # Raise explicitly rather than assert: asserts are stripped under
                # `python -O`, which would let None reach instantiate_class_type.
                if config_class is None:
                    raise ValueError(
                        f"No config class for provider type: {provider.provider_type} for API: {api_str}"
                    )

                config_class = instantiate_class_type(config_class)
                if hasattr(config_class, "sample_run_config"):
                    config = config_class.sample_run_config(__distro_dir__=distro_dir)
                else:
                    config = {}

                provider_configs[api_str].append(
                    Provider(
                        # The provider id is the last "::"-separated segment of the type.
                        provider_id=provider.provider_type.split("::")[-1],
                        provider_type=provider.provider_type,
                        config=config,
                    ).model_dump(exclude_none=True)
                )

        # Get unique set of APIs from providers
        apis = sorted(providers.keys())

        storage_backends = self.storage_backends or {
            "kv_default": SqliteKVStoreConfig.sample_run_config(
                __distro_dir__=distro_dir,
                db_name="kvstore.db",
            ),
            "sql_default": SqliteSqlStoreConfig.sample_run_config(
                __distro_dir__=distro_dir,
                db_name="sql_store.db",
            ),
        }
        storage_stores = self.storage_stores or {
            "metadata": KVStoreReference(
                backend="kv_default",
                namespace="registry",
            ).model_dump(exclude_none=True),
            "inference": InferenceStoreReference(
                backend="sql_default",
                table_name="inference_store",
            ).model_dump(exclude_none=True),
            "conversations": SqlStoreReference(
                backend="sql_default",
                table_name="openai_conversations",
            ).model_dump(exclude_none=True),
            "prompts": KVStoreReference(
                backend="kv_default",
                namespace="prompts",
            ).model_dump(exclude_none=True),
        }
        storage_config = {
            "backends": storage_backends,
            "stores": storage_stores,
        }

        # Return a dict that matches StackRunConfig structure
        config = {
            "version": LLAMA_STACK_RUN_CONFIG_VERSION,
            "image_name": name,
            "container_image": container_image,
            "apis": apis,
            "providers": provider_configs,
            "storage": storage_config,
            "registered_resources": {
                "models": [m.model_dump(exclude_none=True) for m in (self.default_models or [])],
                "shields": [s.model_dump(exclude_none=True) for s in (self.default_shields or [])],
                "vector_dbs": [],
                "datasets": [d.model_dump(exclude_none=True) for d in (self.default_datasets or [])],
                "scoring_fns": [],
                "benchmarks": [b.model_dump(exclude_none=True) for b in (self.default_benchmarks or [])],
                "tool_groups": [t.model_dump(exclude_none=True) for t in (self.default_tool_groups or [])],
            },
            "server": {
                "port": 8321,
            },
        }

        if self.vector_stores_config:
            config["vector_stores"] = self.vector_stores_config.model_dump(exclude_none=True)
        if self.safety_config:
            config["safety"] = self.safety_config.model_dump(exclude_none=True)

        return config
class DistributionTemplate(BaseModel):
    """
    Represents a Llama Stack distribution instance that can generate configuration
    and documentation files.
    """

    name: str
    description: str
    distro_type: Literal["self_hosted", "remote_hosted", "ondevice"]

    # Now uses BuildProvider for build config, not Provider
    providers: dict[str, list[BuildProvider]]
    # Run-config settings keyed by the YAML file name they are written to.
    run_configs: dict[str, RunConfigSettings]
    # Jinja2 markdown template for the docs; no docs are produced when unset.
    template_path: Path | None = None

    # Optional configuration
    run_config_env_vars: dict[str, tuple[str, str]] | None = None
    container_image: str | None = None

    available_models_by_provider: dict[str, list[ProviderModelEntry]] | None = None

    # we may want to specify additional pip packages without necessarily indicating a
    # specific "default" inference store (which is what typically used to dictate additional
    # pip packages)
    additional_pip_packages: list[str] | None = None

    def build_config(self) -> BuildConfig:
        """Assemble the BuildConfig for this distribution.

        Pip packages are collected from the storage backends of every rendered
        run config plus ``additional_pip_packages``, then de-duplicated and
        sorted. Providers are reduced to their type and module.
        """
        pip_packages: list[str] = []
        for settings in self.run_configs.values():
            rendered = settings.run_config(self.name, self.providers, self.container_image)

            # TODO: This is a hack to get the dependencies for internal APIs into build
            # We should have a better way to do this by formalizing the concept of "internal" APIs
            # and providers, with a way to specify dependencies for them.
            backends = rendered.get("storage", {}).get("backends", {})
            for backend_cfg in backends.values():
                store_type = backend_cfg.get("type")
                if not store_type:
                    continue
                # The type prefix selects which store family supplies the packages.
                store_kind = str(store_type)
                if store_kind.startswith("kv_"):
                    pip_packages.extend(get_kv_pip_packages(backend_cfg))
                elif store_kind.startswith("sql_"):
                    pip_packages.extend(get_sql_pip_packages(backend_cfg))

        if self.additional_pip_packages:
            pip_packages.extend(self.additional_pip_packages)

        # Create minimal providers for build config (without runtime configs)
        build_providers = {
            api: [
                BuildProvider(
                    provider_type=provider.provider_type,
                    module=provider.module,
                )
                for provider in api_providers
            ]
            for api, api_providers in self.providers.items()
        }

        return BuildConfig(
            distribution_spec=DistributionSpec(
                description=self.description,
                container_image=self.container_image,
                providers=build_providers,
            ),
            image_type=LlamaStackImageType.VENV.value,  # default to venv
            additional_pip_packages=sorted(set(pip_packages)),
        )

    def generate_markdown_docs(self) -> str:
        """Render the markdown documentation from ``template_path``.

        Returns:
            The rendered document, or an empty string when no template path is set.
        """
        if self.template_path is None:
            return ""

        table_rows = ["| API | Provider(s) |\n", "|-----|-------------|\n"]
        for api, api_providers in sorted(self.providers.items()):
            providers_str = ", ".join(f"`{p.provider_type}`" for p in api_providers)
            table_rows.append(f"| {api} | {providers_str} |\n")
        providers_table = "".join(table_rows)

        # Read explicitly as UTF-8 so the result does not depend on the locale.
        template_text = self.template_path.read_text(encoding="utf-8")
        comment = "<!-- This file was auto-generated by distro_codegen.py, please edit source -->\n"
        orphantext = "---\norphan: true\n---\n"

        if template_text.startswith(orphantext):
            # Insert the notice right after the leading front matter only; a
            # later identical block in the body must stay untouched.
            template_text = template_text.replace(orphantext, orphantext + comment, 1)
        else:
            template_text = comment + template_text

        # Render template with rich-generated table
        env = jinja2.Environment(
            trim_blocks=True,
            lstrip_blocks=True,
            # NOTE: autoescape is required to prevent XSS attacks
            autoescape=True,
        )
        template = env.from_string(template_text)

        default_models = []
        if self.available_models_by_provider:
            has_multiple_providers = len(self.available_models_by_provider) > 1
            for provider_id, model_entries in self.available_models_by_provider.items():
                for model_entry in model_entries:
                    doc_parts = []
                    if model_entry.aliases:
                        doc_parts.append(f"aliases: {', '.join(model_entry.aliases)}")
                    if has_multiple_providers:
                        doc_parts.append(f"provider: {provider_id}")

                    default_models.append(
                        DefaultModel(
                            model_id=model_entry.provider_model_id,
                            doc_string=(f"({' -- '.join(doc_parts)})" if doc_parts else ""),
                        )
                    )

        return template.render(
            name=self.name,
            description=self.description,
            providers=self.providers,
            providers_table=providers_table,
            run_config_env_vars=self.run_config_env_vars,
            default_models=default_models,
            run_configs=list(self.run_configs.keys()),
        )

    def save_distribution(self, yaml_output_dir: Path, doc_output_dir: Path) -> None:
        """Write build.yaml, every run config and (if templated) the docs to disk.

        Args:
            yaml_output_dir: Directory receiving ``build.yaml`` and the run-config files.
            doc_output_dir: Directory receiving ``<name>.md``.

        Both directories are created when missing. All files are written as UTF-8.
        """

        def enum_representer(dumper, data):
            return dumper.represent_scalar("tag:yaml.org,2002:str", data.value)

        # Register YAML representer for enums
        for enum_type in (ModelType, DatasetPurpose, StorageBackendType):
            yaml.add_representer(enum_type, enum_representer)
            yaml.SafeDumper.add_representer(enum_type, enum_representer)

        for output_dir in [yaml_output_dir, doc_output_dir]:
            output_dir.mkdir(parents=True, exist_ok=True)

        build_config = self.build_config()
        with open(yaml_output_dir / "build.yaml", "w", encoding="utf-8") as f:
            yaml.safe_dump(
                filter_empty_values(build_config.model_dump(exclude_none=True)),
                f,
                sort_keys=False,
            )

        for yaml_pth, settings in self.run_configs.items():
            run_config = settings.run_config(self.name, self.providers, self.container_image)
            with open(yaml_output_dir / yaml_pth, "w", encoding="utf-8") as f:
                yaml.safe_dump(
                    filter_empty_values(run_config),
                    f,
                    sort_keys=False,
                )

        if self.template_path:
            docs = self.generate_markdown_docs()
            with open(doc_output_dir / f"{self.name}.md", "w", encoding="utf-8") as f:
                # Ensure the file ends with exactly the trailing newline it needs.
                f.write(docs if docs.endswith("\n") else docs + "\n")