chore!: BREAKING CHANGE: remove sqlite from telemetry config

# What does this PR do?


## Test Plan
This commit is contained in:
Eric Huang 2025-10-15 09:58:40 -07:00
parent e9b4278a51
commit 123b88879c
21 changed files with 26 additions and 1026 deletions

View file

@ -10,58 +10,8 @@ import TabItem from '@theme/TabItem';
# Telemetry
The Llama Stack telemetry system provides comprehensive tracing, metrics, and logging capabilities. It supports multiple sink types including OpenTelemetry, SQLite, and Console output for complete observability of your AI applications.
The Llama Stack uses OpenTelemetry to provide comprehensive tracing, metrics, and logging capabilities.
## Event Types
The telemetry system supports three main types of events:
<Tabs>
<TabItem value="unstructured" label="Unstructured Logs">
Free-form log messages with severity levels for general application logging:
```python
unstructured_log_event = UnstructuredLogEvent(
message="This is a log message",
severity=LogSeverity.INFO
)
```
</TabItem>
<TabItem value="metrics" label="Metric Events">
Numerical measurements with units for tracking performance and usage:
```python
metric_event = MetricEvent(
metric="my_metric",
value=10,
unit="count"
)
```
</TabItem>
<TabItem value="structured" label="Structured Logs">
System events like span start/end that provide structured operation tracking:
```python
structured_log_event = SpanStartPayload(
name="my_span",
parent_span_id="parent_span_id"
)
```
</TabItem>
</Tabs>
## Spans and Traces
- **Spans**: Represent individual operations with timing information and hierarchical relationships
- **Traces**: Collections of related spans that form a complete request flow across your application
This hierarchical structure allows you to understand the complete execution path of requests through your Llama Stack application.
## Automatic Metrics Generation
@ -129,21 +79,6 @@ Send events to an OpenTelemetry Collector for integration with observability pla
- Compatible with all OpenTelemetry collectors
- Supports both traces and metrics
</TabItem>
<TabItem value="sqlite" label="SQLite">
Store events in a local SQLite database for direct querying:
**Use Cases:**
- Local development and debugging
- Custom analytics and reporting
- Offline analysis of application behavior
**Features:**
- Direct SQL querying capabilities
- Persistent local storage
- No external dependencies
</TabItem>
<TabItem value="console" label="Console">
@ -174,9 +109,8 @@ telemetry:
provider_type: inline::meta-reference
config:
service_name: "llama-stack-service"
sinks: ['console', 'sqlite', 'otel_trace', 'otel_metric']
sinks: ['console', 'otel_trace', 'otel_metric']
otel_exporter_otlp_endpoint: "http://localhost:4318"
sqlite_db_path: "/path/to/telemetry.db"
```
### Environment Variables
@ -185,7 +119,7 @@ Configure telemetry behavior using environment variables:
- **`OTEL_EXPORTER_OTLP_ENDPOINT`**: OpenTelemetry Collector endpoint (default: `http://localhost:4318`)
- **`OTEL_SERVICE_NAME`**: Service name for telemetry (default: empty string)
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `console,sqlite`)
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `[]`)
### Quick Setup: Complete Telemetry Stack
@ -248,37 +182,10 @@ Forward metrics to other observability systems:
</TabItem>
</Tabs>
## SQLite Querying
The `sqlite` sink allows you to query traces without an external system. This is particularly useful for development and custom analytics.
### Example Queries
```sql
-- Query recent traces
SELECT * FROM traces WHERE timestamp > datetime('now', '-1 hour');
-- Analyze span durations
SELECT name, AVG(duration_ms) as avg_duration
FROM spans
GROUP BY name
ORDER BY avg_duration DESC;
-- Find slow operations
SELECT * FROM spans
WHERE duration_ms > 1000
ORDER BY duration_ms DESC;
```
:::tip[Advanced Analytics]
Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stack/blob/main/docs/getting_started.ipynb) for more examples on querying traces and spans programmatically.
:::
## Best Practices
### 🔍 **Monitoring Strategy**
- Use OpenTelemetry for production environments
- Combine multiple sinks for development (console + SQLite)
- Set up alerts on key metrics like token usage and error rates
### 📊 **Metrics Analysis**
@ -293,45 +200,8 @@ Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stac
### 🔧 **Configuration Management**
- Use environment variables for flexible deployment
- Configure appropriate retention policies for SQLite
- Ensure proper network access to OpenTelemetry collectors
## Integration Examples
### Basic Telemetry Setup
```python
from llama_stack_client import LlamaStackClient
# Client with telemetry headers
client = LlamaStackClient(
base_url="http://localhost:8000",
extra_headers={
"X-Telemetry-Service": "my-ai-app",
"X-Telemetry-Version": "1.0.0"
}
)
# All API calls will be automatically traced
response = client.chat.completions.create(
model="meta-llama/Llama-3.2-3B-Instruct",
messages=[{"role": "user", "content": "Hello!"}]
)
```
### Custom Telemetry Context
```python
# Add custom span attributes for better tracking
with tracer.start_as_current_span("custom_operation") as span:
span.set_attribute("user_id", "user123")
span.set_attribute("operation_type", "chat_completion")
response = client.chat.completions.create(
model="meta-llama/Llama-3.2-3B-Instruct",
messages=[{"role": "user", "content": "Hello!"}]
)
```
## Related Resources

