Merge 44ac023755 into sapling-pr-archive-ehhuang

ehhuang 2025-10-14 13:57:10 -07:00 committed by GitHub
commit 7c86996fb9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 95 additions and 867 deletions


@ -10,58 +10,8 @@ import TabItem from '@theme/TabItem';
# Telemetry
The Llama Stack telemetry system provides comprehensive tracing, metrics, and logging capabilities. It supports multiple sink types including OpenTelemetry, SQLite, and Console output for complete observability of your AI applications.
The Llama Stack uses OpenTelemetry to provide comprehensive tracing, metrics, and logging capabilities.
## Event Types
The telemetry system supports three main types of events:
<Tabs>
<TabItem value="unstructured" label="Unstructured Logs">
Free-form log messages with severity levels for general application logging:
```python
unstructured_log_event = UnstructuredLogEvent(
message="This is a log message",
severity=LogSeverity.INFO
)
```
</TabItem>
<TabItem value="metrics" label="Metric Events">
Numerical measurements with units for tracking performance and usage:
```python
metric_event = MetricEvent(
metric="my_metric",
value=10,
unit="count"
)
```
</TabItem>
<TabItem value="structured" label="Structured Logs">
System events like span start/end that provide structured operation tracking:
```python
structured_log_event = SpanStartPayload(
name="my_span",
parent_span_id="parent_span_id"
)
```
</TabItem>
</Tabs>
## Spans and Traces
- **Spans**: Represent individual operations with timing information and hierarchical relationships
- **Traces**: Collections of related spans that form a complete request flow across your application
This hierarchical structure allows you to understand the complete execution path of requests through your Llama Stack application.
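
To make the hierarchy concrete, here is a minimal sketch in plain Python. The `Span` dataclass and the `render_tree` helper are hypothetical illustrations, not Llama Stack or OpenTelemetry types; they only model the parent/child relationship between spans and the idea that a trace is the set of spans belonging to one request.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Span:
    """Hypothetical stand-in for a span: one timed operation."""
    span_id: str
    name: str
    parent_span_id: Optional[str]  # None marks the root span of the trace
    start_ms: int
    end_ms: int


def children_of(spans: List[Span], parent_id: Optional[str]) -> List[Span]:
    """Return the direct children of a span, ordered by start time."""
    kids = [s for s in spans if s.parent_span_id == parent_id]
    return sorted(kids, key=lambda s: s.start_ms)


def render_tree(spans: List[Span], parent_id: Optional[str] = None, depth: int = 0) -> List[str]:
    """Flatten a trace into indented lines, one per span, depth-first."""
    lines: List[str] = []
    for span in children_of(spans, parent_id):
        lines.append(f"{'  ' * depth}{span.name} ({span.end_ms - span.start_ms} ms)")
        lines.extend(render_tree(spans, span.span_id, depth + 1))
    return lines


# A trace here is simply the list of spans recorded for one request.
trace = [
    Span("a", "chat_completion", None, 0, 120),
    Span("b", "tokenize", "a", 5, 15),
    Span("c", "model_forward", "a", 20, 110),
]
tree_lines = render_tree(trace)
print("\n".join(tree_lines))
```

Walking the tree from the root span downward is what lets a tracing backend show where the time of a single request was spent.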
## Automatic Metrics Generation
@ -129,21 +79,6 @@ Send events to an OpenTelemetry Collector for integration with observability pla
- Compatible with all OpenTelemetry collectors
- Supports both traces and metrics
</TabItem>
<TabItem value="sqlite" label="SQLite">
Store events in a local SQLite database for direct querying:
**Use Cases:**
- Local development and debugging
- Custom analytics and reporting
- Offline analysis of application behavior
**Features:**
- Direct SQL querying capabilities
- Persistent local storage
- No external dependencies
</TabItem>
<TabItem value="console" label="Console">
@ -174,9 +109,8 @@ telemetry:
provider_type: inline::meta-reference
config:
service_name: "llama-stack-service"
sinks: ['console', 'sqlite', 'otel_trace', 'otel_metric']
sinks: ['console', 'otel_trace', 'otel_metric']
otel_exporter_otlp_endpoint: "http://localhost:4318"
sqlite_db_path: "/path/to/telemetry.db"
```
### Environment Variables
@ -185,7 +119,7 @@ Configure telemetry behavior using environment variables:
- **`OTEL_EXPORTER_OTLP_ENDPOINT`**: OpenTelemetry Collector endpoint (default: `http://localhost:4318`)
- **`OTEL_SERVICE_NAME`**: Service name for telemetry (default: empty string)
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `console,sqlite`)
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `console`)
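
As an illustration of how a comma-separated sink list might be interpreted, the sketch below parses such a string into enum members. `parse_sinks` is a hypothetical helper written for this example, not the provider's actual validator, and skipping blank entries is an assumption made here so that an empty value yields an empty list.

```python
from enum import Enum


class TelemetrySink(str, Enum):
    """Sink names accepted in this sketch."""
    OTEL_TRACE = "otel_trace"
    OTEL_METRIC = "otel_metric"
    CONSOLE = "console"


def parse_sinks(raw):
    """Turn a comma-separated string (or None) into a list of sinks.

    Assumption: blank entries are skipped, so "" and None both yield [].
    Unknown names raise ValueError from the enum lookup.
    """
    if not raw:
        return []
    return [TelemetrySink(part.strip()) for part in raw.split(",") if part.strip()]


sinks = parse_sinks("console, otel_trace")
print([s.value for s in sinks])
```

With this approach a name that is not a member of the enum, such as `sqlite`, fails fast with a `ValueError` instead of being silently ignored.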
### Quick Setup: Complete Telemetry Stack
@ -248,37 +182,10 @@ Forward metrics to other observability systems:
</TabItem>
</Tabs>
## SQLite Querying
The `sqlite` sink allows you to query traces without an external system. This is particularly useful for development and custom analytics.
### Example Queries
```sql
-- Query recent traces
SELECT * FROM traces WHERE timestamp > datetime('now', '-1 hour');
-- Analyze span durations
SELECT name, AVG(duration_ms) as avg_duration
FROM spans
GROUP BY name
ORDER BY avg_duration DESC;
-- Find slow operations
SELECT * FROM spans
WHERE duration_ms > 1000
ORDER BY duration_ms DESC;
```
:::tip[Advanced Analytics]
Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stack/blob/main/docs/getting_started.ipynb) for more examples on querying traces and spans programmatically.
:::
## Best Practices
### 🔍 **Monitoring Strategy**
- Use OpenTelemetry for production environments
- Combine multiple sinks for development (console + SQLite)
- Set up alerts on key metrics like token usage and error rates
### 📊 **Metrics Analysis**
@ -293,45 +200,8 @@ Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stac
### 🔧 **Configuration Management**
- Use environment variables for flexible deployment
- Configure appropriate retention policies for SQLite
- Ensure proper network access to OpenTelemetry collectors
## Integration Examples
### Basic Telemetry Setup
```python
from llama_stack_client import LlamaStackClient
# Client with telemetry headers
client = LlamaStackClient(
base_url="http://localhost:8000",
extra_headers={
"X-Telemetry-Service": "my-ai-app",
"X-Telemetry-Version": "1.0.0"
}
)
# All API calls will be automatically traced
response = client.chat.completions.create(
model="meta-llama/Llama-3.2-3B-Instruct",
messages=[{"role": "user", "content": "Hello!"}]
)
```
### Custom Telemetry Context
```python
# Add custom span attributes for better tracking
with tracer.start_as_current_span("custom_operation") as span:
span.set_attribute("user_id", "user123")
span.set_attribute("operation_type", "chat_completion")
response = client.chat.completions.create(
model="meta-llama/Llama-3.2-3B-Instruct",
messages=[{"role": "user", "content": "Hello!"}]
)
```
## Related Resources


@ -16,14 +16,12 @@ Meta's reference implementation of telemetry and observability using OpenTelemet
|-------|------|----------|---------|-------------|
| `otel_exporter_otlp_endpoint` | `str \| None` | No | | The OpenTelemetry collector endpoint URL (base URL for traces, metrics, and logs). If not set, the SDK will use OTEL_EXPORTER_OTLP_ENDPOINT environment variable. |
| `service_name` | `<class 'str'>` | No | | The service name to use for telemetry |
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [&lt;TelemetrySink.SQLITE: 'sqlite'&gt;] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console) |
| `sqlite_db_path` | `<class 'str'>` | No | ~/.llama/runtime/trace_store.db | The path to the SQLite database to use for storing traces |
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console) |
## Sample Configuration
```yaml
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
```
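
The `${env.NAME:=default}` placeholders in the sample can be read as "use the environment variable if it is set, otherwise fall back to the default". The sketch below shows that reading in a few lines of Python. `resolve_env_placeholders` is a hypothetical helper for illustration only; the real Llama Stack resolver may support more operators or treat empty values differently.

```python
import os
import re

# Matches ${env.NAME:=default}; the default part may be empty.
_ENV_PATTERN = re.compile(
    r"\$\{env\.(?P<name>[A-Za-z_][A-Za-z0-9_]*):=(?P<default>[^}]*)\}"
)


def resolve_env_placeholders(value, environ=None):
    """Replace each ${env.NAME:=default} with the variable's value or its default."""
    env = os.environ if environ is None else environ

    def _substitute(match):
        # Fall back to the inline default when the variable is not set.
        return env.get(match.group("name"), match.group("default"))

    return _ENV_PATTERN.sub(_substitute, value)


# A fake environment keeps the example independent of the real one.
fake_env = {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318"}
endpoint = resolve_env_placeholders("${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}", fake_env)
sinks = resolve_env_placeholders("${env.TELEMETRY_SINKS:=}", fake_env)
print(endpoint)
print(repr(sinks))
```

Under this reading, an unset `TELEMETRY_SINKS` with an empty default resolves to an empty string, which corresponds to no sinks being enabled.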


@ -159,8 +159,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
post_training:
- provider_id: torchtune-cpu


@ -50,8 +50,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference


@ -46,8 +46,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference


@ -61,8 +61,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference


@ -51,8 +51,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference


@ -53,8 +53,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: nvidia


@ -48,8 +48,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: nvidia


@ -81,8 +81,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference


@ -159,8 +159,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
post_training:
- provider_id: huggingface-gpu


@ -159,8 +159,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
post_training:
- provider_id: torchtune-cpu


@ -46,8 +46,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference


@ -9,13 +9,10 @@ from typing import Any
from pydantic import BaseModel, Field, field_validator
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
class TelemetrySink(StrEnum):
OTEL_TRACE = "otel_trace"
OTEL_METRIC = "otel_metric"
SQLITE = "sqlite"
CONSOLE = "console"
@ -30,12 +27,8 @@ class TelemetryConfig(BaseModel):
description="The service name to use for telemetry",
)
sinks: list[TelemetrySink] = Field(
default=[TelemetrySink.SQLITE],
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console)",
)
sqlite_db_path: str = Field(
default_factory=lambda: (RUNTIME_BASE_DIR / "trace_store.db").as_posix(),
description="The path to the SQLite database to use for storing traces",
default=[],
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console)",
)
@field_validator("sinks", mode="before")
@ -43,13 +36,12 @@ class TelemetryConfig(BaseModel):
def validate_sinks(cls, v):
if isinstance(v, str):
return [TelemetrySink(sink.strip()) for sink in v.split(",")]
return v
return v or []
@classmethod
def sample_run_config(cls, __distro_dir__: str, db_name: str = "trace_store.db") -> dict[str, Any]:
return {
"service_name": "${env.OTEL_SERVICE_NAME:=\u200b}",
"sinks": "${env.TELEMETRY_SINKS:=sqlite}",
"sqlite_db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
"sinks": "${env.TELEMETRY_SINKS:=}",
"otel_exporter_otlp_endpoint": "${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}",
}


@ -1,190 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
import os
import sqlite3
import threading
from datetime import UTC, datetime
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import Span
from opentelemetry.trace.span import format_span_id, format_trace_id
from llama_stack.providers.utils.telemetry.tracing import LOCAL_ROOT_SPAN_MARKER
class SQLiteSpanProcessor(SpanProcessor):
def __init__(self, conn_string):
"""Initialize the SQLite span processor with a connection string."""
self.conn_string = conn_string
self._local = threading.local() # Thread-local storage for connections
self.setup_database()
def _get_connection(self):
"""Get a thread-local database connection."""
if not hasattr(self._local, "conn"):
try:
self._local.conn = sqlite3.connect(self.conn_string)
except Exception as e:
print(f"Error connecting to SQLite database: {e}")
raise
return self._local.conn
def setup_database(self):
"""Create the necessary tables if they don't exist."""
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(self.conn_string), exist_ok=True)
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS traces (
trace_id TEXT PRIMARY KEY,
service_name TEXT,
root_span_id TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS spans (
span_id TEXT PRIMARY KEY,
trace_id TEXT REFERENCES traces(trace_id),
parent_span_id TEXT,
name TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
attributes TEXT,
status TEXT,
kind TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS span_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
span_id TEXT REFERENCES spans(span_id),
name TEXT,
timestamp TIMESTAMP,
attributes TEXT
)
"""
)
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traces_created_at
ON traces(created_at)
"""
)
conn.commit()
cursor.close()
def on_start(self, span: Span, parent_context=None):
"""Called when a span starts."""
pass
def on_end(self, span: Span):
"""Called when a span ends. Export the span data to SQLite."""
try:
conn = self._get_connection()
cursor = conn.cursor()
trace_id = format_trace_id(span.get_span_context().trace_id)
span_id = format_span_id(span.get_span_context().span_id)
service_name = span.resource.attributes.get("service.name", "unknown")
parent_span_id = None
parent_context = span.parent
if parent_context:
parent_span_id = format_span_id(parent_context.span_id)
# Insert into traces
cursor.execute(
"""
INSERT INTO traces (
trace_id, service_name, root_span_id, start_time, end_time
) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(trace_id) DO UPDATE SET
root_span_id = COALESCE(root_span_id, excluded.root_span_id),
start_time = MIN(excluded.start_time, start_time),
end_time = MAX(excluded.end_time, end_time)
""",
(
trace_id,
service_name,
(span_id if span.attributes.get(LOCAL_ROOT_SPAN_MARKER) else None),
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
),
)
# Insert into spans
cursor.execute(
"""
INSERT INTO spans (
span_id, trace_id, parent_span_id, name,
start_time, end_time, attributes, status,
kind
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
span_id,
trace_id,
parent_span_id,
span.name,
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
json.dumps(dict(span.attributes)),
span.status.status_code.name,
span.kind.name,
),
)
for event in span.events:
cursor.execute(
"""
INSERT INTO span_events (
span_id, name, timestamp, attributes
) VALUES (?, ?, ?, ?)
""",
(
span_id,
event.name,
datetime.fromtimestamp(event.timestamp / 1e9, UTC).isoformat(),
json.dumps(dict(event.attributes)),
),
)
conn.commit()
cursor.close()
except Exception as e:
print(f"Error exporting span to SQLite: {e}")
def shutdown(self):
"""Cleanup any resources."""
# We can't access other threads' connections, so we just close our own
if hasattr(self._local, "conn"):
try:
self._local.conn.close()
except Exception as e:
print(f"Error closing SQLite connection: {e}")
finally:
del self._local.conn
def force_flush(self, timeout_millis=30000):
"""Force export of spans."""
pass


@ -4,7 +4,6 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import datetime
import threading
from typing import Any
@ -22,10 +21,7 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
from llama_stack.apis.telemetry import (
Event,
MetricEvent,
MetricLabelMatcher,
MetricQueryType,
QueryCondition,
QueryMetricsResponse,
QuerySpanTreeResponse,
QueryTracesResponse,
Span,
@ -42,11 +38,7 @@ from llama_stack.log import get_logger
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
ConsoleSpanProcessor,
)
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
SQLiteSpanProcessor,
)
from llama_stack.providers.utils.telemetry.dataset_mixin import TelemetryDatasetMixin
from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore
from llama_stack.providers.utils.telemetry.tracing import ROOT_SPAN_MARKERS
from .config import TelemetryConfig, TelemetrySink
@ -111,15 +103,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(metric_provider)
if TelemetrySink.SQLITE in self.config.sinks:
trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path))
if TelemetrySink.CONSOLE in self.config.sinks:
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True))
if TelemetrySink.OTEL_METRIC in self.config.sinks:
self.meter = metrics.get_meter(__name__)
if TelemetrySink.SQLITE in self.config.sinks:
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
self._lock = _global_lock
@ -139,47 +127,6 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
else:
raise ValueError(f"Unknown event type: {event}")
async def query_metrics(
self,
metric_name: str,
start_time: int,
end_time: int | None = None,
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
"""Query metrics from the telemetry store.
Args:
metric_name: The name of the metric to query (e.g., "prompt_tokens")
start_time: Start time as Unix timestamp
end_time: End time as Unix timestamp (defaults to now if None)
granularity: Time granularity for aggregation
query_type: Type of query (RANGE or INSTANT)
label_matchers: Label filters to apply
Returns:
QueryMetricsResponse with metric time series data
"""
# Convert timestamps to datetime objects
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
# Use SQLite trace store if available
if hasattr(self, "trace_store") and self.trace_store:
return await self.trace_store.query_metrics(
metric_name=metric_name,
start_time=start_dt,
end_time=end_dt,
granularity=granularity,
query_type=query_type,
label_matchers=label_matchers,
)
else:
raise ValueError(
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
)
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
with self._lock:
# Use global storage instead of instance storage


@ -139,16 +139,13 @@ print(f"Structured Response: {structured_response.choices[0].message.content}")
The following example shows how to create embeddings for an NVIDIA NIM.
> [!NOTE]
> NVIDIA asymmetric embedding models (e.g., `nvidia/llama-3.2-nv-embedqa-1b-v2`) require an `input_type` parameter not present in the standard OpenAI embeddings API. The NVIDIA Inference Adapter automatically sets `input_type="query"` when using the OpenAI-compatible embeddings endpoint for NVIDIA. For passage embeddings, use the `embeddings` API with `task_type="document"`.
```python
response = client.inference.embeddings(
model_id="nvidia/llama-3.2-nv-embedqa-1b-v2",
contents=["What is the capital of France?"],
task_type="query",
response = client.embeddings.create(
model="nvidia/llama-3.2-nv-embedqa-1b-v2",
input=["What is the capital of France?"],
extra_body={"input_type": "query"},
)
print(f"Embeddings: {response.embeddings}")
print(f"Embeddings: {response.data}")
```
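
Asymmetric embedding models usually distinguish between text embedded as a search query and text embedded as a document to be retrieved. The sketch below builds the keyword arguments for an OpenAI-compatible `embeddings.create` call with an explicit `input_type`. `build_embedding_kwargs` is a hypothetical helper, and the `"passage"` value is an assumption based on NVIDIA's embedding API, so confirm it against the documentation of the model you deploy.

```python
def build_embedding_kwargs(model, texts, input_type="query"):
    """Assemble keyword arguments for an OpenAI-compatible embeddings call.

    Assumption: the model accepts input_type "query" or "passage".
    """
    if input_type not in ("query", "passage"):
        raise ValueError(f"unsupported input_type: {input_type!r}")
    return {
        "model": model,
        "input": list(texts),
        # Non-standard fields travel in extra_body with the OpenAI client.
        "extra_body": {"input_type": input_type},
    }


doc_kwargs = build_embedding_kwargs(
    "nvidia/llama-3.2-nv-embedqa-1b-v2",
    ["Paris is the capital of France."],
    input_type="passage",
)
print(doc_kwargs["extra_body"])
```

The resulting dictionary can be passed as `client.embeddings.create(**doc_kwargs)`, keeping the query and document code paths identical apart from one argument.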
### Vision Language Models Example


@ -5,14 +5,6 @@
# the root directory of this source tree.
from openai import NOT_GIVEN
from llama_stack.apis.inference import (
OpenAIEmbeddingData,
OpenAIEmbeddingsRequestWithExtraBody,
OpenAIEmbeddingsResponse,
OpenAIEmbeddingUsage,
)
from llama_stack.log import get_logger
from llama_stack.providers.utils.inference.openai_mixin import OpenAIMixin
@ -76,50 +68,3 @@ class NVIDIAInferenceAdapter(OpenAIMixin):
:return: The NVIDIA API base URL
"""
return f"{self.config.url}/v1" if self.config.append_api_version else self.config.url
async def openai_embeddings(
self,
params: OpenAIEmbeddingsRequestWithExtraBody,
) -> OpenAIEmbeddingsResponse:
"""
OpenAI-compatible embeddings for NVIDIA NIM.
Note: NVIDIA NIM asymmetric embedding models require an "input_type" field not present in the standard OpenAI embeddings API.
We default this to "query" to ensure requests succeed when using the
OpenAI-compatible endpoint. For passage embeddings, use the embeddings API with
`task_type='document'`.
"""
extra_body: dict[str, object] = {"input_type": "query"}
logger.warning(
"NVIDIA OpenAI-compatible embeddings: defaulting to input_type='query'. "
"For passage embeddings, use the embeddings API with task_type='document'."
)
response = await self.client.embeddings.create(
model=await self._get_provider_model_id(params.model),
input=params.input,
encoding_format=params.encoding_format if params.encoding_format is not None else NOT_GIVEN,
dimensions=params.dimensions if params.dimensions is not None else NOT_GIVEN,
user=params.user if params.user is not None else NOT_GIVEN,
extra_body=extra_body,
)
data = []
for i, embedding_data in enumerate(response.data):
data.append(
OpenAIEmbeddingData(
embedding=embedding_data.embedding,
index=i,
)
)
usage = OpenAIEmbeddingUsage(
prompt_tokens=response.usage.prompt_tokens,
total_tokens=response.usage.total_tokens,
)
return OpenAIEmbeddingsResponse(
data=data,
model=response.model,
usage=usage,
)


@ -1,383 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
from datetime import UTC, datetime
from typing import Protocol
import aiosqlite
from llama_stack.apis.telemetry import (
MetricDataPoint,
MetricLabel,
MetricLabelMatcher,
MetricQueryType,
MetricSeries,
QueryCondition,
QueryMetricsResponse,
Span,
SpanWithStatus,
Trace,
)
class TraceStore(Protocol):
async def query_traces(
self,
attribute_filters: list[QueryCondition] | None = None,
limit: int | None = 100,
offset: int | None = 0,
order_by: list[str] | None = None,
) -> list[Trace]: ...
async def get_span_tree(
self,
span_id: str,
attributes_to_return: list[str] | None = None,
max_depth: int | None = None,
) -> dict[str, SpanWithStatus]: ...
async def query_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: datetime | None = None,
granularity: str | None = "1d",
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse: ...
class SQLiteTraceStore(TraceStore):
def __init__(self, conn_string: str):
self.conn_string = conn_string
async def query_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: datetime | None = None,
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
if end_time is None:
end_time = datetime.now(UTC)
# Build base query
if query_type == MetricQueryType.INSTANT:
query = """
SELECT
se.name,
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
else:
if granularity:
time_format = self._get_time_format_for_granularity(granularity)
query = f"""
SELECT
se.name,
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes,
strftime('{time_format}', se.timestamp) as bucket_start
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
else:
query = """
SELECT
se.name,
json_extract(se.attributes, '$.value') as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes,
se.timestamp
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
# Labels that will be attached to the MetricSeries (preserve matcher labels)
all_labels: list[MetricLabel] = []
matcher_label_names = set()
if label_matchers:
for matcher in label_matchers:
json_path = f"$.{matcher.name}"
if matcher.operator == "=":
query += f" AND json_extract(se.attributes, '{json_path}') = ?"
params.append(matcher.value)
elif matcher.operator == "!=":
query += f" AND json_extract(se.attributes, '{json_path}') != ?"
params.append(matcher.value)
elif matcher.operator == "=~":
query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
params.append(f"%{matcher.value}%")
elif matcher.operator == "!~":
query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
params.append(f"%{matcher.value}%")
# Preserve filter context in output
all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
matcher_label_names.add(matcher.name)
# GROUP BY / ORDER BY logic
if query_type == MetricQueryType.RANGE and granularity:
group_time_format = self._get_time_format_for_granularity(granularity)
query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
query += " ORDER BY bucket_start"
elif query_type == MetricQueryType.INSTANT:
query += " GROUP BY json_extract(se.attributes, '$.unit')"
else:
query += " ORDER BY se.timestamp"
# Execute query
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
if not rows:
return QueryMetricsResponse(data=[])
data_points = []
# We want to add attribute labels, but only those not already present as matcher labels.
attr_label_names = set()
for row in rows:
# Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
try:
attributes = json.loads(row["attributes"] or "{}")
except (TypeError, json.JSONDecodeError):
attributes = {}
value = row["value"]
unit = row["unit"] or ""
# Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
for k, v in attributes.items():
if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
all_labels.append(MetricLabel(name=k, value=str(v)))
attr_label_names.add(k)
# Determine timestamp
if query_type == MetricQueryType.RANGE and granularity:
try:
bucket_start_raw = row["bucket_start"]
except KeyError as e:
raise ValueError(
"DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
) from e
# this value could also be there, but be NULL, I think.
if bucket_start_raw is None:
raise ValueError("bucket_start is None check time format and data")
bucket_start = datetime.fromisoformat(bucket_start_raw)
timestamp = int(bucket_start.timestamp())
elif query_type == MetricQueryType.INSTANT:
timestamp = int(datetime.now(UTC).timestamp())
else:
try:
timestamp_raw = row["timestamp"]
except KeyError as e:
raise ValueError(
"DB did not have a timestamp in row, this indicates improper formatting"
) from e
# this value could also be there, but be NULL, I think.
if timestamp_raw is None:
raise ValueError("timestamp is None check time format and data")
timestamp_iso = datetime.fromisoformat(timestamp_raw)
timestamp = int(timestamp_iso.timestamp())
data_points.append(
MetricDataPoint(
timestamp=timestamp,
value=value,
unit=unit,
)
)
metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
return QueryMetricsResponse(data=metric_series)
def _get_time_format_for_granularity(self, granularity: str | None) -> str:
"""Get the SQLite strftime format string for a given granularity.
Args:
granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
Returns:
SQLite strftime format string for the granularity
"""
if granularity is None:
raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
if granularity.endswith("d"):
return "%Y-%m-%d 00:00:00"
elif granularity.endswith("h"):
return "%Y-%m-%d %H:00:00"
elif granularity.endswith("m"):
return "%Y-%m-%d %H:%M:00"
else:
return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.

    async def query_traces(
        self,
        attribute_filters: list[QueryCondition] | None = None,
        limit: int | None = 100,
        offset: int | None = 0,
        order_by: list[str] | None = None,
    ) -> list[Trace]:
        def build_where_clause() -> tuple[str, list]:
            if not attribute_filters:
                return "", []

            ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"}

            conditions = [
                f"json_extract(s.attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?"
                for condition in attribute_filters
            ]
            params = [condition.value for condition in attribute_filters]
            where_clause = " WHERE " + " AND ".join(conditions)
            return where_clause, params

        def build_order_clause() -> str:
            if not order_by:
                return ""

            order_clauses = []
            for field in order_by:
                desc = field.startswith("-")
                clean_field = field[1:] if desc else field
                order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}")
            return " ORDER BY " + ", ".join(order_clauses)

        # Build the main query
        base_query = """
            WITH matching_traces AS (
                SELECT DISTINCT t.trace_id
                FROM traces t
                JOIN spans s ON t.trace_id = s.trace_id
                {where_clause}
            ),
            filtered_traces AS (
                SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time
                FROM matching_traces mt
                JOIN traces t ON mt.trace_id = t.trace_id
                LEFT JOIN spans s ON t.trace_id = s.trace_id
                {order_clause}
            )
            SELECT DISTINCT trace_id, root_span_id, start_time, end_time
            FROM filtered_traces
            WHERE root_span_id IS NOT NULL
            LIMIT {limit} OFFSET {offset}
        """

        where_clause, params = build_where_clause()
        query = base_query.format(
            where_clause=where_clause,
            order_clause=build_order_clause(),
            limit=limit,
            offset=offset,
        )

        # Execute query and return results
        async with aiosqlite.connect(self.conn_string) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(query, params) as cursor:
                rows = await cursor.fetchall()
                return [
                    Trace(
                        trace_id=row["trace_id"],
                        root_span_id=row["root_span_id"],
                        start_time=datetime.fromisoformat(row["start_time"]),
                        end_time=datetime.fromisoformat(row["end_time"]),
                    )
                    for row in rows
                ]

    async def get_span_tree(
        self,
        span_id: str,
        attributes_to_return: list[str] | None = None,
        max_depth: int | None = None,
    ) -> dict[str, SpanWithStatus]:
        # Build the attributes selection
        attributes_select = "s.attributes"
        if attributes_to_return:
            json_object = ", ".join(f"'{key}', json_extract(s.attributes, '$.{key}')" for key in attributes_to_return)
            attributes_select = f"json_object({json_object})"

        # SQLite CTE query with filtered attributes
        query = f"""
            WITH RECURSIVE span_tree AS (
                SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes
                FROM spans s
                WHERE s.span_id = ?

                UNION ALL

                SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes
                FROM spans s
                JOIN span_tree st ON s.parent_span_id = st.span_id
                WHERE (? IS NULL OR st.depth < ?)
            )
            SELECT *
            FROM span_tree
            ORDER BY depth, start_time
        """

        spans_by_id = {}
        async with aiosqlite.connect(self.conn_string) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor:
                rows = await cursor.fetchall()

                if not rows:
                    raise ValueError(f"Span {span_id} not found")

                for row in rows:
                    span = SpanWithStatus(
                        span_id=row["span_id"],
                        trace_id=row["trace_id"],
                        parent_span_id=row["parent_span_id"],
                        name=row["name"],
                        start_time=datetime.fromisoformat(row["start_time"]),
                        end_time=datetime.fromisoformat(row["end_time"]),
                        attributes=json.loads(row["filtered_attributes"]),
                        status=row["status"].lower(),
                    )

                    spans_by_id[span.span_id] = span

                return spans_by_id

    async def get_trace(self, trace_id: str) -> Trace:
        query = """
            SELECT *
            FROM traces t
            WHERE t.trace_id = ?
        """
        async with aiosqlite.connect(self.conn_string) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(query, (trace_id,)) as cursor:
                row = await cursor.fetchone()
                if row is None:
                    raise ValueError(f"Trace {trace_id} not found")
                return Trace(**row)

    async def get_span(self, trace_id: str, span_id: str) -> Span:
        query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?"
        async with aiosqlite.connect(self.conn_string) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(query, (trace_id, span_id)) as cursor:
                row = await cursor.fetchone()
                if row is None:
                    raise ValueError(f"Span {span_id} not found")
                return Span(**row)
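
The depth-limited recursive CTE used by `get_span_tree` above can be tried in isolation. The following is a minimal sketch, not the store's actual code: it assumes a simplified `spans` table with only `span_id`, `parent_span_id`, and `name` columns, uses the standard-library `sqlite3` module instead of `aiosqlite`, and the helper name `span_subtree` is hypothetical.

```python
import sqlite3


def span_subtree(conn, root_span_id, max_depth=None):
    """Return (span_id, depth) rows for a span and its descendants.

    Hypothetical helper: mirrors the shape of the recursive query above,
    but against a simplified schema assumed for this sketch.
    """
    query = """
        WITH RECURSIVE span_tree AS (
            -- Anchor row: the requested span sits at depth 1.
            SELECT s.span_id, s.parent_span_id, 1 AS depth
            FROM spans s
            WHERE s.span_id = ?

            UNION ALL

            -- Recursive step: attach children of rows already in the tree,
            -- stopping once the parent has reached max_depth (if given).
            SELECT s.span_id, s.parent_span_id, st.depth + 1
            FROM spans s
            JOIN span_tree st ON s.parent_span_id = st.span_id
            WHERE (? IS NULL OR st.depth < ?)
        )
        SELECT span_id, depth
        FROM span_tree
        ORDER BY depth, span_id
    """
    return conn.execute(query, (root_span_id, max_depth, max_depth)).fetchall()


# Build a tiny in-memory tree: a -> (b -> c), d
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE spans (span_id TEXT PRIMARY KEY, parent_span_id TEXT, name TEXT)")
conn.executemany(
    "INSERT INTO spans VALUES (?, ?, ?)",
    [
        ("a", None, "root"),
        ("b", "a", "child"),
        ("c", "b", "grandchild"),
        ("d", "a", "second child"),
    ],
)

full_tree = span_subtree(conn, "a")
shallow_tree = span_subtree(conn, "a", max_depth=2)
```

Passing `max_depth` twice lets one SQL string serve both the unbounded case (`NULL`) and the bounded case, which is the same trick the original query relies on.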

View file

@ -12,6 +12,15 @@ from openai import OpenAI
from llama_stack.core.library_client import LlamaStackAsLibraryClient
ASYMMETRIC_EMBEDDING_MODELS_BY_PROVIDER = {
    "remote::nvidia": [
        "nvidia/llama-3.2-nv-embedqa-1b-v2",
        "nvidia/nv-embedqa-e5-v5",
        "nvidia/nv-embedqa-mistral-7b-v2",
        "snowflake/arctic-embed-l",
    ],
}


def decode_base64_to_floats(base64_string: str) -> list[float]:
    """Helper function to decode base64 string to list of float32 values."""
@ -29,6 +38,28 @@ def provider_from_model(client_with_models, model_id):
    return providers[provider_id]


def is_asymmetric_model(client_with_models, model_id):
    provider = provider_from_model(client_with_models, model_id)
    provider_type = provider.provider_type

    if provider_type not in ASYMMETRIC_EMBEDDING_MODELS_BY_PROVIDER:
        return False

    return model_id in ASYMMETRIC_EMBEDDING_MODELS_BY_PROVIDER[provider_type]


def get_extra_body_for_model(client_with_models, model_id, input_type="query"):
    if not is_asymmetric_model(client_with_models, model_id):
        return None

    provider = provider_from_model(client_with_models, model_id)

    if provider.provider_type == "remote::nvidia":
        return {"input_type": input_type}

    return None


def skip_if_model_doesnt_support_user_param(client, model_id):
    provider = provider_from_model(client, model_id)
    if provider.provider_type in (
@ -40,17 +71,29 @@ def skip_if_model_doesnt_support_user_param(client, model_id):
 def skip_if_model_doesnt_support_encoding_format_base64(client, model_id):
     provider = provider_from_model(client, model_id)
-    if provider.provider_type in (
+    should_skip = provider.provider_type in (
         "remote::databricks",  # param silently ignored, always returns floats
         "remote::fireworks",  # param silently ignored, always returns list of floats
         "remote::ollama",  # param silently ignored, always returns list of floats
-    ):
+    ) or (
+        provider.provider_type == "remote::nvidia"
+        and model_id
+        in [
+            "nvidia/nv-embedqa-e5-v5",
+            "nvidia/nv-embedqa-mistral-7b-v2",
+            "snowflake/arctic-embed-l",
+        ]
+    )
+    if should_skip:
         pytest.skip(f"Model {model_id} hosted by {provider.provider_type} does not support encoding_format='base64'.")
 def skip_if_model_doesnt_support_variable_dimensions(client_with_models, model_id):
     provider = provider_from_model(client_with_models, model_id)
-    if (
+    should_skip = (
         provider.provider_type
         in (
             "remote::together",  # returns 400
@ -59,11 +102,19 @@ def skip_if_model_doesnt_support_variable_dimensions(client_with_models, model_i
"remote::databricks",
"remote::watsonx", # openai.BadRequestError: Error code: 400 - {'detail': "litellm.UnsupportedParamsError: watsonx does not support parameters: {'dimensions': 384}
         )
-    ):
-        pytest.skip(
-            f"Model {model_id} hosted by {provider.provider_type} does not support variable output embedding dimensions."
+        or (provider.provider_type == "remote::openai" and "text-embedding-3" not in model_id)
+        or (
+            provider.provider_type == "remote::nvidia"
+            and model_id
+            in [
+                "nvidia/nv-embedqa-e5-v5",
+                "nvidia/nv-embedqa-mistral-7b-v2",
+                "snowflake/arctic-embed-l",
+            ]
         )
-    if provider.provider_type == "remote::openai" and "text-embedding-3" not in model_id:
+    )
+    if should_skip:
         pytest.skip(
             f"Model {model_id} hosted by {provider.provider_type} does not support variable output embedding dimensions."
         )
@ -105,6 +156,7 @@ def test_openai_embeddings_single_string(compat_client, client_with_models, embe
        model=embedding_model_id,
        input=input_text,
        encoding_format="float",
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )
    assert response.object == "list"
@ -129,6 +181,7 @@ def test_openai_embeddings_multiple_strings(compat_client, client_with_models, e
        model=embedding_model_id,
        input=input_texts,
        encoding_format="float",
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )
    assert response.object == "list"
@ -155,6 +208,7 @@ def test_openai_embeddings_with_encoding_format_float(compat_client, client_with
        model=embedding_model_id,
        input=input_text,
        encoding_format="float",
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )
    assert response.object == "list"
@ -175,6 +229,7 @@ def test_openai_embeddings_with_dimensions(compat_client, client_with_models, em
        model=embedding_model_id,
        input=input_text,
        dimensions=dimensions,
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )
    assert response.object == "list"
@ -196,6 +251,7 @@ def test_openai_embeddings_with_user_parameter(compat_client, client_with_models
        model=embedding_model_id,
        input=input_text,
        user=user_id,
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )
    assert response.object == "list"
@ -212,6 +268,7 @@ def test_openai_embeddings_empty_list_error(compat_client, client_with_models, e
        compat_client.embeddings.create(
            model=embedding_model_id,
            input=[],
            extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
        )
@ -223,6 +280,7 @@ def test_openai_embeddings_invalid_model_error(compat_client, client_with_models
        compat_client.embeddings.create(
            model="invalid-model-id",
            input="Test text",
            extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
        )
@ -233,16 +291,19 @@ def test_openai_embeddings_different_inputs_different_outputs(compat_client, cli
    input_text1 = "This is the first text"
    input_text2 = "This is completely different content"

    extra_body = get_extra_body_for_model(client_with_models, embedding_model_id)

    response1 = compat_client.embeddings.create(
        model=embedding_model_id,
        input=input_text1,
        encoding_format="float",
        extra_body=extra_body,
    )

    response2 = compat_client.embeddings.create(
        model=embedding_model_id,
        input=input_text2,
        encoding_format="float",
        extra_body=extra_body,
    )

    embedding1 = response1.data[0].embedding
@ -267,6 +328,7 @@ def test_openai_embeddings_with_encoding_format_base64(compat_client, client_wit
        input=input_text,
        encoding_format="base64",
        dimensions=dimensions,
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )

    # Validate response structure
@ -298,6 +360,7 @@ def test_openai_embeddings_base64_batch_processing(compat_client, client_with_mo
        model=embedding_model_id,
        input=input_texts,
        encoding_format="base64",
        extra_body=get_extra_body_for_model(client_with_models, embedding_model_id),
    )

    # Validate response structure
    assert response.object == "list"