diff --git a/docs/docs/building_applications/telemetry.mdx b/docs/docs/building_applications/telemetry.mdx index d991d97a1..2f1d80d41 100644 --- a/docs/docs/building_applications/telemetry.mdx +++ b/docs/docs/building_applications/telemetry.mdx @@ -10,58 +10,8 @@ import TabItem from '@theme/TabItem'; # Telemetry -The Llama Stack telemetry system provides comprehensive tracing, metrics, and logging capabilities. It supports multiple sink types including OpenTelemetry, SQLite, and Console output for complete observability of your AI applications. +The Llama Stack uses OpenTelemetry to provide comprehensive tracing, metrics, and logging capabilities. -## Event Types - -The telemetry system supports three main types of events: - - - - -Free-form log messages with severity levels for general application logging: - -```python -unstructured_log_event = UnstructuredLogEvent( - message="This is a log message", - severity=LogSeverity.INFO -) -``` - - - - -Numerical measurements with units for tracking performance and usage: - -```python -metric_event = MetricEvent( - metric="my_metric", - value=10, - unit="count" -) -``` - - - - -System events like span start/end that provide structured operation tracking: - -```python -structured_log_event = SpanStartPayload( - name="my_span", - parent_span_id="parent_span_id" -) -``` - - - - -## Spans and Traces - -- **Spans**: Represent individual operations with timing information and hierarchical relationships -- **Traces**: Collections of related spans that form a complete request flow across your application - -This hierarchical structure allows you to understand the complete execution path of requests through your Llama Stack application. ## Automatic Metrics Generation @@ -129,21 +79,6 @@ Send events to an OpenTelemetry Collector for integration with observability pla - Compatible with all OpenTelemetry collectors - Supports both traces and metrics - - - -Store events in a local SQLite database for direct querying: - -**Use Cases:** -- Local development and debugging -- Custom analytics and reporting -- Offline analysis of application behavior - -**Features:** -- Direct SQL querying capabilities -- Persistent local storage -- No external dependencies - @@ -174,9 +109,8 @@ telemetry: provider_type: inline::meta-reference config: service_name: "llama-stack-service" - sinks: ['console', 'sqlite', 'otel_trace', 'otel_metric'] + sinks: ['console', 'otel_trace', 'otel_metric'] otel_exporter_otlp_endpoint: "http://localhost:4318" - sqlite_db_path: "/path/to/telemetry.db" ``` ### Environment Variables @@ -185,7 +119,7 @@ Configure telemetry behavior using environment variables: - **`OTEL_EXPORTER_OTLP_ENDPOINT`**: OpenTelemetry Collector endpoint (default: `http://localhost:4318`) - **`OTEL_SERVICE_NAME`**: Service name for telemetry (default: empty string) -- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `console,sqlite`) +- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `[]`) ### Quick Setup: Complete Telemetry Stack @@ -248,37 +182,10 @@ Forward metrics to other observability systems: -## SQLite Querying - -The `sqlite` sink allows you to query traces without an external system. This is particularly useful for development and custom analytics. - -### Example Queries - -```sql --- Query recent traces -SELECT * FROM traces WHERE timestamp > datetime('now', '-1 hour'); - --- Analyze span durations -SELECT name, AVG(duration_ms) as avg_duration -FROM spans -GROUP BY name -ORDER BY avg_duration DESC; - --- Find slow operations -SELECT * FROM spans -WHERE duration_ms > 1000 -ORDER BY duration_ms DESC; -``` - -:::tip[Advanced Analytics] -Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stack/blob/main/docs/getting_started.ipynb) for more examples on querying traces and spans programmatically. -::: - ## Best Practices ### 🔍 **Monitoring Strategy** - Use OpenTelemetry for production environments -- Combine multiple sinks for development (console + SQLite) - Set up alerts on key metrics like token usage and error rates ### 📊 **Metrics Analysis** @@ -293,45 +200,8 @@ Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stac ### 🔧 **Configuration Management** - Use environment variables for flexible deployment -- Configure appropriate retention policies for SQLite - Ensure proper network access to OpenTelemetry collectors -## Integration Examples - -### Basic Telemetry Setup - -```python -from llama_stack_client import LlamaStackClient - -# Client with telemetry headers -client = LlamaStackClient( - base_url="http://localhost:8000", - extra_headers={ - "X-Telemetry-Service": "my-ai-app", - "X-Telemetry-Version": "1.0.0" - } -) - -# All API calls will be automatically traced -response = client.chat.completions.create( - model="meta-llama/Llama-3.2-3B-Instruct", - messages=[{"role": "user", "content": "Hello!"}] -) -``` - -### Custom Telemetry Context - -```python -# Add custom span attributes for better tracking -with tracer.start_as_current_span("custom_operation") as span: - span.set_attribute("user_id", "user123") - span.set_attribute("operation_type", "chat_completion") - - response = client.chat.completions.create( - model="meta-llama/Llama-3.2-3B-Instruct", - messages=[{"role": "user", "content": "Hello!"}] - ) -``` ## Related Resources diff --git a/docs/docs/distributions/self_hosted_distro/starter.md b/docs/docs/distributions/self_hosted_distro/starter.md index faa82bcfa..be303dbc2 100644 --- a/docs/docs/distributions/self_hosted_distro/starter.md +++ b/docs/docs/distributions/self_hosted_distro/starter.md @@ -216,7 +216,6 @@ The starter distribution uses SQLite for local storage of various components: - **Files metadata**: `~/.llama/distributions/starter/files_metadata.db` - **Agents store**: `~/.llama/distributions/starter/agents_store.db` - **Responses store**: `~/.llama/distributions/starter/responses_store.db` -- **Trace store**: `~/.llama/distributions/starter/trace_store.db` - **Evaluation store**: `~/.llama/distributions/starter/meta_reference_eval.db` - **Dataset I/O stores**: Various HuggingFace and local filesystem stores diff --git a/docs/docs/providers/telemetry/inline_meta-reference.mdx b/docs/docs/providers/telemetry/inline_meta-reference.mdx index ea2a690b3..d8b3157d1 100644 --- a/docs/docs/providers/telemetry/inline_meta-reference.mdx +++ b/docs/docs/providers/telemetry/inline_meta-reference.mdx @@ -16,14 +16,12 @@ Meta's reference implementation of telemetry and observability using OpenTelemet |-------|------|----------|---------|-------------| | `otel_exporter_otlp_endpoint` | `str \| None` | No | | The OpenTelemetry collector endpoint URL (base URL for traces, metrics, and logs). If not set, the SDK will use OTEL_EXPORTER_OTLP_ENDPOINT environment variable. | | `service_name` | `` | No | ​ | The service name to use for telemetry | -| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [<TelemetrySink.SQLITE: 'sqlite'>] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console) | -| `sqlite_db_path` | `` | No | ~/.llama/runtime/trace_store.db | The path to the SQLite database to use for storing traces | +| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console) | ## Sample Configuration ```yaml service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" -sinks: ${env.TELEMETRY_SINKS:=sqlite} -sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/trace_store.db +sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} ``` diff --git a/llama_stack/apis/telemetry/telemetry.py b/llama_stack/apis/telemetry/telemetry.py index b2999ad33..53387639b 100644 --- a/llama_stack/apis/telemetry/telemetry.py +++ b/llama_stack/apis/telemetry/telemetry.py @@ -421,104 +421,3 @@ class Telemetry(Protocol): :param ttl_seconds: The time to live of the event. """ ... - - async def query_traces( - self, - attribute_filters: list[QueryCondition] | None = None, - limit: int | None = 100, - offset: int | None = 0, - order_by: list[str] | None = None, - ) -> QueryTracesResponse: - """Query traces. - - :param attribute_filters: The attribute filters to apply to the traces. - :param limit: The limit of traces to return. - :param offset: The offset of the traces to return. - :param order_by: The order by of the traces to return. - :returns: A QueryTracesResponse. - """ - ... - - async def get_trace(self, trace_id: str) -> Trace: - """Get a trace by its ID. - - :param trace_id: The ID of the trace to get. - :returns: A Trace. - """ - ... - - async def get_span(self, trace_id: str, span_id: str) -> Span: - """Get a span by its ID. - - :param trace_id: The ID of the trace to get the span from. - :param span_id: The ID of the span to get. - :returns: A Span. - """ - ... - - async def get_span_tree( - self, - span_id: str, - attributes_to_return: list[str] | None = None, - max_depth: int | None = None, - ) -> QuerySpanTreeResponse: - """Get a span tree by its ID. - - :param span_id: The ID of the span to get the tree from. - :param attributes_to_return: The attributes to return in the tree. - :param max_depth: The maximum depth of the tree. - :returns: A QuerySpanTreeResponse. - """ - ... - - async def query_spans( - self, - attribute_filters: list[QueryCondition], - attributes_to_return: list[str], - max_depth: int | None = None, - ) -> QuerySpansResponse: - """Query spans. - - :param attribute_filters: The attribute filters to apply to the spans. - :param attributes_to_return: The attributes to return in the spans. - :param max_depth: The maximum depth of the tree. - :returns: A QuerySpansResponse. - """ - ... - - async def save_spans_to_dataset( - self, - attribute_filters: list[QueryCondition], - attributes_to_save: list[str], - dataset_id: str, - max_depth: int | None = None, - ) -> None: - """Save spans to a dataset. - - :param attribute_filters: The attribute filters to apply to the spans. - :param attributes_to_save: The attributes to save to the dataset. - :param dataset_id: The ID of the dataset to save the spans to. - :param max_depth: The maximum depth of the tree. - """ - ... - - async def query_metrics( - self, - metric_name: str, - start_time: int, - end_time: int | None = None, - granularity: str | None = None, - query_type: MetricQueryType = MetricQueryType.RANGE, - label_matchers: list[MetricLabelMatcher] | None = None, - ) -> QueryMetricsResponse: - """Query metrics. - - :param metric_name: The name of the metric to query. - :param start_time: The start time of the metric to query. - :param end_time: The end time of the metric to query. - :param granularity: The granularity of the metric to query. - :param query_type: The type of query to perform. - :param label_matchers: The label matchers to apply to the metric. - :returns: A QueryMetricsResponse. - """ - ... diff --git a/llama_stack/distributions/ci-tests/run.yaml b/llama_stack/distributions/ci-tests/run.yaml index 40f4d8a0a..0a8587328 100644 --- a/llama_stack/distributions/ci-tests/run.yaml +++ b/llama_stack/distributions/ci-tests/run.yaml @@ -159,8 +159,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} post_training: - provider_id: torchtune-cpu diff --git a/llama_stack/distributions/dell/run-with-safety.yaml b/llama_stack/distributions/dell/run-with-safety.yaml index 2c55c7b8c..0196f40c3 100644 --- a/llama_stack/distributions/dell/run-with-safety.yaml +++ b/llama_stack/distributions/dell/run-with-safety.yaml @@ -50,8 +50,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: meta-reference diff --git a/llama_stack/distributions/dell/run.yaml b/llama_stack/distributions/dell/run.yaml index 4779e7607..19b02dc9a 100644 --- a/llama_stack/distributions/dell/run.yaml +++ b/llama_stack/distributions/dell/run.yaml @@ -46,8 +46,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: meta-reference diff --git a/llama_stack/distributions/meta-reference-gpu/run-with-safety.yaml b/llama_stack/distributions/meta-reference-gpu/run-with-safety.yaml index 5c56f34ec..4acd19b38 100644 --- a/llama_stack/distributions/meta-reference-gpu/run-with-safety.yaml +++ b/llama_stack/distributions/meta-reference-gpu/run-with-safety.yaml @@ -61,8 +61,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: meta-reference diff --git a/llama_stack/distributions/meta-reference-gpu/run.yaml b/llama_stack/distributions/meta-reference-gpu/run.yaml index a2ec94454..1d0aa5172 100644 --- a/llama_stack/distributions/meta-reference-gpu/run.yaml +++ b/llama_stack/distributions/meta-reference-gpu/run.yaml @@ -51,8 +51,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: meta-reference diff --git a/llama_stack/distributions/nvidia/run-with-safety.yaml b/llama_stack/distributions/nvidia/run-with-safety.yaml index edd258ee4..3ee15b7c1 100644 --- a/llama_stack/distributions/nvidia/run-with-safety.yaml +++ b/llama_stack/distributions/nvidia/run-with-safety.yaml @@ -53,8 +53,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: nvidia diff --git a/llama_stack/distributions/nvidia/run.yaml b/llama_stack/distributions/nvidia/run.yaml index daa93093b..e947e1e2a 100644 --- a/llama_stack/distributions/nvidia/run.yaml +++ b/llama_stack/distributions/nvidia/run.yaml @@ -48,8 +48,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: nvidia diff --git a/llama_stack/distributions/open-benchmark/run.yaml b/llama_stack/distributions/open-benchmark/run.yaml index 89442d502..ef17a4d3b 100644 --- a/llama_stack/distributions/open-benchmark/run.yaml +++ b/llama_stack/distributions/open-benchmark/run.yaml @@ -81,8 +81,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: meta-reference diff --git a/llama_stack/distributions/starter-gpu/run.yaml b/llama_stack/distributions/starter-gpu/run.yaml index b28121815..05b88f012 100644 --- a/llama_stack/distributions/starter-gpu/run.yaml +++ b/llama_stack/distributions/starter-gpu/run.yaml @@ -159,8 +159,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} post_training: - provider_id: huggingface-gpu diff --git a/llama_stack/distributions/starter/run.yaml b/llama_stack/distributions/starter/run.yaml index 341b51a97..74bbc6fca 100644 --- a/llama_stack/distributions/starter/run.yaml +++ b/llama_stack/distributions/starter/run.yaml @@ -159,8 +159,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} post_training: - provider_id: torchtune-cpu diff --git a/llama_stack/distributions/watsonx/run.yaml b/llama_stack/distributions/watsonx/run.yaml index aea2189bc..3fc2c9d0e 100644 --- a/llama_stack/distributions/watsonx/run.yaml +++ b/llama_stack/distributions/watsonx/run.yaml @@ -46,8 +46,7 @@ providers: provider_type: inline::meta-reference config: service_name: "${env.OTEL_SERVICE_NAME:=\u200B}" - sinks: ${env.TELEMETRY_SINKS:=sqlite} - sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/trace_store.db + sinks: ${env.TELEMETRY_SINKS:=} otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=} eval: - provider_id: meta-reference diff --git a/llama_stack/providers/inline/telemetry/meta_reference/config.py b/llama_stack/providers/inline/telemetry/meta_reference/config.py index 06420c671..2fa8b244b 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/config.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/config.py @@ -9,13 +9,10 @@ from typing import Any from pydantic import BaseModel, Field, field_validator -from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR - class TelemetrySink(StrEnum): OTEL_TRACE = "otel_trace" OTEL_METRIC = "otel_metric" - SQLITE = "sqlite" CONSOLE = "console" @@ -30,12 +27,8 @@ class TelemetryConfig(BaseModel): description="The service name to use for telemetry", ) sinks: list[TelemetrySink] = Field( - default=[TelemetrySink.SQLITE], - description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console)", - ) - sqlite_db_path: str = Field( - default_factory=lambda: (RUNTIME_BASE_DIR / "trace_store.db").as_posix(), - description="The path to the SQLite database to use for storing traces", + default=[], + description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console)", ) @field_validator("sinks", mode="before") @@ -43,13 +36,12 @@ class TelemetryConfig(BaseModel): def validate_sinks(cls, v): if isinstance(v, str): return [TelemetrySink(sink.strip()) for sink in v.split(",")] - return v + return v or [] @classmethod - def sample_run_config(cls, __distro_dir__: str, db_name: str = "trace_store.db") -> dict[str, Any]: + def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]: return { "service_name": "${env.OTEL_SERVICE_NAME:=\u200b}", - "sinks": "${env.TELEMETRY_SINKS:=sqlite}", - "sqlite_db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name, + "sinks": "${env.TELEMETRY_SINKS:=}", "otel_exporter_otlp_endpoint": "${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}", } diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py deleted file mode 100644 index 8ab491189..000000000 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ /dev/null @@ -1,190 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import json -import os -import sqlite3 -import threading -from datetime import UTC, datetime - -from opentelemetry.sdk.trace import SpanProcessor -from opentelemetry.trace import Span -from opentelemetry.trace.span import format_span_id, format_trace_id - -from llama_stack.providers.utils.telemetry.tracing import LOCAL_ROOT_SPAN_MARKER - - -class SQLiteSpanProcessor(SpanProcessor): - def __init__(self, conn_string): - """Initialize the SQLite span processor with a connection string.""" - self.conn_string = conn_string - self._local = threading.local() # Thread-local storage for connections - self.setup_database() - - def _get_connection(self): - """Get a thread-local database connection.""" - if not hasattr(self._local, "conn"): - try: - self._local.conn = sqlite3.connect(self.conn_string) - except Exception as e: - print(f"Error connecting to SQLite database: {e}") - raise - return self._local.conn - - def setup_database(self): - """Create the necessary tables if they don't exist.""" - # Create directory if it doesn't exist - os.makedirs(os.path.dirname(self.conn_string), exist_ok=True) - - conn = self._get_connection() - cursor = conn.cursor() - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS traces ( - trace_id TEXT PRIMARY KEY, - service_name TEXT, - root_span_id TEXT, - start_time TIMESTAMP, - end_time TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS spans ( - span_id TEXT PRIMARY KEY, - trace_id TEXT REFERENCES traces(trace_id), - parent_span_id TEXT, - name TEXT, - start_time TIMESTAMP, - end_time TIMESTAMP, - attributes TEXT, - status TEXT, - kind TEXT - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS span_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - span_id TEXT REFERENCES spans(span_id), - name TEXT, - timestamp TIMESTAMP, - attributes TEXT - ) - """ - ) - - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS idx_traces_created_at - ON traces(created_at) - """ - ) - - conn.commit() - cursor.close() - - def on_start(self, span: Span, parent_context=None): - """Called when a span starts.""" - pass - - def on_end(self, span: Span): - """Called when a span ends. Export the span data to SQLite.""" - try: - conn = self._get_connection() - cursor = conn.cursor() - - trace_id = format_trace_id(span.get_span_context().trace_id) - span_id = format_span_id(span.get_span_context().span_id) - service_name = span.resource.attributes.get("service.name", "unknown") - - parent_span_id = None - parent_context = span.parent - if parent_context: - parent_span_id = format_span_id(parent_context.span_id) - - # Insert into traces - cursor.execute( - """ - INSERT INTO traces ( - trace_id, service_name, root_span_id, start_time, end_time - ) VALUES (?, ?, ?, ?, ?) - ON CONFLICT(trace_id) DO UPDATE SET - root_span_id = COALESCE(root_span_id, excluded.root_span_id), - start_time = MIN(excluded.start_time, start_time), - end_time = MAX(excluded.end_time, end_time) - """, - ( - trace_id, - service_name, - (span_id if span.attributes.get(LOCAL_ROOT_SPAN_MARKER) else None), - datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(), - datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(), - ), - ) - - # Insert into spans - cursor.execute( - """ - INSERT INTO spans ( - span_id, trace_id, parent_span_id, name, - start_time, end_time, attributes, status, - kind - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - span_id, - trace_id, - parent_span_id, - span.name, - datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(), - datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(), - json.dumps(dict(span.attributes)), - span.status.status_code.name, - span.kind.name, - ), - ) - - for event in span.events: - cursor.execute( - """ - INSERT INTO span_events ( - span_id, name, timestamp, attributes - ) VALUES (?, ?, ?, ?) - """, - ( - span_id, - event.name, - datetime.fromtimestamp(event.timestamp / 1e9, UTC).isoformat(), - json.dumps(dict(event.attributes)), - ), - ) - - conn.commit() - cursor.close() - except Exception as e: - print(f"Error exporting span to SQLite: {e}") - - def shutdown(self): - """Cleanup any resources.""" - # We can't access other threads' connections, so we just close our own - if hasattr(self._local, "conn"): - try: - self._local.conn.close() - except Exception as e: - print(f"Error closing SQLite connection: {e}") - finally: - del self._local.conn - - def force_flush(self, timeout_millis=30000): - """Force export of spans.""" - pass diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index 4d30cbba3..f56609cab 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -4,7 +4,6 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -import datetime import threading from typing import Any @@ -22,19 +21,11 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp from llama_stack.apis.telemetry import ( Event, MetricEvent, - MetricLabelMatcher, - MetricQueryType, - QueryCondition, - QueryMetricsResponse, - QuerySpanTreeResponse, - QueryTracesResponse, - Span, SpanEndPayload, SpanStartPayload, SpanStatus, StructuredLogEvent, Telemetry, - Trace, UnstructuredLogEvent, ) from llama_stack.core.datatypes import Api @@ -42,11 +33,6 @@ from llama_stack.log import get_logger from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import ( ConsoleSpanProcessor, ) -from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import ( - SQLiteSpanProcessor, -) -from llama_stack.providers.utils.telemetry.dataset_mixin import TelemetryDatasetMixin -from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore from llama_stack.providers.utils.telemetry.tracing import ROOT_SPAN_MARKERS from .config import TelemetryConfig, TelemetrySink @@ -68,7 +54,7 @@ def is_tracing_enabled(tracer): return span.is_recording() -class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): +class TelemetryAdapter(Telemetry): def __init__(self, config: TelemetryConfig, deps: dict[Api, Any]) -> None: self.config = config self.datasetio_api = deps.get(Api.datasetio) @@ -111,15 +97,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) metrics.set_meter_provider(metric_provider) - if TelemetrySink.SQLITE in self.config.sinks: - trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path)) if TelemetrySink.CONSOLE in self.config.sinks: trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True)) if TelemetrySink.OTEL_METRIC in self.config.sinks: self.meter = metrics.get_meter(__name__) - if TelemetrySink.SQLITE in self.config.sinks: - self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path) self._lock = _global_lock @@ -139,47 +121,6 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): else: raise ValueError(f"Unknown event type: {event}") - async def query_metrics( - self, - metric_name: str, - start_time: int, - end_time: int | None = None, - granularity: str | None = None, - query_type: MetricQueryType = MetricQueryType.RANGE, - label_matchers: list[MetricLabelMatcher] | None = None, - ) -> QueryMetricsResponse: - """Query metrics from the telemetry store. - - Args: - metric_name: The name of the metric to query (e.g., "prompt_tokens") - start_time: Start time as Unix timestamp - end_time: End time as Unix timestamp (defaults to now if None) - granularity: Time granularity for aggregation - query_type: Type of query (RANGE or INSTANT) - label_matchers: Label filters to apply - - Returns: - QueryMetricsResponse with metric time series data - """ - # Convert timestamps to datetime objects - start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC) - end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None - - # Use SQLite trace store if available - if hasattr(self, "trace_store") and self.trace_store: - return await self.trace_store.query_metrics( - metric_name=metric_name, - start_time=start_dt, - end_time=end_dt, - granularity=granularity, - query_type=query_type, - label_matchers=label_matchers, - ) - else: - raise ValueError( - f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks" - ) - def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None: with self._lock: # Use global storage instead of instance storage @@ -326,39 +267,3 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): _GLOBAL_STORAGE["active_spans"].pop(span_id, None) else: raise ValueError(f"Unknown structured log event: {event}") - - async def query_traces( - self, - attribute_filters: list[QueryCondition] | None = None, - limit: int | None = 100, - offset: int | None = 0, - order_by: list[str] | None = None, - ) -> QueryTracesResponse: - return QueryTracesResponse( - data=await self.trace_store.query_traces( - attribute_filters=attribute_filters, - limit=limit, - offset=offset, - order_by=order_by, - ) - ) - - async def get_trace(self, trace_id: str) -> Trace: - return await self.trace_store.get_trace(trace_id) - - async def get_span(self, trace_id: str, span_id: str) -> Span: - return await self.trace_store.get_span(trace_id, span_id) - - async def get_span_tree( - self, - span_id: str, - attributes_to_return: list[str] | None = None, - max_depth: int | None = None, - ) -> QuerySpanTreeResponse: - return QuerySpanTreeResponse( - data=await self.trace_store.get_span_tree( - span_id=span_id, - attributes_to_return=attributes_to_return, - max_depth=max_depth, - ) - ) diff --git a/llama_stack/providers/utils/telemetry/dataset_mixin.py b/llama_stack/providers/utils/telemetry/dataset_mixin.py deleted file mode 100644 index fe729a244..000000000 --- a/llama_stack/providers/utils/telemetry/dataset_mixin.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - - -from llama_stack.apis.datasetio import DatasetIO -from llama_stack.apis.telemetry import QueryCondition, QuerySpansResponse, Span - - -class TelemetryDatasetMixin: - """Mixin class that provides dataset-related functionality for telemetry providers.""" - - datasetio_api: DatasetIO | None - - async def save_spans_to_dataset( - self, - attribute_filters: list[QueryCondition], - attributes_to_save: list[str], - dataset_id: str, - max_depth: int | None = None, - ) -> None: - if self.datasetio_api is None: - raise RuntimeError("DatasetIO API not available") - - spans = await self.query_spans( - attribute_filters=attribute_filters, - attributes_to_return=attributes_to_save, - max_depth=max_depth, - ) - - rows = [ - { - "trace_id": span.trace_id, - "span_id": span.span_id, - "parent_span_id": span.parent_span_id, - "name": span.name, - "start_time": span.start_time, - "end_time": span.end_time, - **{attr: span.attributes.get(attr) for attr in attributes_to_save}, - } - for span in spans - ] - - await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows) - - async def query_spans( - self, - attribute_filters: list[QueryCondition], - attributes_to_return: list[str], - max_depth: int | None = None, - ) -> QuerySpansResponse: - traces = await self.query_traces(attribute_filters=attribute_filters) - spans = [] - - for trace in traces.data: - spans_by_id_resp = await self.get_span_tree( - span_id=trace.root_span_id, - attributes_to_return=attributes_to_return, - max_depth=max_depth, - ) - - for span in spans_by_id_resp.data.values(): - if span.attributes and all( - attr in span.attributes and span.attributes[attr] is not None for attr in attributes_to_return - ): - spans.append( - Span( - trace_id=trace.root_span_id, - span_id=span.span_id, - parent_span_id=span.parent_span_id, - name=span.name, - start_time=span.start_time, - end_time=span.end_time, - attributes=span.attributes, - ) - ) - - return QuerySpansResponse(data=spans) diff --git a/llama_stack/providers/utils/telemetry/sqlite_trace_store.py b/llama_stack/providers/utils/telemetry/sqlite_trace_store.py deleted file mode 100644 index 71480364c..000000000 --- a/llama_stack/providers/utils/telemetry/sqlite_trace_store.py +++ /dev/null @@ -1,383 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import json -from datetime import UTC, datetime -from typing import Protocol - -import aiosqlite - -from llama_stack.apis.telemetry import ( - MetricDataPoint, - MetricLabel, - MetricLabelMatcher, - MetricQueryType, - MetricSeries, - QueryCondition, - QueryMetricsResponse, - Span, - SpanWithStatus, - Trace, -) - - -class TraceStore(Protocol): - async def query_traces( - self, - attribute_filters: list[QueryCondition] | None = None, - limit: int | None = 100, - offset: int | None = 0, - order_by: list[str] | None = None, - ) -> list[Trace]: ... - - async def get_span_tree( - self, - span_id: str, - attributes_to_return: list[str] | None = None, - max_depth: int | None = None, - ) -> dict[str, SpanWithStatus]: ... - - async def query_metrics( - self, - metric_name: str, - start_time: datetime, - end_time: datetime | None = None, - granularity: str | None = "1d", - query_type: MetricQueryType = MetricQueryType.RANGE, - label_matchers: list[MetricLabelMatcher] | None = None, - ) -> QueryMetricsResponse: ... - - -class SQLiteTraceStore(TraceStore): - def __init__(self, conn_string: str): - self.conn_string = conn_string - - async def query_metrics( - self, - metric_name: str, - start_time: datetime, - end_time: datetime | None = None, - granularity: str | None = None, - query_type: MetricQueryType = MetricQueryType.RANGE, - label_matchers: list[MetricLabelMatcher] | None = None, - ) -> QueryMetricsResponse: - if end_time is None: - end_time = datetime.now(UTC) - - # Build base query - if query_type == MetricQueryType.INSTANT: - query = """ - SELECT - se.name, - SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value, - json_extract(se.attributes, '$.unit') as unit, - se.attributes - FROM span_events se - WHERE se.name = ? - AND se.timestamp BETWEEN ? AND ? - """ - else: - if granularity: - time_format = self._get_time_format_for_granularity(granularity) - query = f""" - SELECT - se.name, - SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value, - json_extract(se.attributes, '$.unit') as unit, - se.attributes, - strftime('{time_format}', se.timestamp) as bucket_start - FROM span_events se - WHERE se.name = ? - AND se.timestamp BETWEEN ? AND ? - """ - else: - query = """ - SELECT - se.name, - json_extract(se.attributes, '$.value') as value, - json_extract(se.attributes, '$.unit') as unit, - se.attributes, - se.timestamp - FROM span_events se - WHERE se.name = ? - AND se.timestamp BETWEEN ? AND ? - """ - - params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()] - - # Labels that will be attached to the MetricSeries (preserve matcher labels) - all_labels: list[MetricLabel] = [] - matcher_label_names = set() - if label_matchers: - for matcher in label_matchers: - json_path = f"$.{matcher.name}" - if matcher.operator == "=": - query += f" AND json_extract(se.attributes, '{json_path}') = ?" - params.append(matcher.value) - elif matcher.operator == "!=": - query += f" AND json_extract(se.attributes, '{json_path}') != ?" - params.append(matcher.value) - elif matcher.operator == "=~": - query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?" - params.append(f"%{matcher.value}%") - elif matcher.operator == "!~": - query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?" - params.append(f"%{matcher.value}%") - # Preserve filter context in output - all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value))) - matcher_label_names.add(matcher.name) - - # GROUP BY / ORDER BY logic - if query_type == MetricQueryType.RANGE and granularity: - group_time_format = self._get_time_format_for_granularity(granularity) - query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')" - query += " ORDER BY bucket_start" - elif query_type == MetricQueryType.INSTANT: - query += " GROUP BY json_extract(se.attributes, '$.unit')" - else: - query += " ORDER BY se.timestamp" - - # Execute query - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, params) as cursor: - rows = await cursor.fetchall() - - if not rows: - return QueryMetricsResponse(data=[]) - - data_points = [] - # We want to add attribute labels, but only those not already present as matcher labels. - attr_label_names = set() - for row in rows: - # Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result. - try: - attributes = json.loads(row["attributes"] or "{}") - except (TypeError, json.JSONDecodeError): - attributes = {} - - value = row["value"] - unit = row["unit"] or "" - - # Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result. - for k, v in attributes.items(): - if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names: - all_labels.append(MetricLabel(name=k, value=str(v))) - attr_label_names.add(k) - - # Determine timestamp - if query_type == MetricQueryType.RANGE and granularity: - try: - bucket_start_raw = row["bucket_start"] - except KeyError as e: - raise ValueError( - "DB did not have a bucket_start time in row when using granularity, this indicates improper formatting" - ) from e - # this value could also be there, but be NULL, I think. - if bucket_start_raw is None: - raise ValueError("bucket_start is None check time format and data") - bucket_start = datetime.fromisoformat(bucket_start_raw) - timestamp = int(bucket_start.timestamp()) - elif query_type == MetricQueryType.INSTANT: - timestamp = int(datetime.now(UTC).timestamp()) - else: - try: - timestamp_raw = row["timestamp"] - except KeyError as e: - raise ValueError( - "DB did not have a timestamp in row, this indicates improper formatting" - ) from e - # this value could also be there, but be NULL, I think. - if timestamp_raw is None: - raise ValueError("timestamp is None check time format and data") - timestamp_iso = datetime.fromisoformat(timestamp_raw) - timestamp = int(timestamp_iso.timestamp()) - - data_points.append( - MetricDataPoint( - timestamp=timestamp, - value=value, - unit=unit, - ) - ) - - metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)] - return QueryMetricsResponse(data=metric_series) - - def _get_time_format_for_granularity(self, granularity: str | None) -> str: - """Get the SQLite strftime format string for a given granularity. - Args: - granularity: Granularity string (e.g., "1m", "5m", "1h", "1d") - Returns: - SQLite strftime format string for the granularity - """ - if granularity is None: - raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation") - - if granularity.endswith("d"): - return "%Y-%m-%d 00:00:00" - elif granularity.endswith("h"): - return "%Y-%m-%d %H:00:00" - elif granularity.endswith("m"): - return "%Y-%m-%d %H:%M:00" - else: - return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps. - - async def query_traces( - self, - attribute_filters: list[QueryCondition] | None = None, - limit: int | None = 100, - offset: int | None = 0, - order_by: list[str] | None = None, - ) -> list[Trace]: - def build_where_clause() -> tuple[str, list]: - if not attribute_filters: - return "", [] - - ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"} - - conditions = [ - f"json_extract(s.attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?" - for condition in attribute_filters - ] - params = [condition.value for condition in attribute_filters] - where_clause = " WHERE " + " AND ".join(conditions) - return where_clause, params - - def build_order_clause() -> str: - if not order_by: - return "" - - order_clauses = [] - for field in order_by: - desc = field.startswith("-") - clean_field = field[1:] if desc else field - order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}") - return " ORDER BY " + ", ".join(order_clauses) - - # Build the main query - base_query = """ - WITH matching_traces AS ( - SELECT DISTINCT t.trace_id - FROM traces t - JOIN spans s ON t.trace_id = s.trace_id - {where_clause} - ), - filtered_traces AS ( - SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time - FROM matching_traces mt - JOIN traces t ON mt.trace_id = t.trace_id - LEFT JOIN spans s ON t.trace_id = s.trace_id - {order_clause} - ) - SELECT DISTINCT trace_id, root_span_id, start_time, end_time - FROM filtered_traces - WHERE root_span_id IS NOT NULL - LIMIT {limit} OFFSET {offset} - """ - - where_clause, params = build_where_clause() - query = base_query.format( - where_clause=where_clause, - order_clause=build_order_clause(), - limit=limit, - offset=offset, - ) - - # Execute query and return results - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, params) as cursor: - rows = await cursor.fetchall() - return [ - Trace( - trace_id=row["trace_id"], - root_span_id=row["root_span_id"], - start_time=datetime.fromisoformat(row["start_time"]), - end_time=datetime.fromisoformat(row["end_time"]), - ) - for row in rows - ] - - async def get_span_tree( - self, - span_id: str, - attributes_to_return: list[str] | None = None, - max_depth: int | None = None, - ) -> dict[str, SpanWithStatus]: - # Build the attributes selection - attributes_select = "s.attributes" - if attributes_to_return: - json_object = ", ".join(f"'{key}', json_extract(s.attributes, '$.{key}')" for key in attributes_to_return) - attributes_select = f"json_object({json_object})" - - # SQLite CTE query with filtered attributes - query = f""" - WITH RECURSIVE span_tree AS ( - SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes - FROM spans s - WHERE s.span_id = ? - - UNION ALL - - SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes - FROM spans s - JOIN span_tree st ON s.parent_span_id = st.span_id - WHERE (? IS NULL OR st.depth < ?) - ) - SELECT * - FROM span_tree - ORDER BY depth, start_time - """ - - spans_by_id = {} - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor: - rows = await cursor.fetchall() - - if not rows: - raise ValueError(f"Span {span_id} not found") - - for row in rows: - span = SpanWithStatus( - span_id=row["span_id"], - trace_id=row["trace_id"], - parent_span_id=row["parent_span_id"], - name=row["name"], - start_time=datetime.fromisoformat(row["start_time"]), - end_time=datetime.fromisoformat(row["end_time"]), - attributes=json.loads(row["filtered_attributes"]), - status=row["status"].lower(), - ) - - spans_by_id[span.span_id] = span - - return spans_by_id - - async def get_trace(self, trace_id: str) -> Trace: - query = """ - SELECT * - FROM traces t - WHERE t.trace_id = ? - """ - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (trace_id,)) as cursor: - row = await cursor.fetchone() - if row is None: - raise ValueError(f"Trace {trace_id} not found") - return Trace(**row) - - async def get_span(self, trace_id: str, span_id: str) -> Span: - query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?" - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (trace_id, span_id)) as cursor: - row = await cursor.fetchone() - if row is None: - raise ValueError(f"Span {span_id} not found") - return Span(**row)