View file

@ -119,7 +119,7 @@ The following environment variables can be configured:
### Telemetry Configuration
- `OTEL_SERVICE_NAME`: OpenTelemetry service name
- `TELEMETRY_SINKS`: Telemetry sinks (default: `console,sqlite`)
- `TELEMETRY_SINKS`: Telemetry sinks (default: `[]`)
## Enabling Providers
@ -216,7 +216,6 @@ The starter distribution uses SQLite for local storage of various components:
- **Files metadata**: `~/.llama/distributions/starter/files_metadata.db`
- **Agents store**: `~/.llama/distributions/starter/agents_store.db`
- **Responses store**: `~/.llama/distributions/starter/responses_store.db`
- **Trace store**: `~/.llama/distributions/starter/trace_store.db`
- **Evaluation store**: `~/.llama/distributions/starter/meta_reference_eval.db`
- **Dataset I/O stores**: Various HuggingFace and local filesystem stores

View file

@ -16,14 +16,12 @@ Meta's reference implementation of telemetry and observability using OpenTelemet
|-------|------|----------|---------|-------------|
| `otel_exporter_otlp_endpoint` | `str \| None` | No | | The OpenTelemetry collector endpoint URL (base URL for traces, metrics, and logs). If not set, the SDK will use OTEL_EXPORTER_OTLP_ENDPOINT environment variable. |
| `service_name` | `<class 'str'>` | No | | The service name to use for telemetry |
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [&lt;TelemetrySink.SQLITE: 'sqlite'&gt;] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console) |
| `sqlite_db_path` | `<class 'str'>` | No | ~/.llama/runtime/trace_store.db | The path to the SQLite database to use for storing traces |
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console) |
## Sample Configuration
```yaml
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
```

View file

