mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-16 06:53:47 +00:00
chore!: BREAKING CHANGE: remove sqlite from telemetry config
# What does this PR do? ## Test Plan
This commit is contained in:
parent
d875e427bf
commit
b7be18f4db
20 changed files with 22 additions and 1023 deletions
|
@ -10,58 +10,8 @@ import TabItem from '@theme/TabItem';
|
|||
|
||||
# Telemetry
|
||||
|
||||
The Llama Stack telemetry system provides comprehensive tracing, metrics, and logging capabilities. It supports multiple sink types including OpenTelemetry, SQLite, and Console output for complete observability of your AI applications.
|
||||
The Llama Stack uses OpenTelemetry to provide comprehensive tracing, metrics, and logging capabilities.
|
||||
|
||||
## Event Types
|
||||
|
||||
The telemetry system supports three main types of events:
|
||||
|
||||
<Tabs>
|
||||
<TabItem value="unstructured" label="Unstructured Logs">
|
||||
|
||||
Free-form log messages with severity levels for general application logging:
|
||||
|
||||
```python
|
||||
unstructured_log_event = UnstructuredLogEvent(
|
||||
message="This is a log message",
|
||||
severity=LogSeverity.INFO
|
||||
)
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="metrics" label="Metric Events">
|
||||
|
||||
Numerical measurements with units for tracking performance and usage:
|
||||
|
||||
```python
|
||||
metric_event = MetricEvent(
|
||||
metric="my_metric",
|
||||
value=10,
|
||||
unit="count"
|
||||
)
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="structured" label="Structured Logs">
|
||||
|
||||
System events like span start/end that provide structured operation tracking:
|
||||
|
||||
```python
|
||||
structured_log_event = SpanStartPayload(
|
||||
name="my_span",
|
||||
parent_span_id="parent_span_id"
|
||||
)
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
## Spans and Traces
|
||||
|
||||
- **Spans**: Represent individual operations with timing information and hierarchical relationships
|
||||
- **Traces**: Collections of related spans that form a complete request flow across your application
|
||||
|
||||
This hierarchical structure allows you to understand the complete execution path of requests through your Llama Stack application.
|
||||
|
||||
## Automatic Metrics Generation
|
||||
|
||||
|
@ -129,21 +79,6 @@ Send events to an OpenTelemetry Collector for integration with observability pla
|
|||
- Compatible with all OpenTelemetry collectors
|
||||
- Supports both traces and metrics
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="sqlite" label="SQLite">
|
||||
|
||||
Store events in a local SQLite database for direct querying:
|
||||
|
||||
**Use Cases:**
|
||||
- Local development and debugging
|
||||
- Custom analytics and reporting
|
||||
- Offline analysis of application behavior
|
||||
|
||||
**Features:**
|
||||
- Direct SQL querying capabilities
|
||||
- Persistent local storage
|
||||
- No external dependencies
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="console" label="Console">
|
||||
|
||||
|
@ -174,9 +109,8 @@ telemetry:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "llama-stack-service"
|
||||
sinks: ['console', 'sqlite', 'otel_trace', 'otel_metric']
|
||||
sinks: ['console', 'otel_trace', 'otel_metric']
|
||||
otel_exporter_otlp_endpoint: "http://localhost:4318"
|
||||
sqlite_db_path: "/path/to/telemetry.db"
|
||||
```
|
||||
|
||||
### Environment Variables
|
||||
|
@ -185,7 +119,7 @@ Configure telemetry behavior using environment variables:
|
|||
|
||||
- **`OTEL_EXPORTER_OTLP_ENDPOINT`**: OpenTelemetry Collector endpoint (default: `http://localhost:4318`)
|
||||
- **`OTEL_SERVICE_NAME`**: Service name for telemetry (default: empty string)
|
||||
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `console,sqlite`)
|
||||
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `[]`)
|
||||
|
||||
### Quick Setup: Complete Telemetry Stack
|
||||
|
||||
|
@ -248,37 +182,10 @@ Forward metrics to other observability systems:
|
|||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
## SQLite Querying
|
||||
|
||||
The `sqlite` sink allows you to query traces without an external system. This is particularly useful for development and custom analytics.
|
||||
|
||||
### Example Queries
|
||||
|
||||
```sql
|
||||
-- Query recent traces
|
||||
SELECT * FROM traces WHERE timestamp > datetime('now', '-1 hour');
|
||||
|
||||
-- Analyze span durations
|
||||
SELECT name, AVG(duration_ms) as avg_duration
|
||||
FROM spans
|
||||
GROUP BY name
|
||||
ORDER BY avg_duration DESC;
|
||||
|
||||
-- Find slow operations
|
||||
SELECT * FROM spans
|
||||
WHERE duration_ms > 1000
|
||||
ORDER BY duration_ms DESC;
|
||||
```
|
||||
|
||||
:::tip[Advanced Analytics]
|
||||
Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stack/blob/main/docs/getting_started.ipynb) for more examples on querying traces and spans programmatically.
|
||||
:::
|
||||
|
||||
## Best Practices
|
||||
|
||||
### 🔍 **Monitoring Strategy**
|
||||
- Use OpenTelemetry for production environments
|
||||
- Combine multiple sinks for development (console + SQLite)
|
||||
- Set up alerts on key metrics like token usage and error rates
|
||||
|
||||
### 📊 **Metrics Analysis**
|
||||
|
@ -293,45 +200,8 @@ Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stac
|
|||
|
||||
### 🔧 **Configuration Management**
|
||||
- Use environment variables for flexible deployment
|
||||
- Configure appropriate retention policies for SQLite
|
||||
- Ensure proper network access to OpenTelemetry collectors
|
||||
|
||||
## Integration Examples
|
||||
|
||||
### Basic Telemetry Setup
|
||||
|
||||
```python
|
||||
from llama_stack_client import LlamaStackClient
|
||||
|
||||
# Client with telemetry headers
|
||||
client = LlamaStackClient(
|
||||
base_url="http://localhost:8000",
|
||||
extra_headers={
|
||||
"X-Telemetry-Service": "my-ai-app",
|
||||
"X-Telemetry-Version": "1.0.0"
|
||||
}
|
||||
)
|
||||
|
||||
# All API calls will be automatically traced
|
||||
response = client.chat.completions.create(
|
||||
model="meta-llama/Llama-3.2-3B-Instruct",
|
||||
messages=[{"role": "user", "content": "Hello!"}]
|
||||
)
|
||||
```
|
||||
|
||||
### Custom Telemetry Context
|
||||
|
||||
```python
|
||||
# Add custom span attributes for better tracking
|
||||
with tracer.start_as_current_span("custom_operation") as span:
|
||||
span.set_attribute("user_id", "user123")
|
||||
span.set_attribute("operation_type", "chat_completion")
|
||||
|
||||
response = client.chat.completions.create(
|
||||
model="meta-llama/Llama-3.2-3B-Instruct",
|
||||
messages=[{"role": "user", "content": "Hello!"}]
|
||||
)
|
||||
```
|
||||
|
||||
## Related Resources
|
||||
|
||||
|
|
|
@ -216,7 +216,6 @@ The starter distribution uses SQLite for local storage of various components:
|
|||
- **Files metadata**: `~/.llama/distributions/starter/files_metadata.db`
|
||||
- **Agents store**: `~/.llama/distributions/starter/agents_store.db`
|
||||
- **Responses store**: `~/.llama/distributions/starter/responses_store.db`
|
||||
- **Trace store**: `~/.llama/distributions/starter/trace_store.db`
|
||||
- **Evaluation store**: `~/.llama/distributions/starter/meta_reference_eval.db`
|
||||
- **Dataset I/O stores**: Various HuggingFace and local filesystem stores
|
||||
|
||||
|
|
|
@ -16,14 +16,12 @@ Meta's reference implementation of telemetry and observability using OpenTelemet
|
|||
|-------|------|----------|---------|-------------|
|
||||
| `otel_exporter_otlp_endpoint` | `str \| None` | No | | The OpenTelemetry collector endpoint URL (base URL for traces, metrics, and logs). If not set, the SDK will use OTEL_EXPORTER_OTLP_ENDPOINT environment variable. |
|
||||
| `service_name` | `<class 'str'>` | No | | The service name to use for telemetry |
|
||||
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [<TelemetrySink.SQLITE: 'sqlite'>] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console) |
|
||||
| `sqlite_db_path` | `<class 'str'>` | No | ~/.llama/runtime/trace_store.db | The path to the SQLite database to use for storing traces |
|
||||
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console) |
|
||||
|
||||
## Sample Configuration
|
||||
|
||||
```yaml
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
```
|
||||
|
|
|
@ -421,104 +421,3 @@ class Telemetry(Protocol):
|
|||
:param ttl_seconds: The time to live of the event.
|
||||
"""
|
||||
...
|
||||
|
||||
async def query_traces(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition] | None = None,
|
||||
limit: int | None = 100,
|
||||
offset: int | None = 0,
|
||||
order_by: list[str] | None = None,
|
||||
) -> QueryTracesResponse:
|
||||
"""Query traces.
|
||||
|
||||
:param attribute_filters: The attribute filters to apply to the traces.
|
||||
:param limit: The limit of traces to return.
|
||||
:param offset: The offset of the traces to return.
|
||||
:param order_by: The order by of the traces to return.
|
||||
:returns: A QueryTracesResponse.
|
||||
"""
|
||||
...
|
||||
|
||||
async def get_trace(self, trace_id: str) -> Trace:
|
||||
"""Get a trace by its ID.
|
||||
|
||||
:param trace_id: The ID of the trace to get.
|
||||
:returns: A Trace.
|
||||
"""
|
||||
...
|
||||
|
||||
async def get_span(self, trace_id: str, span_id: str) -> Span:
|
||||
"""Get a span by its ID.
|
||||
|
||||
:param trace_id: The ID of the trace to get the span from.
|
||||
:param span_id: The ID of the span to get.
|
||||
:returns: A Span.
|
||||
"""
|
||||
...
|
||||
|
||||
async def get_span_tree(
|
||||
self,
|
||||
span_id: str,
|
||||
attributes_to_return: list[str] | None = None,
|
||||
max_depth: int | None = None,
|
||||
) -> QuerySpanTreeResponse:
|
||||
"""Get a span tree by its ID.
|
||||
|
||||
:param span_id: The ID of the span to get the tree from.
|
||||
:param attributes_to_return: The attributes to return in the tree.
|
||||
:param max_depth: The maximum depth of the tree.
|
||||
:returns: A QuerySpanTreeResponse.
|
||||
"""
|
||||
...
|
||||
|
||||
async def query_spans(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition],
|
||||
attributes_to_return: list[str],
|
||||
max_depth: int | None = None,
|
||||
) -> QuerySpansResponse:
|
||||
"""Query spans.
|
||||
|
||||
:param attribute_filters: The attribute filters to apply to the spans.
|
||||
:param attributes_to_return: The attributes to return in the spans.
|
||||
:param max_depth: The maximum depth of the tree.
|
||||
:returns: A QuerySpansResponse.
|
||||
"""
|
||||
...
|
||||
|
||||
async def save_spans_to_dataset(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition],
|
||||
attributes_to_save: list[str],
|
||||
dataset_id: str,
|
||||
max_depth: int | None = None,
|
||||
) -> None:
|
||||
"""Save spans to a dataset.
|
||||
|
||||
:param attribute_filters: The attribute filters to apply to the spans.
|
||||
:param attributes_to_save: The attributes to save to the dataset.
|
||||
:param dataset_id: The ID of the dataset to save the spans to.
|
||||
:param max_depth: The maximum depth of the tree.
|
||||
"""
|
||||
...
|
||||
|
||||
async def query_metrics(
|
||||
self,
|
||||
metric_name: str,
|
||||
start_time: int,
|
||||
end_time: int | None = None,
|
||||
granularity: str | None = None,
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse:
|
||||
"""Query metrics.
|
||||
|
||||
:param metric_name: The name of the metric to query.
|
||||
:param start_time: The start time of the metric to query.
|
||||
:param end_time: The end time of the metric to query.
|
||||
:param granularity: The granularity of the metric to query.
|
||||
:param query_type: The type of query to perform.
|
||||
:param label_matchers: The label matchers to apply to the metric.
|
||||
:returns: A QueryMetricsResponse.
|
||||
"""
|
||||
...
|
||||
|
|
|
@ -159,8 +159,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
post_training:
|
||||
- provider_id: torchtune-cpu
|
||||
|
|
|
@ -50,8 +50,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: meta-reference
|
||||
|
|
|
@ -46,8 +46,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: meta-reference
|
||||
|
|
|
@ -61,8 +61,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: meta-reference
|
||||
|
|
|
@ -51,8 +51,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: meta-reference
|
||||
|
|
|
@ -53,8 +53,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: nvidia
|
||||
|
|
|
@ -48,8 +48,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: nvidia
|
||||
|
|
|
@ -81,8 +81,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: meta-reference
|
||||
|
|
|
@ -159,8 +159,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
post_training:
|
||||
- provider_id: huggingface-gpu
|
||||
|
|
|
@ -159,8 +159,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
post_training:
|
||||
- provider_id: torchtune-cpu
|
||||
|
|
|
@ -46,8 +46,7 @@ providers:
|
|||
provider_type: inline::meta-reference
|
||||
config:
|
||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/trace_store.db
|
||||
sinks: ${env.TELEMETRY_SINKS:=}
|
||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||
eval:
|
||||
- provider_id: meta-reference
|
||||
|
|
|
@ -9,13 +9,10 @@ from typing import Any
|
|||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
||||
|
||||
|
||||
class TelemetrySink(StrEnum):
|
||||
OTEL_TRACE = "otel_trace"
|
||||
OTEL_METRIC = "otel_metric"
|
||||
SQLITE = "sqlite"
|
||||
CONSOLE = "console"
|
||||
|
||||
|
||||
|
@ -30,12 +27,8 @@ class TelemetryConfig(BaseModel):
|
|||
description="The service name to use for telemetry",
|
||||
)
|
||||
sinks: list[TelemetrySink] = Field(
|
||||
default=[TelemetrySink.SQLITE],
|
||||
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console)",
|
||||
)
|
||||
sqlite_db_path: str = Field(
|
||||
default_factory=lambda: (RUNTIME_BASE_DIR / "trace_store.db").as_posix(),
|
||||
description="The path to the SQLite database to use for storing traces",
|
||||
default=[],
|
||||
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console)",
|
||||
)
|
||||
|
||||
@field_validator("sinks", mode="before")
|
||||
|
@ -43,13 +36,12 @@ class TelemetryConfig(BaseModel):
|
|||
def validate_sinks(cls, v):
|
||||
if isinstance(v, str):
|
||||
return [TelemetrySink(sink.strip()) for sink in v.split(",")]
|
||||
return v
|
||||
return v or []
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, __distro_dir__: str, db_name: str = "trace_store.db") -> dict[str, Any]:
|
||||
def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]:
|
||||
return {
|
||||
"service_name": "${env.OTEL_SERVICE_NAME:=\u200b}",
|
||||
"sinks": "${env.TELEMETRY_SINKS:=sqlite}",
|
||||
"sqlite_db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
|
||||
"sinks": "${env.TELEMETRY_SINKS:=}",
|
||||
"otel_exporter_otlp_endpoint": "${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}",
|
||||
}
|
||||
|
|
|
@ -1,190 +0,0 @@
|
|||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import threading
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from opentelemetry.sdk.trace import SpanProcessor
|
||||
from opentelemetry.trace import Span
|
||||
from opentelemetry.trace.span import format_span_id, format_trace_id
|
||||
|
||||
from llama_stack.providers.utils.telemetry.tracing import LOCAL_ROOT_SPAN_MARKER
|
||||
|
||||
|
||||
class SQLiteSpanProcessor(SpanProcessor):
|
||||
def __init__(self, conn_string):
|
||||
"""Initialize the SQLite span processor with a connection string."""
|
||||
self.conn_string = conn_string
|
||||
self._local = threading.local() # Thread-local storage for connections
|
||||
self.setup_database()
|
||||
|
||||
def _get_connection(self):
|
||||
"""Get a thread-local database connection."""
|
||||
if not hasattr(self._local, "conn"):
|
||||
try:
|
||||
self._local.conn = sqlite3.connect(self.conn_string)
|
||||
except Exception as e:
|
||||
print(f"Error connecting to SQLite database: {e}")
|
||||
raise
|
||||
return self._local.conn
|
||||
|
||||
def setup_database(self):
|
||||
"""Create the necessary tables if they don't exist."""
|
||||
# Create directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(self.conn_string), exist_ok=True)
|
||||
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS traces (
|
||||
trace_id TEXT PRIMARY KEY,
|
||||
service_name TEXT,
|
||||
root_span_id TEXT,
|
||||
start_time TIMESTAMP,
|
||||
end_time TIMESTAMP,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS spans (
|
||||
span_id TEXT PRIMARY KEY,
|
||||
trace_id TEXT REFERENCES traces(trace_id),
|
||||
parent_span_id TEXT,
|
||||
name TEXT,
|
||||
start_time TIMESTAMP,
|
||||
end_time TIMESTAMP,
|
||||
attributes TEXT,
|
||||
status TEXT,
|
||||
kind TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS span_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
span_id TEXT REFERENCES spans(span_id),
|
||||
name TEXT,
|
||||
timestamp TIMESTAMP,
|
||||
attributes TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS idx_traces_created_at
|
||||
ON traces(created_at)
|
||||
"""
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
def on_start(self, span: Span, parent_context=None):
|
||||
"""Called when a span starts."""
|
||||
pass
|
||||
|
||||
def on_end(self, span: Span):
|
||||
"""Called when a span ends. Export the span data to SQLite."""
|
||||
try:
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
trace_id = format_trace_id(span.get_span_context().trace_id)
|
||||
span_id = format_span_id(span.get_span_context().span_id)
|
||||
service_name = span.resource.attributes.get("service.name", "unknown")
|
||||
|
||||
parent_span_id = None
|
||||
parent_context = span.parent
|
||||
if parent_context:
|
||||
parent_span_id = format_span_id(parent_context.span_id)
|
||||
|
||||
# Insert into traces
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO traces (
|
||||
trace_id, service_name, root_span_id, start_time, end_time
|
||||
) VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(trace_id) DO UPDATE SET
|
||||
root_span_id = COALESCE(root_span_id, excluded.root_span_id),
|
||||
start_time = MIN(excluded.start_time, start_time),
|
||||
end_time = MAX(excluded.end_time, end_time)
|
||||
""",
|
||||
(
|
||||
trace_id,
|
||||
service_name,
|
||||
(span_id if span.attributes.get(LOCAL_ROOT_SPAN_MARKER) else None),
|
||||
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
|
||||
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
|
||||
),
|
||||
)
|
||||
|
||||
# Insert into spans
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO spans (
|
||||
span_id, trace_id, parent_span_id, name,
|
||||
start_time, end_time, attributes, status,
|
||||
kind
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
span_id,
|
||||
trace_id,
|
||||
parent_span_id,
|
||||
span.name,
|
||||
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
|
||||
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
|
||||
json.dumps(dict(span.attributes)),
|
||||
span.status.status_code.name,
|
||||
span.kind.name,
|
||||
),
|
||||
)
|
||||
|
||||
for event in span.events:
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO span_events (
|
||||
span_id, name, timestamp, attributes
|
||||
) VALUES (?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
span_id,
|
||||
event.name,
|
||||
datetime.fromtimestamp(event.timestamp / 1e9, UTC).isoformat(),
|
||||
json.dumps(dict(event.attributes)),
|
||||
),
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
except Exception as e:
|
||||
print(f"Error exporting span to SQLite: {e}")
|
||||
|
||||
def shutdown(self):
|
||||
"""Cleanup any resources."""
|
||||
# We can't access other threads' connections, so we just close our own
|
||||
if hasattr(self._local, "conn"):
|
||||
try:
|
||||
self._local.conn.close()
|
||||
except Exception as e:
|
||||
print(f"Error closing SQLite connection: {e}")
|
||||
finally:
|
||||
del self._local.conn
|
||||
|
||||
def force_flush(self, timeout_millis=30000):
|
||||
"""Force export of spans."""
|
||||
pass
|
|
@ -4,7 +4,6 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import datetime
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
|
@ -22,19 +21,11 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
|
|||
from llama_stack.apis.telemetry import (
|
||||
Event,
|
||||
MetricEvent,
|
||||
MetricLabelMatcher,
|
||||
MetricQueryType,
|
||||
QueryCondition,
|
||||
QueryMetricsResponse,
|
||||
QuerySpanTreeResponse,
|
||||
QueryTracesResponse,
|
||||
Span,
|
||||
SpanEndPayload,
|
||||
SpanStartPayload,
|
||||
SpanStatus,
|
||||
StructuredLogEvent,
|
||||
Telemetry,
|
||||
Trace,
|
||||
UnstructuredLogEvent,
|
||||
)
|
||||
from llama_stack.core.datatypes import Api
|
||||
|
@ -42,11 +33,6 @@ from llama_stack.log import get_logger
|
|||
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
|
||||
ConsoleSpanProcessor,
|
||||
)
|
||||
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
|
||||
SQLiteSpanProcessor,
|
||||
)
|
||||
from llama_stack.providers.utils.telemetry.dataset_mixin import TelemetryDatasetMixin
|
||||
from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore
|
||||
from llama_stack.providers.utils.telemetry.tracing import ROOT_SPAN_MARKERS
|
||||
|
||||
from .config import TelemetryConfig, TelemetrySink
|
||||
|
@ -68,7 +54,7 @@ def is_tracing_enabled(tracer):
|
|||
return span.is_recording()
|
||||
|
||||
|
||||
class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||
class TelemetryAdapter(Telemetry):
|
||||
def __init__(self, config: TelemetryConfig, deps: dict[Api, Any]) -> None:
|
||||
self.config = config
|
||||
self.datasetio_api = deps.get(Api.datasetio)
|
||||
|
@ -111,15 +97,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
|||
metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
|
||||
metrics.set_meter_provider(metric_provider)
|
||||
|
||||
if TelemetrySink.SQLITE in self.config.sinks:
|
||||
trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path))
|
||||
if TelemetrySink.CONSOLE in self.config.sinks:
|
||||
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True))
|
||||
|
||||
if TelemetrySink.OTEL_METRIC in self.config.sinks:
|
||||
self.meter = metrics.get_meter(__name__)
|
||||
if TelemetrySink.SQLITE in self.config.sinks:
|
||||
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
|
||||
|
||||
self._lock = _global_lock
|
||||
|
||||
|
@ -139,47 +121,6 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
|||
else:
|
||||
raise ValueError(f"Unknown event type: {event}")
|
||||
|
||||
async def query_metrics(
|
||||
self,
|
||||
metric_name: str,
|
||||
start_time: int,
|
||||
end_time: int | None = None,
|
||||
granularity: str | None = None,
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse:
|
||||
"""Query metrics from the telemetry store.
|
||||
|
||||
Args:
|
||||
metric_name: The name of the metric to query (e.g., "prompt_tokens")
|
||||
start_time: Start time as Unix timestamp
|
||||
end_time: End time as Unix timestamp (defaults to now if None)
|
||||
granularity: Time granularity for aggregation
|
||||
query_type: Type of query (RANGE or INSTANT)
|
||||
label_matchers: Label filters to apply
|
||||
|
||||
Returns:
|
||||
QueryMetricsResponse with metric time series data
|
||||
"""
|
||||
# Convert timestamps to datetime objects
|
||||
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
|
||||
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
|
||||
|
||||
# Use SQLite trace store if available
|
||||
if hasattr(self, "trace_store") and self.trace_store:
|
||||
return await self.trace_store.query_metrics(
|
||||
metric_name=metric_name,
|
||||
start_time=start_dt,
|
||||
end_time=end_dt,
|
||||
granularity=granularity,
|
||||
query_type=query_type,
|
||||
label_matchers=label_matchers,
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
|
||||
)
|
||||
|
||||
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
|
||||
with self._lock:
|
||||
# Use global storage instead of instance storage
|
||||
|
@ -326,39 +267,3 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
|||
_GLOBAL_STORAGE["active_spans"].pop(span_id, None)
|
||||
else:
|
||||
raise ValueError(f"Unknown structured log event: {event}")
|
||||
|
||||
async def query_traces(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition] | None = None,
|
||||
limit: int | None = 100,
|
||||
offset: int | None = 0,
|
||||
order_by: list[str] | None = None,
|
||||
) -> QueryTracesResponse:
|
||||
return QueryTracesResponse(
|
||||
data=await self.trace_store.query_traces(
|
||||
attribute_filters=attribute_filters,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
order_by=order_by,
|
||||
)
|
||||
)
|
||||
|
||||
async def get_trace(self, trace_id: str) -> Trace:
|
||||
return await self.trace_store.get_trace(trace_id)
|
||||
|
||||
async def get_span(self, trace_id: str, span_id: str) -> Span:
|
||||
return await self.trace_store.get_span(trace_id, span_id)
|
||||
|
||||
async def get_span_tree(
|
||||
self,
|
||||
span_id: str,
|
||||
attributes_to_return: list[str] | None = None,
|
||||
max_depth: int | None = None,
|
||||
) -> QuerySpanTreeResponse:
|
||||
return QuerySpanTreeResponse(
|
||||
data=await self.trace_store.get_span_tree(
|
||||
span_id=span_id,
|
||||
attributes_to_return=attributes_to_return,
|
||||
max_depth=max_depth,
|
||||
)
|
||||
)
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
|
||||
from llama_stack.apis.datasetio import DatasetIO
|
||||
from llama_stack.apis.telemetry import QueryCondition, QuerySpansResponse, Span
|
||||
|
||||
|
||||
class TelemetryDatasetMixin:
|
||||
"""Mixin class that provides dataset-related functionality for telemetry providers."""
|
||||
|
||||
datasetio_api: DatasetIO | None
|
||||
|
||||
async def save_spans_to_dataset(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition],
|
||||
attributes_to_save: list[str],
|
||||
dataset_id: str,
|
||||
max_depth: int | None = None,
|
||||
) -> None:
|
||||
if self.datasetio_api is None:
|
||||
raise RuntimeError("DatasetIO API not available")
|
||||
|
||||
spans = await self.query_spans(
|
||||
attribute_filters=attribute_filters,
|
||||
attributes_to_return=attributes_to_save,
|
||||
max_depth=max_depth,
|
||||
)
|
||||
|
||||
rows = [
|
||||
{
|
||||
"trace_id": span.trace_id,
|
||||
"span_id": span.span_id,
|
||||
"parent_span_id": span.parent_span_id,
|
||||
"name": span.name,
|
||||
"start_time": span.start_time,
|
||||
"end_time": span.end_time,
|
||||
**{attr: span.attributes.get(attr) for attr in attributes_to_save},
|
||||
}
|
||||
for span in spans
|
||||
]
|
||||
|
||||
await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)
|
||||
|
||||
async def query_spans(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition],
|
||||
attributes_to_return: list[str],
|
||||
max_depth: int | None = None,
|
||||
) -> QuerySpansResponse:
|
||||
traces = await self.query_traces(attribute_filters=attribute_filters)
|
||||
spans = []
|
||||
|
||||
for trace in traces.data:
|
||||
spans_by_id_resp = await self.get_span_tree(
|
||||
span_id=trace.root_span_id,
|
||||
attributes_to_return=attributes_to_return,
|
||||
max_depth=max_depth,
|
||||
)
|
||||
|
||||
for span in spans_by_id_resp.data.values():
|
||||
if span.attributes and all(
|
||||
attr in span.attributes and span.attributes[attr] is not None for attr in attributes_to_return
|
||||
):
|
||||
spans.append(
|
||||
Span(
|
||||
trace_id=trace.root_span_id,
|
||||
span_id=span.span_id,
|
||||
parent_span_id=span.parent_span_id,
|
||||
name=span.name,
|
||||
start_time=span.start_time,
|
||||
end_time=span.end_time,
|
||||
attributes=span.attributes,
|
||||
)
|
||||
)
|
||||
|
||||
return QuerySpansResponse(data=spans)
|
|
@ -1,383 +0,0 @@
|
|||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from typing import Protocol
|
||||
|
||||
import aiosqlite
|
||||
|
||||
from llama_stack.apis.telemetry import (
|
||||
MetricDataPoint,
|
||||
MetricLabel,
|
||||
MetricLabelMatcher,
|
||||
MetricQueryType,
|
||||
MetricSeries,
|
||||
QueryCondition,
|
||||
QueryMetricsResponse,
|
||||
Span,
|
||||
SpanWithStatus,
|
||||
Trace,
|
||||
)
|
||||
|
||||
|
||||
class TraceStore(Protocol):
|
||||
async def query_traces(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition] | None = None,
|
||||
limit: int | None = 100,
|
||||
offset: int | None = 0,
|
||||
order_by: list[str] | None = None,
|
||||
) -> list[Trace]: ...
|
||||
|
||||
async def get_span_tree(
|
||||
self,
|
||||
span_id: str,
|
||||
attributes_to_return: list[str] | None = None,
|
||||
max_depth: int | None = None,
|
||||
) -> dict[str, SpanWithStatus]: ...
|
||||
|
||||
async def query_metrics(
|
||||
self,
|
||||
metric_name: str,
|
||||
start_time: datetime,
|
||||
end_time: datetime | None = None,
|
||||
granularity: str | None = "1d",
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse: ...
|
||||
|
||||
|
||||
class SQLiteTraceStore(TraceStore):
|
||||
def __init__(self, conn_string: str):
|
||||
self.conn_string = conn_string
|
||||
|
||||
async def query_metrics(
|
||||
self,
|
||||
metric_name: str,
|
||||
start_time: datetime,
|
||||
end_time: datetime | None = None,
|
||||
granularity: str | None = None,
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse:
|
||||
if end_time is None:
|
||||
end_time = datetime.now(UTC)
|
||||
|
||||
# Build base query
|
||||
if query_type == MetricQueryType.INSTANT:
|
||||
query = """
|
||||
SELECT
|
||||
se.name,
|
||||
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
|
||||
json_extract(se.attributes, '$.unit') as unit,
|
||||
se.attributes
|
||||
FROM span_events se
|
||||
WHERE se.name = ?
|
||||
AND se.timestamp BETWEEN ? AND ?
|
||||
"""
|
||||
else:
|
||||
if granularity:
|
||||
time_format = self._get_time_format_for_granularity(granularity)
|
||||
query = f"""
|
||||
SELECT
|
||||
se.name,
|
||||
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
|
||||
json_extract(se.attributes, '$.unit') as unit,
|
||||
se.attributes,
|
||||
strftime('{time_format}', se.timestamp) as bucket_start
|
||||
FROM span_events se
|
||||
WHERE se.name = ?
|
||||
AND se.timestamp BETWEEN ? AND ?
|
||||
"""
|
||||
else:
|
||||
query = """
|
||||
SELECT
|
||||
se.name,
|
||||
json_extract(se.attributes, '$.value') as value,
|
||||
json_extract(se.attributes, '$.unit') as unit,
|
||||
se.attributes,
|
||||
se.timestamp
|
||||
FROM span_events se
|
||||
WHERE se.name = ?
|
||||
AND se.timestamp BETWEEN ? AND ?
|
||||
"""
|
||||
|
||||
params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
|
||||
|
||||
# Labels that will be attached to the MetricSeries (preserve matcher labels)
|
||||
all_labels: list[MetricLabel] = []
|
||||
matcher_label_names = set()
|
||||
if label_matchers:
|
||||
for matcher in label_matchers:
|
||||
json_path = f"$.{matcher.name}"
|
||||
if matcher.operator == "=":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') = ?"
|
||||
params.append(matcher.value)
|
||||
elif matcher.operator == "!=":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') != ?"
|
||||
params.append(matcher.value)
|
||||
elif matcher.operator == "=~":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
|
||||
params.append(f"%{matcher.value}%")
|
||||
elif matcher.operator == "!~":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
|
||||
params.append(f"%{matcher.value}%")
|
||||
# Preserve filter context in output
|
||||
all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
|
||||
matcher_label_names.add(matcher.name)
|
||||
|
||||
# GROUP BY / ORDER BY logic
|
||||
if query_type == MetricQueryType.RANGE and granularity:
|
||||
group_time_format = self._get_time_format_for_granularity(granularity)
|
||||
query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
|
||||
query += " ORDER BY bucket_start"
|
||||
elif query_type == MetricQueryType.INSTANT:
|
||||
query += " GROUP BY json_extract(se.attributes, '$.unit')"
|
||||
else:
|
||||
query += " ORDER BY se.timestamp"
|
||||
|
||||
# Execute query
|
||||
async with aiosqlite.connect(self.conn_string) as conn:
|
||||
conn.row_factory = aiosqlite.Row
|
||||
async with conn.execute(query, params) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
return QueryMetricsResponse(data=[])
|
||||
|
||||
data_points = []
|
||||
# We want to add attribute labels, but only those not already present as matcher labels.
|
||||
attr_label_names = set()
|
||||
for row in rows:
|
||||
# Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
|
||||
try:
|
||||
attributes = json.loads(row["attributes"] or "{}")
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
attributes = {}
|
||||
|
||||
value = row["value"]
|
||||
unit = row["unit"] or ""
|
||||
|
||||
# Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
|
||||
for k, v in attributes.items():
|
||||
if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
|
||||
all_labels.append(MetricLabel(name=k, value=str(v)))
|
||||
attr_label_names.add(k)
|
||||
|
||||
# Determine timestamp
|
||||
if query_type == MetricQueryType.RANGE and granularity:
|
||||
try:
|
||||
bucket_start_raw = row["bucket_start"]
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
"DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
|
||||
) from e
|
||||
# this value could also be there, but be NULL, I think.
|
||||
if bucket_start_raw is None:
|
||||
raise ValueError("bucket_start is None check time format and data")
|
||||
bucket_start = datetime.fromisoformat(bucket_start_raw)
|
||||
timestamp = int(bucket_start.timestamp())
|
||||
elif query_type == MetricQueryType.INSTANT:
|
||||
timestamp = int(datetime.now(UTC).timestamp())
|
||||
else:
|
||||
try:
|
||||
timestamp_raw = row["timestamp"]
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
"DB did not have a timestamp in row, this indicates improper formatting"
|
||||
) from e
|
||||
# this value could also be there, but be NULL, I think.
|
||||
if timestamp_raw is None:
|
||||
raise ValueError("timestamp is None check time format and data")
|
||||
timestamp_iso = datetime.fromisoformat(timestamp_raw)
|
||||
timestamp = int(timestamp_iso.timestamp())
|
||||
|
||||
data_points.append(
|
||||
MetricDataPoint(
|
||||
timestamp=timestamp,
|
||||
value=value,
|
||||
unit=unit,
|
||||
)
|
||||
)
|
||||
|
||||
metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
|
||||
return QueryMetricsResponse(data=metric_series)
|
||||
|
||||
def _get_time_format_for_granularity(self, granularity: str | None) -> str:
|
||||
"""Get the SQLite strftime format string for a given granularity.
|
||||
Args:
|
||||
granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
|
||||
Returns:
|
||||
SQLite strftime format string for the granularity
|
||||
"""
|
||||
if granularity is None:
|
||||
raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
|
||||
|
||||
if granularity.endswith("d"):
|
||||
return "%Y-%m-%d 00:00:00"
|
||||
elif granularity.endswith("h"):
|
||||
return "%Y-%m-%d %H:00:00"
|
||||
elif granularity.endswith("m"):
|
||||
return "%Y-%m-%d %H:%M:00"
|
||||
else:
|
||||
return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.
|
||||
|
||||
async def query_traces(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition] | None = None,
|
||||
limit: int | None = 100,
|
||||
offset: int | None = 0,
|
||||
order_by: list[str] | None = None,
|
||||
) -> list[Trace]:
|
||||
def build_where_clause() -> tuple[str, list]:
|
||||
if not attribute_filters:
|
||||
return "", []
|
||||
|
||||
ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"}
|
||||
|
||||
conditions = [
|
||||
f"json_extract(s.attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?"
|
||||
for condition in attribute_filters
|
||||
]
|
||||
params = [condition.value for condition in attribute_filters]
|
||||
where_clause = " WHERE " + " AND ".join(conditions)
|
||||
return where_clause, params
|
||||
|
||||
def build_order_clause() -> str:
|
||||
if not order_by:
|
||||
return ""
|
||||
|
||||
order_clauses = []
|
||||
for field in order_by:
|
||||
desc = field.startswith("-")
|
||||
clean_field = field[1:] if desc else field
|
||||
order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}")
|
||||
return " ORDER BY " + ", ".join(order_clauses)
|
||||
|
||||
# Build the main query
|
||||
base_query = """
|
||||
WITH matching_traces AS (
|
||||
SELECT DISTINCT t.trace_id
|
||||
FROM traces t
|
||||
JOIN spans s ON t.trace_id = s.trace_id
|
||||
{where_clause}
|
||||
),
|
||||
filtered_traces AS (
|
||||
SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time
|
||||
FROM matching_traces mt
|
||||
JOIN traces t ON mt.trace_id = t.trace_id
|
||||
LEFT JOIN spans s ON t.trace_id = s.trace_id
|
||||
{order_clause}
|
||||
)
|
||||
SELECT DISTINCT trace_id, root_span_id, start_time, end_time
|
||||
FROM filtered_traces
|
||||
WHERE root_span_id IS NOT NULL
|
||||
LIMIT {limit} OFFSET {offset}
|
||||
"""
|
||||
|
||||
where_clause, params = build_where_clause()
|
||||
query = base_query.format(
|
||||
where_clause=where_clause,
|
||||
order_clause=build_order_clause(),
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
# Execute query and return results
|
||||
async with aiosqlite.connect(self.conn_string) as conn:
|
||||
conn.row_factory = aiosqlite.Row
|
||||
async with conn.execute(query, params) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
return [
|
||||
Trace(
|
||||
trace_id=row["trace_id"],
|
||||
root_span_id=row["root_span_id"],
|
||||
start_time=datetime.fromisoformat(row["start_time"]),
|
||||
end_time=datetime.fromisoformat(row["end_time"]),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
async def get_span_tree(
|
||||
self,
|
||||
span_id: str,
|
||||
attributes_to_return: list[str] | None = None,
|
||||
max_depth: int | None = None,
|
||||
) -> dict[str, SpanWithStatus]:
|
||||
# Build the attributes selection
|
||||
attributes_select = "s.attributes"
|
||||
if attributes_to_return:
|
||||
json_object = ", ".join(f"'{key}', json_extract(s.attributes, '$.{key}')" for key in attributes_to_return)
|
||||
attributes_select = f"json_object({json_object})"
|
||||
|
||||
# SQLite CTE query with filtered attributes
|
||||
query = f"""
|
||||
WITH RECURSIVE span_tree AS (
|
||||
SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes
|
||||
FROM spans s
|
||||
WHERE s.span_id = ?
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes
|
||||
FROM spans s
|
||||
JOIN span_tree st ON s.parent_span_id = st.span_id
|
||||
WHERE (? IS NULL OR st.depth < ?)
|
||||
)
|
||||
SELECT *
|
||||
FROM span_tree
|
||||
ORDER BY depth, start_time
|
||||
"""
|
||||
|
||||
spans_by_id = {}
|
||||
async with aiosqlite.connect(self.conn_string) as conn:
|
||||
conn.row_factory = aiosqlite.Row
|
||||
async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
raise ValueError(f"Span {span_id} not found")
|
||||
|
||||
for row in rows:
|
||||
span = SpanWithStatus(
|
||||
span_id=row["span_id"],
|
||||
trace_id=row["trace_id"],
|
||||
parent_span_id=row["parent_span_id"],
|
||||
name=row["name"],
|
||||
start_time=datetime.fromisoformat(row["start_time"]),
|
||||
end_time=datetime.fromisoformat(row["end_time"]),
|
||||
attributes=json.loads(row["filtered_attributes"]),
|
||||
status=row["status"].lower(),
|
||||
)
|
||||
|
||||
spans_by_id[span.span_id] = span
|
||||
|
||||
return spans_by_id
|
||||
|
||||
async def get_trace(self, trace_id: str) -> Trace:
|
||||
query = """
|
||||
SELECT *
|
||||
FROM traces t
|
||||
WHERE t.trace_id = ?
|
||||
"""
|
||||
async with aiosqlite.connect(self.conn_string) as conn:
|
||||
conn.row_factory = aiosqlite.Row
|
||||
async with conn.execute(query, (trace_id,)) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
raise ValueError(f"Trace {trace_id} not found")
|
||||
return Trace(**row)
|
||||
|
||||
async def get_span(self, trace_id: str, span_id: str) -> Span:
|
||||
query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?"
|
||||
async with aiosqlite.connect(self.conn_string) as conn:
|
||||
conn.row_factory = aiosqlite.Row
|
||||
async with conn.execute(query, (trace_id, span_id)) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
raise ValueError(f"Span {span_id} not found")
|
||||
return Span(**row)
|
Loading…
Add table
Add a link
Reference in a new issue