@ -421,104 +421,3 @@ class Telemetry(Protocol):
:param ttl_seconds: The time to live of the event.
"""
...
async def query_traces(
self,
attribute_filters: list[QueryCondition] | None = None,
limit: int | None = 100,
offset: int | None = 0,
order_by: list[str] | None = None,
) -> QueryTracesResponse:
"""Query traces.
:param attribute_filters: The attribute filters to apply to the traces.
:param limit: The limit of traces to return.
:param offset: The offset of the traces to return.
:param order_by: The order by of the traces to return.
:returns: A QueryTracesResponse.
"""
...
async def get_trace(self, trace_id: str) -> Trace:
"""Get a trace by its ID.
:param trace_id: The ID of the trace to get.
:returns: A Trace.
"""
...
async def get_span(self, trace_id: str, span_id: str) -> Span:
"""Get a span by its ID.
:param trace_id: The ID of the trace to get the span from.
:param span_id: The ID of the span to get.
:returns: A Span.
"""
...
async def get_span_tree(
self,
span_id: str,
attributes_to_return: list[str] | None = None,
max_depth: int | None = None,
) -> QuerySpanTreeResponse:
"""Get a span tree by its ID.
:param span_id: The ID of the span to get the tree from.
:param attributes_to_return: The attributes to return in the tree.
:param max_depth: The maximum depth of the tree.
:returns: A QuerySpanTreeResponse.
"""
...
async def query_spans(
self,
attribute_filters: list[QueryCondition],
attributes_to_return: list[str],
max_depth: int | None = None,
) -> QuerySpansResponse:
"""Query spans.
:param attribute_filters: The attribute filters to apply to the spans.
:param attributes_to_return: The attributes to return in the spans.
:param max_depth: The maximum depth of the tree.
:returns: A QuerySpansResponse.
"""
...
async def save_spans_to_dataset(
self,
attribute_filters: list[QueryCondition],
attributes_to_save: list[str],
dataset_id: str,
max_depth: int | None = None,
) -> None:
"""Save spans to a dataset.
:param attribute_filters: The attribute filters to apply to the spans.
:param attributes_to_save: The attributes to save to the dataset.
:param dataset_id: The ID of the dataset to save the spans to.
:param max_depth: The maximum depth of the tree.
"""
...
async def query_metrics(
self,
metric_name: str,
start_time: int,
end_time: int | None = None,
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
"""Query metrics.
:param metric_name: The name of the metric to query.
:param start_time: The start time of the metric to query.
:param end_time: The end time of the metric to query.
:param granularity: The granularity of the metric to query.
:param query_type: The type of query to perform.
:param label_matchers: The label matchers to apply to the metric.
:returns: A QueryMetricsResponse.
"""
...

View file

@ -207,8 +207,9 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
super().__init__()
# when using the library client, we should not log to console since many
# of our logs are intended for server-side usage
current_sinks = os.environ.get("TELEMETRY_SINKS", "sqlite").split(",")
os.environ["TELEMETRY_SINKS"] = ",".join(sink for sink in current_sinks if sink != "console")
if sinks_from_env := os.environ.get("TELEMETRY_SINKS", None):
current_sinks = sinks_from_env.strip().lower().split(",")
os.environ["TELEMETRY_SINKS"] = ",".join(sink for sink in current_sinks if sink != "console")
if in_notebook():
import nest_asyncio

View file

@ -159,8 +159,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
post_training:
- provider_id: torchtune-cpu

View file

@ -50,8 +50,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference

View file

@ -46,8 +46,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference

View file

@ -61,8 +61,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference

View file

@ -51,8 +51,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference

View file

@ -53,8 +53,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: nvidia

View file

@ -48,8 +48,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: nvidia

View file

@ -81,8 +81,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference

View file

@ -159,8 +159,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
post_training:
- provider_id: huggingface-gpu

View file

@ -159,8 +159,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
post_training:
- provider_id: torchtune-cpu

View file

@ -46,8 +46,7 @@ providers:
provider_type: inline::meta-reference
config:
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
sinks: ${env.TELEMETRY_SINKS:=sqlite}
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/trace_store.db
sinks: ${env.TELEMETRY_SINKS:=}
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
eval:
- provider_id: meta-reference

View file

@ -9,13 +9,10 @@ from typing import Any
from pydantic import BaseModel, Field, field_validator
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
class TelemetrySink(StrEnum):
OTEL_TRACE = "otel_trace"
OTEL_METRIC = "otel_metric"
SQLITE = "sqlite"
CONSOLE = "console"
@ -30,12 +27,8 @@ class TelemetryConfig(BaseModel):
description="The service name to use for telemetry",
)
sinks: list[TelemetrySink] = Field(
default=[TelemetrySink.SQLITE],
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console)",
)
sqlite_db_path: str = Field(
default_factory=lambda: (RUNTIME_BASE_DIR / "trace_store.db").as_posix(),
description="The path to the SQLite database to use for storing traces",
default_factory=list,
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console)",
)
@field_validator("sinks", mode="before")
@ -43,13 +36,12 @@ class TelemetryConfig(BaseModel):
def validate_sinks(cls, v):
if isinstance(v, str):
return [TelemetrySink(sink.strip()) for sink in v.split(",")]
return v
return v or []
@classmethod
def sample_run_config(cls, __distro_dir__: str, db_name: str = "trace_store.db") -> dict[str, Any]:
def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]:
return {
"service_name": "${env.OTEL_SERVICE_NAME:=\u200b}",
"sinks": "${env.TELEMETRY_SINKS:=sqlite}",
"sqlite_db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
"sinks": "${env.TELEMETRY_SINKS:=}",
"otel_exporter_otlp_endpoint": "${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}",
}

View file

@ -1,190 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
import os
import sqlite3
import threading
from datetime import UTC, datetime
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import Span
from opentelemetry.trace.span import format_span_id, format_trace_id
from llama_stack.providers.utils.telemetry.tracing import LOCAL_ROOT_SPAN_MARKER
class SQLiteSpanProcessor(SpanProcessor):
def __init__(self, conn_string):
"""Initialize the SQLite span processor with a connection string."""
self.conn_string = conn_string
self._local = threading.local() # Thread-local storage for connections
self.setup_database()
def _get_connection(self):
"""Get a thread-local database connection."""
if not hasattr(self._local, "conn"):
try:
self._local.conn = sqlite3.connect(self.conn_string)
except Exception as e:
print(f"Error connecting to SQLite database: {e}")
raise
return self._local.conn
def setup_database(self):
"""Create the necessary tables if they don't exist."""
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(self.conn_string), exist_ok=True)
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS traces (
trace_id TEXT PRIMARY KEY,
service_name TEXT,
root_span_id TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS spans (
span_id TEXT PRIMARY KEY,
trace_id TEXT REFERENCES traces(trace_id),
parent_span_id TEXT,
name TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
attributes TEXT,
status TEXT,
kind TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS span_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
span_id TEXT REFERENCES spans(span_id),
name TEXT,
timestamp TIMESTAMP,
attributes TEXT
)
"""
)
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traces_created_at
ON traces(created_at)
"""
)
conn.commit()
cursor.close()
def on_start(self, span: Span, parent_context=None):
"""Called when a span starts."""
pass
def on_end(self, span: Span):
"""Called when a span ends. Export the span data to SQLite."""
try:
conn = self._get_connection()
cursor = conn.cursor()
trace_id = format_trace_id(span.get_span_context().trace_id)
span_id = format_span_id(span.get_span_context().span_id)
service_name = span.resource.attributes.get("service.name", "unknown")
parent_span_id = None
parent_context = span.parent
if parent_context:
parent_span_id = format_span_id(parent_context.span_id)
# Insert into traces
cursor.execute(
"""
INSERT INTO traces (
trace_id, service_name, root_span_id, start_time, end_time
) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(trace_id) DO UPDATE SET
root_span_id = COALESCE(root_span_id, excluded.root_span_id),
start_time = MIN(excluded.start_time, start_time),
end_time = MAX(excluded.end_time, end_time)
""",
(
trace_id,
service_name,
(span_id if span.attributes.get(LOCAL_ROOT_SPAN_MARKER) else None),
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
),
)
# Insert into spans
cursor.execute(
"""
INSERT INTO spans (
span_id, trace_id, parent_span_id, name,
start_time, end_time, attributes, status,
kind
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
span_id,
trace_id,
parent_span_id,
span.name,
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
json.dumps(dict(span.attributes)),
span.status.status_code.name,
span.kind.name,
),
)
for event in span.events:
cursor.execute(
"""
INSERT INTO span_events (
span_id, name, timestamp, attributes
) VALUES (?, ?, ?, ?)
""",
(
span_id,
event.name,
datetime.fromtimestamp(event.timestamp / 1e9, UTC).isoformat(),
json.dumps(dict(event.attributes)),
),
)
conn.commit()
cursor.close()
except Exception as e:
print(f"Error exporting span to SQLite: {e}")
def shutdown(self):
"""Cleanup any resources."""
# We can't access other threads' connections, so we just close our own
if hasattr(self._local, "conn"):
try:
self._local.conn.close()
except Exception as e:
print(f"Error closing SQLite connection: {e}")
finally:
del self._local.conn
def force_flush(self, timeout_millis=30000):
"""Force export of spans."""
pass

View file

@ -4,7 +4,6 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import datetime
import threading
from typing import Any
@ -22,19 +21,11 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
from llama_stack.apis.telemetry import (
Event,
MetricEvent,
MetricLabelMatcher,
MetricQueryType,
QueryCondition,
QueryMetricsResponse,
QuerySpanTreeResponse,
QueryTracesResponse,
Span,
SpanEndPayload,
SpanStartPayload,
SpanStatus,
StructuredLogEvent,
Telemetry,
Trace,
UnstructuredLogEvent,
)
from llama_stack.core.datatypes import Api
@ -42,11 +33,6 @@ from llama_stack.log import get_logger
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
ConsoleSpanProcessor,
)
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
SQLiteSpanProcessor,
)
from llama_stack.providers.utils.telemetry.dataset_mixin import TelemetryDatasetMixin
from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore
from llama_stack.providers.utils.telemetry.tracing import ROOT_SPAN_MARKERS
from .config import TelemetryConfig, TelemetrySink
@ -68,7 +54,7 @@ def is_tracing_enabled(tracer):
return span.is_recording()
class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
class TelemetryAdapter(Telemetry):
def __init__(self, config: TelemetryConfig, deps: dict[Api, Any]) -> None:
self.config = config
self.datasetio_api = deps.get(Api.datasetio)
@ -111,15 +97,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(metric_provider)
if TelemetrySink.SQLITE in self.config.sinks:
trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path))
if TelemetrySink.CONSOLE in self.config.sinks:
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True))
if TelemetrySink.OTEL_METRIC in self.config.sinks:
self.meter = metrics.get_meter(__name__)
if TelemetrySink.SQLITE in self.config.sinks:
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
self._lock = _global_lock
@ -139,47 +121,6 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
else:
raise ValueError(f"Unknown event type: {event}")
async def query_metrics(
self,
metric_name: str,
start_time: int,
end_time: int | None = None,
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
"""Query metrics from the telemetry store.
Args:
metric_name: The name of the metric to query (e.g., "prompt_tokens")
start_time: Start time as Unix timestamp
end_time: End time as Unix timestamp (defaults to now if None)
granularity: Time granularity for aggregation
query_type: Type of query (RANGE or INSTANT)
label_matchers: Label filters to apply
Returns:
QueryMetricsResponse with metric time series data
"""
# Convert timestamps to datetime objects
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
# Use SQLite trace store if available
if hasattr(self, "trace_store") and self.trace_store:
return await self.trace_store.query_metrics(
metric_name=metric_name,
start_time=start_dt,
end_time=end_dt,
granularity=granularity,
query_type=query_type,
label_matchers=label_matchers,
)
else:
raise ValueError(
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
)
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
with self._lock:
# Use global storage instead of instance storage
@ -326,39 +267,3 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
_GLOBAL_STORAGE["active_spans"].pop(span_id, None)
else:
raise ValueError(f"Unknown structured log event: {event}")
async def query_traces(
self,
attribute_filters: list[QueryCondition] | None = None,
limit: int | None = 100,
offset: int | None = 0,
order_by: list[str] | None = None,
) -> QueryTracesResponse:
return QueryTracesResponse(
data=await self.trace_store.query_traces(
attribute_filters=attribute_filters,
limit=limit,
offset=offset,
order_by=order_by,
)
)
async def get_trace(self, trace_id: str) -> Trace:
return await self.trace_store.get_trace(trace_id)
async def get_span(self, trace_id: str, span_id: str) -> Span:
return await self.trace_store.get_span(trace_id, span_id)
async def get_span_tree(
self,
span_id: str,
attributes_to_return: list[str] | None = None,
max_depth: int | None = None,
) -> QuerySpanTreeResponse:
return QuerySpanTreeResponse(
data=await self.trace_store.get_span_tree(
span_id=span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
)
)

View file

@ -1,80 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.telemetry import QueryCondition, QuerySpansResponse, Span
class TelemetryDatasetMixin:
"""Mixin class that provides dataset-related functionality for telemetry providers."""
datasetio_api: DatasetIO | None
async def save_spans_to_dataset(
self,
attribute_filters: list[QueryCondition],
attributes_to_save: list[str],
dataset_id: str,
max_depth: int | None = None,
) -> None:
if self.datasetio_api is None:
raise RuntimeError("DatasetIO API not available")
spans = await self.query_spans(
attribute_filters=attribute_filters,
attributes_to_return=attributes_to_save,
max_depth=max_depth,
)
rows = [
{
"trace_id": span.trace_id,
"span_id": span.span_id,
"parent_span_id": span.parent_span_id,
"name": span.name,
"start_time": span.start_time,
"end_time": span.end_time,
**{attr: span.attributes.get(attr) for attr in attributes_to_save},
}
for span in spans
]
await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)
async def query_spans(
self,
attribute_filters: list[QueryCondition],
attributes_to_return: list[str],
max_depth: int | None = None,
) -> QuerySpansResponse:
traces = await self.query_traces(attribute_filters=attribute_filters)
spans = []
for trace in traces.data:
spans_by_id_resp = await self.get_span_tree(
span_id=trace.root_span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
)
for span in spans_by_id_resp.data.values():
if span.attributes and all(
attr in span.attributes and span.attributes[attr] is not None for attr in attributes_to_return
):
spans.append(
Span(
trace_id=trace.root_span_id,
span_id=span.span_id,
parent_span_id=span.parent_span_id,
name=span.name,
start_time=span.start_time,
end_time=span.end_time,
attributes=span.attributes,
)
)
return QuerySpansResponse(data=spans)

View file

@ -1,383 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
from datetime import UTC, datetime
from typing import Protocol
import aiosqlite
from llama_stack.apis.telemetry import (
MetricDataPoint,
MetricLabel,
MetricLabelMatcher,
MetricQueryType,
MetricSeries,
QueryCondition,
QueryMetricsResponse,
Span,
SpanWithStatus,
Trace,
)
class TraceStore(Protocol):
async def query_traces(
self,
attribute_filters: list[QueryCondition] | None = None,
limit: int | None = 100,
offset: int | None = 0,
order_by: list[str] | None = None,
) -> list[Trace]: ...
async def get_span_tree(
self,
span_id: str,
attributes_to_return: list[str] | None = None,
max_depth: int | None = None,
) -> dict[str, SpanWithStatus]: ...
async def query_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: datetime | None = None,
granularity: str | None = "1d",
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse: ...
class SQLiteTraceStore(TraceStore):
def __init__(self, conn_string: str):
self.conn_string = conn_string
async def query_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: datetime | None = None,
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
if end_time is None:
end_time = datetime.now(UTC)
# Build base query
if query_type == MetricQueryType.INSTANT:
query = """
SELECT
se.name,
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
else:
if granularity:
time_format = self._get_time_format_for_granularity(granularity)
query = f"""
SELECT
se.name,
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes,
strftime('{time_format}', se.timestamp) as bucket_start
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
else:
query = """
SELECT
se.name,
json_extract(se.attributes, '$.value') as value,
json_extract(se.attributes, '$.unit') as unit,
se.attributes,
se.timestamp
FROM span_events se
WHERE se.name = ?
AND se.timestamp BETWEEN ? AND ?
"""
params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
# Labels that will be attached to the MetricSeries (preserve matcher labels)
all_labels: list[MetricLabel] = []
matcher_label_names = set()
if label_matchers:
for matcher in label_matchers:
json_path = f"$.{matcher.name}"
if matcher.operator == "=":
query += f" AND json_extract(se.attributes, '{json_path}') = ?"
params.append(matcher.value)
elif matcher.operator == "!=":
query += f" AND json_extract(se.attributes, '{json_path}') != ?"
params.append(matcher.value)
elif matcher.operator == "=~":
query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
params.append(f"%{matcher.value}%")
elif matcher.operator == "!~":
query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
params.append(f"%{matcher.value}%")
# Preserve filter context in output
all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
matcher_label_names.add(matcher.name)
# GROUP BY / ORDER BY logic
if query_type == MetricQueryType.RANGE and granularity:
group_time_format = self._get_time_format_for_granularity(granularity)
query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
query += " ORDER BY bucket_start"
elif query_type == MetricQueryType.INSTANT:
query += " GROUP BY json_extract(se.attributes, '$.unit')"
else:
query += " ORDER BY se.timestamp"
# Execute query
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
if not rows:
return QueryMetricsResponse(data=[])
data_points = []
# We want to add attribute labels, but only those not already present as matcher labels.
attr_label_names = set()
for row in rows:
# Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
try:
attributes = json.loads(row["attributes"] or "{}")
except (TypeError, json.JSONDecodeError):
attributes = {}
value = row["value"]
unit = row["unit"] or ""
# Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
for k, v in attributes.items():
if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
all_labels.append(MetricLabel(name=k, value=str(v)))
attr_label_names.add(k)
# Determine timestamp
if query_type == MetricQueryType.RANGE and granularity:
try:
bucket_start_raw = row["bucket_start"]
except KeyError as e:
raise ValueError(
"DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
) from e
# this value could also be there, but be NULL, I think.
if bucket_start_raw is None:
raise ValueError("bucket_start is None check time format and data")
bucket_start = datetime.fromisoformat(bucket_start_raw)
timestamp = int(bucket_start.timestamp())
elif query_type == MetricQueryType.INSTANT:
timestamp = int(datetime.now(UTC).timestamp())
else:
try:
timestamp_raw = row["timestamp"]
except KeyError as e:
raise ValueError(
"DB did not have a timestamp in row, this indicates improper formatting"
) from e
# this value could also be there, but be NULL, I think.
if timestamp_raw is None:
raise ValueError("timestamp is None check time format and data")
timestamp_iso = datetime.fromisoformat(timestamp_raw)
timestamp = int(timestamp_iso.timestamp())
data_points.append(
MetricDataPoint(
timestamp=timestamp,
value=value,
unit=unit,
)
)
metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
return QueryMetricsResponse(data=metric_series)
def _get_time_format_for_granularity(self, granularity: str | None) -> str:
"""Get the SQLite strftime format string for a given granularity.
Args:
granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
Returns:
SQLite strftime format string for the granularity
"""
if granularity is None:
raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
if granularity.endswith("d"):
return "%Y-%m-%d 00:00:00"
elif granularity.endswith("h"):
return "%Y-%m-%d %H:00:00"
elif granularity.endswith("m"):
return "%Y-%m-%d %H:%M:00"
else:
return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.
async def query_traces(
self,
attribute_filters: list[QueryCondition] | None = None,
limit: int | None = 100,
offset: int | None = 0,
order_by: list[str] | None = None,
) -> list[Trace]:
def build_where_clause() -> tuple[str, list]:
if not attribute_filters:
return "", []
ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"}
conditions = [
f"json_extract(s.attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?"
for condition in attribute_filters
]
params = [condition.value for condition in attribute_filters]
where_clause = " WHERE " + " AND ".join(conditions)
return where_clause, params
def build_order_clause() -> str:
if not order_by:
return ""
order_clauses = []
for field in order_by:
desc = field.startswith("-")
clean_field = field[1:] if desc else field
order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}")
return " ORDER BY " + ", ".join(order_clauses)
# Build the main query
base_query = """
WITH matching_traces AS (
SELECT DISTINCT t.trace_id
FROM traces t
JOIN spans s ON t.trace_id = s.trace_id
{where_clause}
),
filtered_traces AS (
SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time
FROM matching_traces mt
JOIN traces t ON mt.trace_id = t.trace_id
LEFT JOIN spans s ON t.trace_id = s.trace_id
{order_clause}
)
SELECT DISTINCT trace_id, root_span_id, start_time, end_time
FROM filtered_traces
WHERE root_span_id IS NOT NULL
LIMIT {limit} OFFSET {offset}
"""
where_clause, params = build_where_clause()
query = base_query.format(
where_clause=where_clause,
order_clause=build_order_clause(),
limit=limit,
offset=offset,
)
# Execute query and return results
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [
Trace(
trace_id=row["trace_id"],
root_span_id=row["root_span_id"],
start_time=datetime.fromisoformat(row["start_time"]),
end_time=datetime.fromisoformat(row["end_time"]),
)
for row in rows
]
async def get_span_tree(
self,
span_id: str,
attributes_to_return: list[str] | None = None,
max_depth: int | None = None,
) -> dict[str, SpanWithStatus]:
# Build the attributes selection
attributes_select = "s.attributes"
if attributes_to_return:
json_object = ", ".join(f"'{key}', json_extract(s.attributes, '$.{key}')" for key in attributes_to_return)
attributes_select = f"json_object({json_object})"
# SQLite CTE query with filtered attributes
query = f"""
WITH RECURSIVE span_tree AS (
SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes
FROM spans s
WHERE s.span_id = ?
UNION ALL
SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes
FROM spans s
JOIN span_tree st ON s.parent_span_id = st.span_id
WHERE (? IS NULL OR st.depth < ?)
)
SELECT *
FROM span_tree
ORDER BY depth, start_time
"""
spans_by_id = {}
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor:
rows = await cursor.fetchall()
if not rows:
raise ValueError(f"Span {span_id} not found")
for row in rows:
span = SpanWithStatus(
span_id=row["span_id"],
trace_id=row["trace_id"],
parent_span_id=row["parent_span_id"],
name=row["name"],
start_time=datetime.fromisoformat(row["start_time"]),
end_time=datetime.fromisoformat(row["end_time"]),
attributes=json.loads(row["filtered_attributes"]),
status=row["status"].lower(),
)
spans_by_id[span.span_id] = span
return spans_by_id
async def get_trace(self, trace_id: str) -> Trace:
query = """
SELECT *
FROM traces t
WHERE t.trace_id = ?
"""
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, (trace_id,)) as cursor:
row = await cursor.fetchone()
if row is None:
raise ValueError(f"Trace {trace_id} not found")
return Trace(**row)
async def get_span(self, trace_id: str, span_id: str) -> Span:
query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?"
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, (trace_id, span_id)) as cursor:
row = await cursor.fetchone()
if row is None:
raise ValueError(f"Span {span_id} not found")
return Span(**row)