mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-03 18:00:36 +00:00
chore!: BREAKING CHANGE: remove sqlite from telemetry config (#3808)
# What does this PR do? - Removed sqlite sink from telemetry config. - Removed related code - Updated doc related to telemetry ## Test Plan CI
This commit is contained in:
parent
0a96a7faa5
commit
6ba9db3929
21 changed files with 26 additions and 1026 deletions
|
|
@ -10,58 +10,8 @@ import TabItem from '@theme/TabItem';
|
||||||
|
|
||||||
# Telemetry
|
# Telemetry
|
||||||
|
|
||||||
The Llama Stack telemetry system provides comprehensive tracing, metrics, and logging capabilities. It supports multiple sink types including OpenTelemetry, SQLite, and Console output for complete observability of your AI applications.
|
The Llama Stack uses OpenTelemetry to provide comprehensive tracing, metrics, and logging capabilities.
|
||||||
|
|
||||||
## Event Types
|
|
||||||
|
|
||||||
The telemetry system supports three main types of events:
|
|
||||||
|
|
||||||
<Tabs>
|
|
||||||
<TabItem value="unstructured" label="Unstructured Logs">
|
|
||||||
|
|
||||||
Free-form log messages with severity levels for general application logging:
|
|
||||||
|
|
||||||
```python
|
|
||||||
unstructured_log_event = UnstructuredLogEvent(
|
|
||||||
message="This is a log message",
|
|
||||||
severity=LogSeverity.INFO
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
</TabItem>
|
|
||||||
<TabItem value="metrics" label="Metric Events">
|
|
||||||
|
|
||||||
Numerical measurements with units for tracking performance and usage:
|
|
||||||
|
|
||||||
```python
|
|
||||||
metric_event = MetricEvent(
|
|
||||||
metric="my_metric",
|
|
||||||
value=10,
|
|
||||||
unit="count"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
</TabItem>
|
|
||||||
<TabItem value="structured" label="Structured Logs">
|
|
||||||
|
|
||||||
System events like span start/end that provide structured operation tracking:
|
|
||||||
|
|
||||||
```python
|
|
||||||
structured_log_event = SpanStartPayload(
|
|
||||||
name="my_span",
|
|
||||||
parent_span_id="parent_span_id"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
</TabItem>
|
|
||||||
</Tabs>
|
|
||||||
|
|
||||||
## Spans and Traces
|
|
||||||
|
|
||||||
- **Spans**: Represent individual operations with timing information and hierarchical relationships
|
|
||||||
- **Traces**: Collections of related spans that form a complete request flow across your application
|
|
||||||
|
|
||||||
This hierarchical structure allows you to understand the complete execution path of requests through your Llama Stack application.
|
|
||||||
|
|
||||||
## Automatic Metrics Generation
|
## Automatic Metrics Generation
|
||||||
|
|
||||||
|
|
@ -129,21 +79,6 @@ Send events to an OpenTelemetry Collector for integration with observability pla
|
||||||
- Compatible with all OpenTelemetry collectors
|
- Compatible with all OpenTelemetry collectors
|
||||||
- Supports both traces and metrics
|
- Supports both traces and metrics
|
||||||
|
|
||||||
</TabItem>
|
|
||||||
<TabItem value="sqlite" label="SQLite">
|
|
||||||
|
|
||||||
Store events in a local SQLite database for direct querying:
|
|
||||||
|
|
||||||
**Use Cases:**
|
|
||||||
- Local development and debugging
|
|
||||||
- Custom analytics and reporting
|
|
||||||
- Offline analysis of application behavior
|
|
||||||
|
|
||||||
**Features:**
|
|
||||||
- Direct SQL querying capabilities
|
|
||||||
- Persistent local storage
|
|
||||||
- No external dependencies
|
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem value="console" label="Console">
|
<TabItem value="console" label="Console">
|
||||||
|
|
||||||
|
|
@ -174,9 +109,8 @@ telemetry:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "llama-stack-service"
|
service_name: "llama-stack-service"
|
||||||
sinks: ['console', 'sqlite', 'otel_trace', 'otel_metric']
|
sinks: ['console', 'otel_trace', 'otel_metric']
|
||||||
otel_exporter_otlp_endpoint: "http://localhost:4318"
|
otel_exporter_otlp_endpoint: "http://localhost:4318"
|
||||||
sqlite_db_path: "/path/to/telemetry.db"
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Environment Variables
|
### Environment Variables
|
||||||
|
|
@ -185,7 +119,7 @@ Configure telemetry behavior using environment variables:
|
||||||
|
|
||||||
- **`OTEL_EXPORTER_OTLP_ENDPOINT`**: OpenTelemetry Collector endpoint (default: `http://localhost:4318`)
|
- **`OTEL_EXPORTER_OTLP_ENDPOINT`**: OpenTelemetry Collector endpoint (default: `http://localhost:4318`)
|
||||||
- **`OTEL_SERVICE_NAME`**: Service name for telemetry (default: empty string)
|
- **`OTEL_SERVICE_NAME`**: Service name for telemetry (default: empty string)
|
||||||
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `console,sqlite`)
|
- **`TELEMETRY_SINKS`**: Comma-separated list of sinks (default: `[]`)
|
||||||
|
|
||||||
### Quick Setup: Complete Telemetry Stack
|
### Quick Setup: Complete Telemetry Stack
|
||||||
|
|
||||||
|
|
@ -248,37 +182,10 @@ Forward metrics to other observability systems:
|
||||||
</TabItem>
|
</TabItem>
|
||||||
</Tabs>
|
</Tabs>
|
||||||
|
|
||||||
## SQLite Querying
|
|
||||||
|
|
||||||
The `sqlite` sink allows you to query traces without an external system. This is particularly useful for development and custom analytics.
|
|
||||||
|
|
||||||
### Example Queries
|
|
||||||
|
|
||||||
```sql
|
|
||||||
-- Query recent traces
|
|
||||||
SELECT * FROM traces WHERE timestamp > datetime('now', '-1 hour');
|
|
||||||
|
|
||||||
-- Analyze span durations
|
|
||||||
SELECT name, AVG(duration_ms) as avg_duration
|
|
||||||
FROM spans
|
|
||||||
GROUP BY name
|
|
||||||
ORDER BY avg_duration DESC;
|
|
||||||
|
|
||||||
-- Find slow operations
|
|
||||||
SELECT * FROM spans
|
|
||||||
WHERE duration_ms > 1000
|
|
||||||
ORDER BY duration_ms DESC;
|
|
||||||
```
|
|
||||||
|
|
||||||
:::tip[Advanced Analytics]
|
|
||||||
Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stack/blob/main/docs/getting_started.ipynb) for more examples on querying traces and spans programmatically.
|
|
||||||
:::
|
|
||||||
|
|
||||||
## Best Practices
|
## Best Practices
|
||||||
|
|
||||||
### 🔍 **Monitoring Strategy**
|
### 🔍 **Monitoring Strategy**
|
||||||
- Use OpenTelemetry for production environments
|
- Use OpenTelemetry for production environments
|
||||||
- Combine multiple sinks for development (console + SQLite)
|
|
||||||
- Set up alerts on key metrics like token usage and error rates
|
- Set up alerts on key metrics like token usage and error rates
|
||||||
|
|
||||||
### 📊 **Metrics Analysis**
|
### 📊 **Metrics Analysis**
|
||||||
|
|
@ -293,45 +200,8 @@ Refer to the [Getting Started notebook](https://github.com/meta-llama/llama-stac
|
||||||
|
|
||||||
### 🔧 **Configuration Management**
|
### 🔧 **Configuration Management**
|
||||||
- Use environment variables for flexible deployment
|
- Use environment variables for flexible deployment
|
||||||
- Configure appropriate retention policies for SQLite
|
|
||||||
- Ensure proper network access to OpenTelemetry collectors
|
- Ensure proper network access to OpenTelemetry collectors
|
||||||
|
|
||||||
## Integration Examples
|
|
||||||
|
|
||||||
### Basic Telemetry Setup
|
|
||||||
|
|
||||||
```python
|
|
||||||
from llama_stack_client import LlamaStackClient
|
|
||||||
|
|
||||||
# Client with telemetry headers
|
|
||||||
client = LlamaStackClient(
|
|
||||||
base_url="http://localhost:8000",
|
|
||||||
extra_headers={
|
|
||||||
"X-Telemetry-Service": "my-ai-app",
|
|
||||||
"X-Telemetry-Version": "1.0.0"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# All API calls will be automatically traced
|
|
||||||
response = client.chat.completions.create(
|
|
||||||
model="meta-llama/Llama-3.2-3B-Instruct",
|
|
||||||
messages=[{"role": "user", "content": "Hello!"}]
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Custom Telemetry Context
|
|
||||||
|
|
||||||
```python
|
|
||||||
# Add custom span attributes for better tracking
|
|
||||||
with tracer.start_as_current_span("custom_operation") as span:
|
|
||||||
span.set_attribute("user_id", "user123")
|
|
||||||
span.set_attribute("operation_type", "chat_completion")
|
|
||||||
|
|
||||||
response = client.chat.completions.create(
|
|
||||||
model="meta-llama/Llama-3.2-3B-Instruct",
|
|
||||||
messages=[{"role": "user", "content": "Hello!"}]
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Related Resources
|
## Related Resources
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -119,7 +119,7 @@ The following environment variables can be configured:
|
||||||
|
|
||||||
### Telemetry Configuration
|
### Telemetry Configuration
|
||||||
- `OTEL_SERVICE_NAME`: OpenTelemetry service name
|
- `OTEL_SERVICE_NAME`: OpenTelemetry service name
|
||||||
- `TELEMETRY_SINKS`: Telemetry sinks (default: `console,sqlite`)
|
- `TELEMETRY_SINKS`: Telemetry sinks (default: `[]`)
|
||||||
|
|
||||||
## Enabling Providers
|
## Enabling Providers
|
||||||
|
|
||||||
|
|
@ -216,7 +216,6 @@ The starter distribution uses SQLite for local storage of various components:
|
||||||
- **Files metadata**: `~/.llama/distributions/starter/files_metadata.db`
|
- **Files metadata**: `~/.llama/distributions/starter/files_metadata.db`
|
||||||
- **Agents store**: `~/.llama/distributions/starter/agents_store.db`
|
- **Agents store**: `~/.llama/distributions/starter/agents_store.db`
|
||||||
- **Responses store**: `~/.llama/distributions/starter/responses_store.db`
|
- **Responses store**: `~/.llama/distributions/starter/responses_store.db`
|
||||||
- **Trace store**: `~/.llama/distributions/starter/trace_store.db`
|
|
||||||
- **Evaluation store**: `~/.llama/distributions/starter/meta_reference_eval.db`
|
- **Evaluation store**: `~/.llama/distributions/starter/meta_reference_eval.db`
|
||||||
- **Dataset I/O stores**: Various HuggingFace and local filesystem stores
|
- **Dataset I/O stores**: Various HuggingFace and local filesystem stores
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,14 +16,12 @@ Meta's reference implementation of telemetry and observability using OpenTelemet
|
||||||
|-------|------|----------|---------|-------------|
|
|-------|------|----------|---------|-------------|
|
||||||
| `otel_exporter_otlp_endpoint` | `str \| None` | No | | The OpenTelemetry collector endpoint URL (base URL for traces, metrics, and logs). If not set, the SDK will use OTEL_EXPORTER_OTLP_ENDPOINT environment variable. |
|
| `otel_exporter_otlp_endpoint` | `str \| None` | No | | The OpenTelemetry collector endpoint URL (base URL for traces, metrics, and logs). If not set, the SDK will use OTEL_EXPORTER_OTLP_ENDPOINT environment variable. |
|
||||||
| `service_name` | `<class 'str'>` | No | | The service name to use for telemetry |
|
| `service_name` | `<class 'str'>` | No | | The service name to use for telemetry |
|
||||||
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [<TelemetrySink.SQLITE: 'sqlite'>] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console) |
|
| `sinks` | `list[inline.telemetry.meta_reference.config.TelemetrySink` | No | [] | List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console) |
|
||||||
| `sqlite_db_path` | `<class 'str'>` | No | ~/.llama/runtime/trace_store.db | The path to the SQLite database to use for storing traces |
|
|
||||||
|
|
||||||
## Sample Configuration
|
## Sample Configuration
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
```
|
```
|
||||||
|
|
|
||||||
|
|
@ -421,104 +421,3 @@ class Telemetry(Protocol):
|
||||||
:param ttl_seconds: The time to live of the event.
|
:param ttl_seconds: The time to live of the event.
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
async def query_traces(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition] | None = None,
|
|
||||||
limit: int | None = 100,
|
|
||||||
offset: int | None = 0,
|
|
||||||
order_by: list[str] | None = None,
|
|
||||||
) -> QueryTracesResponse:
|
|
||||||
"""Query traces.
|
|
||||||
|
|
||||||
:param attribute_filters: The attribute filters to apply to the traces.
|
|
||||||
:param limit: The limit of traces to return.
|
|
||||||
:param offset: The offset of the traces to return.
|
|
||||||
:param order_by: The order by of the traces to return.
|
|
||||||
:returns: A QueryTracesResponse.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
async def get_trace(self, trace_id: str) -> Trace:
|
|
||||||
"""Get a trace by its ID.
|
|
||||||
|
|
||||||
:param trace_id: The ID of the trace to get.
|
|
||||||
:returns: A Trace.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
async def get_span(self, trace_id: str, span_id: str) -> Span:
|
|
||||||
"""Get a span by its ID.
|
|
||||||
|
|
||||||
:param trace_id: The ID of the trace to get the span from.
|
|
||||||
:param span_id: The ID of the span to get.
|
|
||||||
:returns: A Span.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
async def get_span_tree(
|
|
||||||
self,
|
|
||||||
span_id: str,
|
|
||||||
attributes_to_return: list[str] | None = None,
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> QuerySpanTreeResponse:
|
|
||||||
"""Get a span tree by its ID.
|
|
||||||
|
|
||||||
:param span_id: The ID of the span to get the tree from.
|
|
||||||
:param attributes_to_return: The attributes to return in the tree.
|
|
||||||
:param max_depth: The maximum depth of the tree.
|
|
||||||
:returns: A QuerySpanTreeResponse.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
async def query_spans(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition],
|
|
||||||
attributes_to_return: list[str],
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> QuerySpansResponse:
|
|
||||||
"""Query spans.
|
|
||||||
|
|
||||||
:param attribute_filters: The attribute filters to apply to the spans.
|
|
||||||
:param attributes_to_return: The attributes to return in the spans.
|
|
||||||
:param max_depth: The maximum depth of the tree.
|
|
||||||
:returns: A QuerySpansResponse.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
async def save_spans_to_dataset(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition],
|
|
||||||
attributes_to_save: list[str],
|
|
||||||
dataset_id: str,
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> None:
|
|
||||||
"""Save spans to a dataset.
|
|
||||||
|
|
||||||
:param attribute_filters: The attribute filters to apply to the spans.
|
|
||||||
:param attributes_to_save: The attributes to save to the dataset.
|
|
||||||
:param dataset_id: The ID of the dataset to save the spans to.
|
|
||||||
:param max_depth: The maximum depth of the tree.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
async def query_metrics(
|
|
||||||
self,
|
|
||||||
metric_name: str,
|
|
||||||
start_time: int,
|
|
||||||
end_time: int | None = None,
|
|
||||||
granularity: str | None = None,
|
|
||||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
|
||||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
|
||||||
) -> QueryMetricsResponse:
|
|
||||||
"""Query metrics.
|
|
||||||
|
|
||||||
:param metric_name: The name of the metric to query.
|
|
||||||
:param start_time: The start time of the metric to query.
|
|
||||||
:param end_time: The end time of the metric to query.
|
|
||||||
:param granularity: The granularity of the metric to query.
|
|
||||||
:param query_type: The type of query to perform.
|
|
||||||
:param label_matchers: The label matchers to apply to the metric.
|
|
||||||
:returns: A QueryMetricsResponse.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
|
||||||
|
|
@ -207,8 +207,9 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
# when using the library client, we should not log to console since many
|
# when using the library client, we should not log to console since many
|
||||||
# of our logs are intended for server-side usage
|
# of our logs are intended for server-side usage
|
||||||
current_sinks = os.environ.get("TELEMETRY_SINKS", "sqlite").split(",")
|
if sinks_from_env := os.environ.get("TELEMETRY_SINKS", None):
|
||||||
os.environ["TELEMETRY_SINKS"] = ",".join(sink for sink in current_sinks if sink != "console")
|
current_sinks = sinks_from_env.strip().lower().split(",")
|
||||||
|
os.environ["TELEMETRY_SINKS"] = ",".join(sink for sink in current_sinks if sink != "console")
|
||||||
|
|
||||||
if in_notebook():
|
if in_notebook():
|
||||||
import nest_asyncio
|
import nest_asyncio
|
||||||
|
|
|
||||||
|
|
@ -159,8 +159,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
post_training:
|
post_training:
|
||||||
- provider_id: torchtune-cpu
|
- provider_id: torchtune-cpu
|
||||||
|
|
|
||||||
|
|
@ -50,8 +50,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: meta-reference
|
- provider_id: meta-reference
|
||||||
|
|
|
||||||
|
|
@ -46,8 +46,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: meta-reference
|
- provider_id: meta-reference
|
||||||
|
|
|
||||||
|
|
@ -61,8 +61,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: meta-reference
|
- provider_id: meta-reference
|
||||||
|
|
|
||||||
|
|
@ -51,8 +51,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: meta-reference
|
- provider_id: meta-reference
|
||||||
|
|
|
||||||
|
|
@ -53,8 +53,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: nvidia
|
- provider_id: nvidia
|
||||||
|
|
|
||||||
|
|
@ -48,8 +48,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: nvidia
|
- provider_id: nvidia
|
||||||
|
|
|
||||||
|
|
@ -81,8 +81,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: meta-reference
|
- provider_id: meta-reference
|
||||||
|
|
|
||||||
|
|
@ -159,8 +159,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
post_training:
|
post_training:
|
||||||
- provider_id: huggingface-gpu
|
- provider_id: huggingface-gpu
|
||||||
|
|
|
||||||
|
|
@ -159,8 +159,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
post_training:
|
post_training:
|
||||||
- provider_id: torchtune-cpu
|
- provider_id: torchtune-cpu
|
||||||
|
|
|
||||||
|
|
@ -46,8 +46,7 @@ providers:
|
||||||
provider_type: inline::meta-reference
|
provider_type: inline::meta-reference
|
||||||
config:
|
config:
|
||||||
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
service_name: "${env.OTEL_SERVICE_NAME:=\u200B}"
|
||||||
sinks: ${env.TELEMETRY_SINKS:=sqlite}
|
sinks: ${env.TELEMETRY_SINKS:=}
|
||||||
sqlite_db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/trace_store.db
|
|
||||||
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
otel_exporter_otlp_endpoint: ${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}
|
||||||
eval:
|
eval:
|
||||||
- provider_id: meta-reference
|
- provider_id: meta-reference
|
||||||
|
|
|
||||||
|
|
@ -9,13 +9,10 @@ from typing import Any
|
||||||
|
|
||||||
from pydantic import BaseModel, Field, field_validator
|
from pydantic import BaseModel, Field, field_validator
|
||||||
|
|
||||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
|
||||||
|
|
||||||
|
|
||||||
class TelemetrySink(StrEnum):
|
class TelemetrySink(StrEnum):
|
||||||
OTEL_TRACE = "otel_trace"
|
OTEL_TRACE = "otel_trace"
|
||||||
OTEL_METRIC = "otel_metric"
|
OTEL_METRIC = "otel_metric"
|
||||||
SQLITE = "sqlite"
|
|
||||||
CONSOLE = "console"
|
CONSOLE = "console"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -30,12 +27,8 @@ class TelemetryConfig(BaseModel):
|
||||||
description="The service name to use for telemetry",
|
description="The service name to use for telemetry",
|
||||||
)
|
)
|
||||||
sinks: list[TelemetrySink] = Field(
|
sinks: list[TelemetrySink] = Field(
|
||||||
default=[TelemetrySink.SQLITE],
|
default_factory=list,
|
||||||
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, sqlite, console)",
|
description="List of telemetry sinks to enable (possible values: otel_trace, otel_metric, console)",
|
||||||
)
|
|
||||||
sqlite_db_path: str = Field(
|
|
||||||
default_factory=lambda: (RUNTIME_BASE_DIR / "trace_store.db").as_posix(),
|
|
||||||
description="The path to the SQLite database to use for storing traces",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@field_validator("sinks", mode="before")
|
@field_validator("sinks", mode="before")
|
||||||
|
|
@ -43,13 +36,12 @@ class TelemetryConfig(BaseModel):
|
||||||
def validate_sinks(cls, v):
|
def validate_sinks(cls, v):
|
||||||
if isinstance(v, str):
|
if isinstance(v, str):
|
||||||
return [TelemetrySink(sink.strip()) for sink in v.split(",")]
|
return [TelemetrySink(sink.strip()) for sink in v.split(",")]
|
||||||
return v
|
return v or []
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def sample_run_config(cls, __distro_dir__: str, db_name: str = "trace_store.db") -> dict[str, Any]:
|
def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]:
|
||||||
return {
|
return {
|
||||||
"service_name": "${env.OTEL_SERVICE_NAME:=\u200b}",
|
"service_name": "${env.OTEL_SERVICE_NAME:=\u200b}",
|
||||||
"sinks": "${env.TELEMETRY_SINKS:=sqlite}",
|
"sinks": "${env.TELEMETRY_SINKS:=}",
|
||||||
"sqlite_db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
|
|
||||||
"otel_exporter_otlp_endpoint": "${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}",
|
"otel_exporter_otlp_endpoint": "${env.OTEL_EXPORTER_OTLP_ENDPOINT:=}",
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,190 +0,0 @@
|
||||||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
|
||||||
# the root directory of this source tree.
|
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import sqlite3
|
|
||||||
import threading
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
|
|
||||||
from opentelemetry.sdk.trace import SpanProcessor
|
|
||||||
from opentelemetry.trace import Span
|
|
||||||
from opentelemetry.trace.span import format_span_id, format_trace_id
|
|
||||||
|
|
||||||
from llama_stack.providers.utils.telemetry.tracing import LOCAL_ROOT_SPAN_MARKER
|
|
||||||
|
|
||||||
|
|
||||||
class SQLiteSpanProcessor(SpanProcessor):
|
|
||||||
def __init__(self, conn_string):
|
|
||||||
"""Initialize the SQLite span processor with a connection string."""
|
|
||||||
self.conn_string = conn_string
|
|
||||||
self._local = threading.local() # Thread-local storage for connections
|
|
||||||
self.setup_database()
|
|
||||||
|
|
||||||
def _get_connection(self):
|
|
||||||
"""Get a thread-local database connection."""
|
|
||||||
if not hasattr(self._local, "conn"):
|
|
||||||
try:
|
|
||||||
self._local.conn = sqlite3.connect(self.conn_string)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error connecting to SQLite database: {e}")
|
|
||||||
raise
|
|
||||||
return self._local.conn
|
|
||||||
|
|
||||||
def setup_database(self):
|
|
||||||
"""Create the necessary tables if they don't exist."""
|
|
||||||
# Create directory if it doesn't exist
|
|
||||||
os.makedirs(os.path.dirname(self.conn_string), exist_ok=True)
|
|
||||||
|
|
||||||
conn = self._get_connection()
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
CREATE TABLE IF NOT EXISTS traces (
|
|
||||||
trace_id TEXT PRIMARY KEY,
|
|
||||||
service_name TEXT,
|
|
||||||
root_span_id TEXT,
|
|
||||||
start_time TIMESTAMP,
|
|
||||||
end_time TIMESTAMP,
|
|
||||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
||||||
)
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
CREATE TABLE IF NOT EXISTS spans (
|
|
||||||
span_id TEXT PRIMARY KEY,
|
|
||||||
trace_id TEXT REFERENCES traces(trace_id),
|
|
||||||
parent_span_id TEXT,
|
|
||||||
name TEXT,
|
|
||||||
start_time TIMESTAMP,
|
|
||||||
end_time TIMESTAMP,
|
|
||||||
attributes TEXT,
|
|
||||||
status TEXT,
|
|
||||||
kind TEXT
|
|
||||||
)
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
CREATE TABLE IF NOT EXISTS span_events (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
span_id TEXT REFERENCES spans(span_id),
|
|
||||||
name TEXT,
|
|
||||||
timestamp TIMESTAMP,
|
|
||||||
attributes TEXT
|
|
||||||
)
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_traces_created_at
|
|
||||||
ON traces(created_at)
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
conn.commit()
|
|
||||||
cursor.close()
|
|
||||||
|
|
||||||
def on_start(self, span: Span, parent_context=None):
|
|
||||||
"""Called when a span starts."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_end(self, span: Span):
|
|
||||||
"""Called when a span ends. Export the span data to SQLite."""
|
|
||||||
try:
|
|
||||||
conn = self._get_connection()
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
trace_id = format_trace_id(span.get_span_context().trace_id)
|
|
||||||
span_id = format_span_id(span.get_span_context().span_id)
|
|
||||||
service_name = span.resource.attributes.get("service.name", "unknown")
|
|
||||||
|
|
||||||
parent_span_id = None
|
|
||||||
parent_context = span.parent
|
|
||||||
if parent_context:
|
|
||||||
parent_span_id = format_span_id(parent_context.span_id)
|
|
||||||
|
|
||||||
# Insert into traces
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO traces (
|
|
||||||
trace_id, service_name, root_span_id, start_time, end_time
|
|
||||||
) VALUES (?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT(trace_id) DO UPDATE SET
|
|
||||||
root_span_id = COALESCE(root_span_id, excluded.root_span_id),
|
|
||||||
start_time = MIN(excluded.start_time, start_time),
|
|
||||||
end_time = MAX(excluded.end_time, end_time)
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
trace_id,
|
|
||||||
service_name,
|
|
||||||
(span_id if span.attributes.get(LOCAL_ROOT_SPAN_MARKER) else None),
|
|
||||||
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
|
|
||||||
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Insert into spans
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO spans (
|
|
||||||
span_id, trace_id, parent_span_id, name,
|
|
||||||
start_time, end_time, attributes, status,
|
|
||||||
kind
|
|
||||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
span_id,
|
|
||||||
trace_id,
|
|
||||||
parent_span_id,
|
|
||||||
span.name,
|
|
||||||
datetime.fromtimestamp(span.start_time / 1e9, UTC).isoformat(),
|
|
||||||
datetime.fromtimestamp(span.end_time / 1e9, UTC).isoformat(),
|
|
||||||
json.dumps(dict(span.attributes)),
|
|
||||||
span.status.status_code.name,
|
|
||||||
span.kind.name,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
for event in span.events:
|
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO span_events (
|
|
||||||
span_id, name, timestamp, attributes
|
|
||||||
) VALUES (?, ?, ?, ?)
|
|
||||||
""",
|
|
||||||
(
|
|
||||||
span_id,
|
|
||||||
event.name,
|
|
||||||
datetime.fromtimestamp(event.timestamp / 1e9, UTC).isoformat(),
|
|
||||||
json.dumps(dict(event.attributes)),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
conn.commit()
|
|
||||||
cursor.close()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error exporting span to SQLite: {e}")
|
|
||||||
|
|
||||||
def shutdown(self):
|
|
||||||
"""Cleanup any resources."""
|
|
||||||
# We can't access other threads' connections, so we just close our own
|
|
||||||
if hasattr(self._local, "conn"):
|
|
||||||
try:
|
|
||||||
self._local.conn.close()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error closing SQLite connection: {e}")
|
|
||||||
finally:
|
|
||||||
del self._local.conn
|
|
||||||
|
|
||||||
def force_flush(self, timeout_millis=30000):
|
|
||||||
"""Force export of spans."""
|
|
||||||
pass
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
# This source code is licensed under the terms described in the LICENSE file in
|
||||||
# the root directory of this source tree.
|
# the root directory of this source tree.
|
||||||
|
|
||||||
import datetime
|
|
||||||
import threading
|
import threading
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
@ -22,19 +21,11 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
|
||||||
from llama_stack.apis.telemetry import (
|
from llama_stack.apis.telemetry import (
|
||||||
Event,
|
Event,
|
||||||
MetricEvent,
|
MetricEvent,
|
||||||
MetricLabelMatcher,
|
|
||||||
MetricQueryType,
|
|
||||||
QueryCondition,
|
|
||||||
QueryMetricsResponse,
|
|
||||||
QuerySpanTreeResponse,
|
|
||||||
QueryTracesResponse,
|
|
||||||
Span,
|
|
||||||
SpanEndPayload,
|
SpanEndPayload,
|
||||||
SpanStartPayload,
|
SpanStartPayload,
|
||||||
SpanStatus,
|
SpanStatus,
|
||||||
StructuredLogEvent,
|
StructuredLogEvent,
|
||||||
Telemetry,
|
Telemetry,
|
||||||
Trace,
|
|
||||||
UnstructuredLogEvent,
|
UnstructuredLogEvent,
|
||||||
)
|
)
|
||||||
from llama_stack.core.datatypes import Api
|
from llama_stack.core.datatypes import Api
|
||||||
|
|
@ -42,11 +33,6 @@ from llama_stack.log import get_logger
|
||||||
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
|
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
|
||||||
ConsoleSpanProcessor,
|
ConsoleSpanProcessor,
|
||||||
)
|
)
|
||||||
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
|
|
||||||
SQLiteSpanProcessor,
|
|
||||||
)
|
|
||||||
from llama_stack.providers.utils.telemetry.dataset_mixin import TelemetryDatasetMixin
|
|
||||||
from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore
|
|
||||||
from llama_stack.providers.utils.telemetry.tracing import ROOT_SPAN_MARKERS
|
from llama_stack.providers.utils.telemetry.tracing import ROOT_SPAN_MARKERS
|
||||||
|
|
||||||
from .config import TelemetryConfig, TelemetrySink
|
from .config import TelemetryConfig, TelemetrySink
|
||||||
|
|
@ -68,7 +54,7 @@ def is_tracing_enabled(tracer):
|
||||||
return span.is_recording()
|
return span.is_recording()
|
||||||
|
|
||||||
|
|
||||||
class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
class TelemetryAdapter(Telemetry):
|
||||||
def __init__(self, config: TelemetryConfig, deps: dict[Api, Any]) -> None:
|
def __init__(self, config: TelemetryConfig, deps: dict[Api, Any]) -> None:
|
||||||
self.config = config
|
self.config = config
|
||||||
self.datasetio_api = deps.get(Api.datasetio)
|
self.datasetio_api = deps.get(Api.datasetio)
|
||||||
|
|
@ -111,15 +97,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||||
metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
|
metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
|
||||||
metrics.set_meter_provider(metric_provider)
|
metrics.set_meter_provider(metric_provider)
|
||||||
|
|
||||||
if TelemetrySink.SQLITE in self.config.sinks:
|
|
||||||
trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path))
|
|
||||||
if TelemetrySink.CONSOLE in self.config.sinks:
|
if TelemetrySink.CONSOLE in self.config.sinks:
|
||||||
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True))
|
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True))
|
||||||
|
|
||||||
if TelemetrySink.OTEL_METRIC in self.config.sinks:
|
if TelemetrySink.OTEL_METRIC in self.config.sinks:
|
||||||
self.meter = metrics.get_meter(__name__)
|
self.meter = metrics.get_meter(__name__)
|
||||||
if TelemetrySink.SQLITE in self.config.sinks:
|
|
||||||
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
|
|
||||||
|
|
||||||
self._lock = _global_lock
|
self._lock = _global_lock
|
||||||
|
|
||||||
|
|
@ -139,47 +121,6 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown event type: {event}")
|
raise ValueError(f"Unknown event type: {event}")
|
||||||
|
|
||||||
async def query_metrics(
|
|
||||||
self,
|
|
||||||
metric_name: str,
|
|
||||||
start_time: int,
|
|
||||||
end_time: int | None = None,
|
|
||||||
granularity: str | None = None,
|
|
||||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
|
||||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
|
||||||
) -> QueryMetricsResponse:
|
|
||||||
"""Query metrics from the telemetry store.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
metric_name: The name of the metric to query (e.g., "prompt_tokens")
|
|
||||||
start_time: Start time as Unix timestamp
|
|
||||||
end_time: End time as Unix timestamp (defaults to now if None)
|
|
||||||
granularity: Time granularity for aggregation
|
|
||||||
query_type: Type of query (RANGE or INSTANT)
|
|
||||||
label_matchers: Label filters to apply
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
QueryMetricsResponse with metric time series data
|
|
||||||
"""
|
|
||||||
# Convert timestamps to datetime objects
|
|
||||||
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
|
|
||||||
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
|
|
||||||
|
|
||||||
# Use SQLite trace store if available
|
|
||||||
if hasattr(self, "trace_store") and self.trace_store:
|
|
||||||
return await self.trace_store.query_metrics(
|
|
||||||
metric_name=metric_name,
|
|
||||||
start_time=start_dt,
|
|
||||||
end_time=end_dt,
|
|
||||||
granularity=granularity,
|
|
||||||
query_type=query_type,
|
|
||||||
label_matchers=label_matchers,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise ValueError(
|
|
||||||
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
|
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
# Use global storage instead of instance storage
|
# Use global storage instead of instance storage
|
||||||
|
|
@ -326,39 +267,3 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||||
_GLOBAL_STORAGE["active_spans"].pop(span_id, None)
|
_GLOBAL_STORAGE["active_spans"].pop(span_id, None)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown structured log event: {event}")
|
raise ValueError(f"Unknown structured log event: {event}")
|
||||||
|
|
||||||
async def query_traces(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition] | None = None,
|
|
||||||
limit: int | None = 100,
|
|
||||||
offset: int | None = 0,
|
|
||||||
order_by: list[str] | None = None,
|
|
||||||
) -> QueryTracesResponse:
|
|
||||||
return QueryTracesResponse(
|
|
||||||
data=await self.trace_store.query_traces(
|
|
||||||
attribute_filters=attribute_filters,
|
|
||||||
limit=limit,
|
|
||||||
offset=offset,
|
|
||||||
order_by=order_by,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
async def get_trace(self, trace_id: str) -> Trace:
|
|
||||||
return await self.trace_store.get_trace(trace_id)
|
|
||||||
|
|
||||||
async def get_span(self, trace_id: str, span_id: str) -> Span:
|
|
||||||
return await self.trace_store.get_span(trace_id, span_id)
|
|
||||||
|
|
||||||
async def get_span_tree(
|
|
||||||
self,
|
|
||||||
span_id: str,
|
|
||||||
attributes_to_return: list[str] | None = None,
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> QuerySpanTreeResponse:
|
|
||||||
return QuerySpanTreeResponse(
|
|
||||||
data=await self.trace_store.get_span_tree(
|
|
||||||
span_id=span_id,
|
|
||||||
attributes_to_return=attributes_to_return,
|
|
||||||
max_depth=max_depth,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
|
||||||
|
|
@ -1,80 +0,0 @@
|
||||||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
|
||||||
# the root directory of this source tree.
|
|
||||||
|
|
||||||
|
|
||||||
from llama_stack.apis.datasetio import DatasetIO
|
|
||||||
from llama_stack.apis.telemetry import QueryCondition, QuerySpansResponse, Span
|
|
||||||
|
|
||||||
|
|
||||||
class TelemetryDatasetMixin:
|
|
||||||
"""Mixin class that provides dataset-related functionality for telemetry providers."""
|
|
||||||
|
|
||||||
datasetio_api: DatasetIO | None
|
|
||||||
|
|
||||||
async def save_spans_to_dataset(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition],
|
|
||||||
attributes_to_save: list[str],
|
|
||||||
dataset_id: str,
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> None:
|
|
||||||
if self.datasetio_api is None:
|
|
||||||
raise RuntimeError("DatasetIO API not available")
|
|
||||||
|
|
||||||
spans = await self.query_spans(
|
|
||||||
attribute_filters=attribute_filters,
|
|
||||||
attributes_to_return=attributes_to_save,
|
|
||||||
max_depth=max_depth,
|
|
||||||
)
|
|
||||||
|
|
||||||
rows = [
|
|
||||||
{
|
|
||||||
"trace_id": span.trace_id,
|
|
||||||
"span_id": span.span_id,
|
|
||||||
"parent_span_id": span.parent_span_id,
|
|
||||||
"name": span.name,
|
|
||||||
"start_time": span.start_time,
|
|
||||||
"end_time": span.end_time,
|
|
||||||
**{attr: span.attributes.get(attr) for attr in attributes_to_save},
|
|
||||||
}
|
|
||||||
for span in spans
|
|
||||||
]
|
|
||||||
|
|
||||||
await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)
|
|
||||||
|
|
||||||
async def query_spans(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition],
|
|
||||||
attributes_to_return: list[str],
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> QuerySpansResponse:
|
|
||||||
traces = await self.query_traces(attribute_filters=attribute_filters)
|
|
||||||
spans = []
|
|
||||||
|
|
||||||
for trace in traces.data:
|
|
||||||
spans_by_id_resp = await self.get_span_tree(
|
|
||||||
span_id=trace.root_span_id,
|
|
||||||
attributes_to_return=attributes_to_return,
|
|
||||||
max_depth=max_depth,
|
|
||||||
)
|
|
||||||
|
|
||||||
for span in spans_by_id_resp.data.values():
|
|
||||||
if span.attributes and all(
|
|
||||||
attr in span.attributes and span.attributes[attr] is not None for attr in attributes_to_return
|
|
||||||
):
|
|
||||||
spans.append(
|
|
||||||
Span(
|
|
||||||
trace_id=trace.root_span_id,
|
|
||||||
span_id=span.span_id,
|
|
||||||
parent_span_id=span.parent_span_id,
|
|
||||||
name=span.name,
|
|
||||||
start_time=span.start_time,
|
|
||||||
end_time=span.end_time,
|
|
||||||
attributes=span.attributes,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return QuerySpansResponse(data=spans)
|
|
||||||
|
|
@ -1,383 +0,0 @@
|
||||||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
|
||||||
# the root directory of this source tree.
|
|
||||||
|
|
||||||
import json
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from typing import Protocol
|
|
||||||
|
|
||||||
import aiosqlite
|
|
||||||
|
|
||||||
from llama_stack.apis.telemetry import (
|
|
||||||
MetricDataPoint,
|
|
||||||
MetricLabel,
|
|
||||||
MetricLabelMatcher,
|
|
||||||
MetricQueryType,
|
|
||||||
MetricSeries,
|
|
||||||
QueryCondition,
|
|
||||||
QueryMetricsResponse,
|
|
||||||
Span,
|
|
||||||
SpanWithStatus,
|
|
||||||
Trace,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TraceStore(Protocol):
|
|
||||||
async def query_traces(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition] | None = None,
|
|
||||||
limit: int | None = 100,
|
|
||||||
offset: int | None = 0,
|
|
||||||
order_by: list[str] | None = None,
|
|
||||||
) -> list[Trace]: ...
|
|
||||||
|
|
||||||
async def get_span_tree(
|
|
||||||
self,
|
|
||||||
span_id: str,
|
|
||||||
attributes_to_return: list[str] | None = None,
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> dict[str, SpanWithStatus]: ...
|
|
||||||
|
|
||||||
async def query_metrics(
|
|
||||||
self,
|
|
||||||
metric_name: str,
|
|
||||||
start_time: datetime,
|
|
||||||
end_time: datetime | None = None,
|
|
||||||
granularity: str | None = "1d",
|
|
||||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
|
||||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
|
||||||
) -> QueryMetricsResponse: ...
|
|
||||||
|
|
||||||
|
|
||||||
class SQLiteTraceStore(TraceStore):
|
|
||||||
def __init__(self, conn_string: str):
|
|
||||||
self.conn_string = conn_string
|
|
||||||
|
|
||||||
async def query_metrics(
|
|
||||||
self,
|
|
||||||
metric_name: str,
|
|
||||||
start_time: datetime,
|
|
||||||
end_time: datetime | None = None,
|
|
||||||
granularity: str | None = None,
|
|
||||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
|
||||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
|
||||||
) -> QueryMetricsResponse:
|
|
||||||
if end_time is None:
|
|
||||||
end_time = datetime.now(UTC)
|
|
||||||
|
|
||||||
# Build base query
|
|
||||||
if query_type == MetricQueryType.INSTANT:
|
|
||||||
query = """
|
|
||||||
SELECT
|
|
||||||
se.name,
|
|
||||||
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
|
|
||||||
json_extract(se.attributes, '$.unit') as unit,
|
|
||||||
se.attributes
|
|
||||||
FROM span_events se
|
|
||||||
WHERE se.name = ?
|
|
||||||
AND se.timestamp BETWEEN ? AND ?
|
|
||||||
"""
|
|
||||||
else:
|
|
||||||
if granularity:
|
|
||||||
time_format = self._get_time_format_for_granularity(granularity)
|
|
||||||
query = f"""
|
|
||||||
SELECT
|
|
||||||
se.name,
|
|
||||||
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
|
|
||||||
json_extract(se.attributes, '$.unit') as unit,
|
|
||||||
se.attributes,
|
|
||||||
strftime('{time_format}', se.timestamp) as bucket_start
|
|
||||||
FROM span_events se
|
|
||||||
WHERE se.name = ?
|
|
||||||
AND se.timestamp BETWEEN ? AND ?
|
|
||||||
"""
|
|
||||||
else:
|
|
||||||
query = """
|
|
||||||
SELECT
|
|
||||||
se.name,
|
|
||||||
json_extract(se.attributes, '$.value') as value,
|
|
||||||
json_extract(se.attributes, '$.unit') as unit,
|
|
||||||
se.attributes,
|
|
||||||
se.timestamp
|
|
||||||
FROM span_events se
|
|
||||||
WHERE se.name = ?
|
|
||||||
AND se.timestamp BETWEEN ? AND ?
|
|
||||||
"""
|
|
||||||
|
|
||||||
params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
|
|
||||||
|
|
||||||
# Labels that will be attached to the MetricSeries (preserve matcher labels)
|
|
||||||
all_labels: list[MetricLabel] = []
|
|
||||||
matcher_label_names = set()
|
|
||||||
if label_matchers:
|
|
||||||
for matcher in label_matchers:
|
|
||||||
json_path = f"$.{matcher.name}"
|
|
||||||
if matcher.operator == "=":
|
|
||||||
query += f" AND json_extract(se.attributes, '{json_path}') = ?"
|
|
||||||
params.append(matcher.value)
|
|
||||||
elif matcher.operator == "!=":
|
|
||||||
query += f" AND json_extract(se.attributes, '{json_path}') != ?"
|
|
||||||
params.append(matcher.value)
|
|
||||||
elif matcher.operator == "=~":
|
|
||||||
query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
|
|
||||||
params.append(f"%{matcher.value}%")
|
|
||||||
elif matcher.operator == "!~":
|
|
||||||
query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
|
|
||||||
params.append(f"%{matcher.value}%")
|
|
||||||
# Preserve filter context in output
|
|
||||||
all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
|
|
||||||
matcher_label_names.add(matcher.name)
|
|
||||||
|
|
||||||
# GROUP BY / ORDER BY logic
|
|
||||||
if query_type == MetricQueryType.RANGE and granularity:
|
|
||||||
group_time_format = self._get_time_format_for_granularity(granularity)
|
|
||||||
query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
|
|
||||||
query += " ORDER BY bucket_start"
|
|
||||||
elif query_type == MetricQueryType.INSTANT:
|
|
||||||
query += " GROUP BY json_extract(se.attributes, '$.unit')"
|
|
||||||
else:
|
|
||||||
query += " ORDER BY se.timestamp"
|
|
||||||
|
|
||||||
# Execute query
|
|
||||||
async with aiosqlite.connect(self.conn_string) as conn:
|
|
||||||
conn.row_factory = aiosqlite.Row
|
|
||||||
async with conn.execute(query, params) as cursor:
|
|
||||||
rows = await cursor.fetchall()
|
|
||||||
|
|
||||||
if not rows:
|
|
||||||
return QueryMetricsResponse(data=[])
|
|
||||||
|
|
||||||
data_points = []
|
|
||||||
# We want to add attribute labels, but only those not already present as matcher labels.
|
|
||||||
attr_label_names = set()
|
|
||||||
for row in rows:
|
|
||||||
# Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
|
|
||||||
try:
|
|
||||||
attributes = json.loads(row["attributes"] or "{}")
|
|
||||||
except (TypeError, json.JSONDecodeError):
|
|
||||||
attributes = {}
|
|
||||||
|
|
||||||
value = row["value"]
|
|
||||||
unit = row["unit"] or ""
|
|
||||||
|
|
||||||
# Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
|
|
||||||
for k, v in attributes.items():
|
|
||||||
if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
|
|
||||||
all_labels.append(MetricLabel(name=k, value=str(v)))
|
|
||||||
attr_label_names.add(k)
|
|
||||||
|
|
||||||
# Determine timestamp
|
|
||||||
if query_type == MetricQueryType.RANGE and granularity:
|
|
||||||
try:
|
|
||||||
bucket_start_raw = row["bucket_start"]
|
|
||||||
except KeyError as e:
|
|
||||||
raise ValueError(
|
|
||||||
"DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
|
|
||||||
) from e
|
|
||||||
# this value could also be there, but be NULL, I think.
|
|
||||||
if bucket_start_raw is None:
|
|
||||||
raise ValueError("bucket_start is None check time format and data")
|
|
||||||
bucket_start = datetime.fromisoformat(bucket_start_raw)
|
|
||||||
timestamp = int(bucket_start.timestamp())
|
|
||||||
elif query_type == MetricQueryType.INSTANT:
|
|
||||||
timestamp = int(datetime.now(UTC).timestamp())
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
timestamp_raw = row["timestamp"]
|
|
||||||
except KeyError as e:
|
|
||||||
raise ValueError(
|
|
||||||
"DB did not have a timestamp in row, this indicates improper formatting"
|
|
||||||
) from e
|
|
||||||
# this value could also be there, but be NULL, I think.
|
|
||||||
if timestamp_raw is None:
|
|
||||||
raise ValueError("timestamp is None check time format and data")
|
|
||||||
timestamp_iso = datetime.fromisoformat(timestamp_raw)
|
|
||||||
timestamp = int(timestamp_iso.timestamp())
|
|
||||||
|
|
||||||
data_points.append(
|
|
||||||
MetricDataPoint(
|
|
||||||
timestamp=timestamp,
|
|
||||||
value=value,
|
|
||||||
unit=unit,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
|
|
||||||
return QueryMetricsResponse(data=metric_series)
|
|
||||||
|
|
||||||
def _get_time_format_for_granularity(self, granularity: str | None) -> str:
|
|
||||||
"""Get the SQLite strftime format string for a given granularity.
|
|
||||||
Args:
|
|
||||||
granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
|
|
||||||
Returns:
|
|
||||||
SQLite strftime format string for the granularity
|
|
||||||
"""
|
|
||||||
if granularity is None:
|
|
||||||
raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
|
|
||||||
|
|
||||||
if granularity.endswith("d"):
|
|
||||||
return "%Y-%m-%d 00:00:00"
|
|
||||||
elif granularity.endswith("h"):
|
|
||||||
return "%Y-%m-%d %H:00:00"
|
|
||||||
elif granularity.endswith("m"):
|
|
||||||
return "%Y-%m-%d %H:%M:00"
|
|
||||||
else:
|
|
||||||
return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.
|
|
||||||
|
|
||||||
async def query_traces(
|
|
||||||
self,
|
|
||||||
attribute_filters: list[QueryCondition] | None = None,
|
|
||||||
limit: int | None = 100,
|
|
||||||
offset: int | None = 0,
|
|
||||||
order_by: list[str] | None = None,
|
|
||||||
) -> list[Trace]:
|
|
||||||
def build_where_clause() -> tuple[str, list]:
|
|
||||||
if not attribute_filters:
|
|
||||||
return "", []
|
|
||||||
|
|
||||||
ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"}
|
|
||||||
|
|
||||||
conditions = [
|
|
||||||
f"json_extract(s.attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?"
|
|
||||||
for condition in attribute_filters
|
|
||||||
]
|
|
||||||
params = [condition.value for condition in attribute_filters]
|
|
||||||
where_clause = " WHERE " + " AND ".join(conditions)
|
|
||||||
return where_clause, params
|
|
||||||
|
|
||||||
def build_order_clause() -> str:
|
|
||||||
if not order_by:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
order_clauses = []
|
|
||||||
for field in order_by:
|
|
||||||
desc = field.startswith("-")
|
|
||||||
clean_field = field[1:] if desc else field
|
|
||||||
order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}")
|
|
||||||
return " ORDER BY " + ", ".join(order_clauses)
|
|
||||||
|
|
||||||
# Build the main query
|
|
||||||
base_query = """
|
|
||||||
WITH matching_traces AS (
|
|
||||||
SELECT DISTINCT t.trace_id
|
|
||||||
FROM traces t
|
|
||||||
JOIN spans s ON t.trace_id = s.trace_id
|
|
||||||
{where_clause}
|
|
||||||
),
|
|
||||||
filtered_traces AS (
|
|
||||||
SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time
|
|
||||||
FROM matching_traces mt
|
|
||||||
JOIN traces t ON mt.trace_id = t.trace_id
|
|
||||||
LEFT JOIN spans s ON t.trace_id = s.trace_id
|
|
||||||
{order_clause}
|
|
||||||
)
|
|
||||||
SELECT DISTINCT trace_id, root_span_id, start_time, end_time
|
|
||||||
FROM filtered_traces
|
|
||||||
WHERE root_span_id IS NOT NULL
|
|
||||||
LIMIT {limit} OFFSET {offset}
|
|
||||||
"""
|
|
||||||
|
|
||||||
where_clause, params = build_where_clause()
|
|
||||||
query = base_query.format(
|
|
||||||
where_clause=where_clause,
|
|
||||||
order_clause=build_order_clause(),
|
|
||||||
limit=limit,
|
|
||||||
offset=offset,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Execute query and return results
|
|
||||||
async with aiosqlite.connect(self.conn_string) as conn:
|
|
||||||
conn.row_factory = aiosqlite.Row
|
|
||||||
async with conn.execute(query, params) as cursor:
|
|
||||||
rows = await cursor.fetchall()
|
|
||||||
return [
|
|
||||||
Trace(
|
|
||||||
trace_id=row["trace_id"],
|
|
||||||
root_span_id=row["root_span_id"],
|
|
||||||
start_time=datetime.fromisoformat(row["start_time"]),
|
|
||||||
end_time=datetime.fromisoformat(row["end_time"]),
|
|
||||||
)
|
|
||||||
for row in rows
|
|
||||||
]
|
|
||||||
|
|
||||||
async def get_span_tree(
|
|
||||||
self,
|
|
||||||
span_id: str,
|
|
||||||
attributes_to_return: list[str] | None = None,
|
|
||||||
max_depth: int | None = None,
|
|
||||||
) -> dict[str, SpanWithStatus]:
|
|
||||||
# Build the attributes selection
|
|
||||||
attributes_select = "s.attributes"
|
|
||||||
if attributes_to_return:
|
|
||||||
json_object = ", ".join(f"'{key}', json_extract(s.attributes, '$.{key}')" for key in attributes_to_return)
|
|
||||||
attributes_select = f"json_object({json_object})"
|
|
||||||
|
|
||||||
# SQLite CTE query with filtered attributes
|
|
||||||
query = f"""
|
|
||||||
WITH RECURSIVE span_tree AS (
|
|
||||||
SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes
|
|
||||||
FROM spans s
|
|
||||||
WHERE s.span_id = ?
|
|
||||||
|
|
||||||
UNION ALL
|
|
||||||
|
|
||||||
SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes
|
|
||||||
FROM spans s
|
|
||||||
JOIN span_tree st ON s.parent_span_id = st.span_id
|
|
||||||
WHERE (? IS NULL OR st.depth < ?)
|
|
||||||
)
|
|
||||||
SELECT *
|
|
||||||
FROM span_tree
|
|
||||||
ORDER BY depth, start_time
|
|
||||||
"""
|
|
||||||
|
|
||||||
spans_by_id = {}
|
|
||||||
async with aiosqlite.connect(self.conn_string) as conn:
|
|
||||||
conn.row_factory = aiosqlite.Row
|
|
||||||
async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor:
|
|
||||||
rows = await cursor.fetchall()
|
|
||||||
|
|
||||||
if not rows:
|
|
||||||
raise ValueError(f"Span {span_id} not found")
|
|
||||||
|
|
||||||
for row in rows:
|
|
||||||
span = SpanWithStatus(
|
|
||||||
span_id=row["span_id"],
|
|
||||||
trace_id=row["trace_id"],
|
|
||||||
parent_span_id=row["parent_span_id"],
|
|
||||||
name=row["name"],
|
|
||||||
start_time=datetime.fromisoformat(row["start_time"]),
|
|
||||||
end_time=datetime.fromisoformat(row["end_time"]),
|
|
||||||
attributes=json.loads(row["filtered_attributes"]),
|
|
||||||
status=row["status"].lower(),
|
|
||||||
)
|
|
||||||
|
|
||||||
spans_by_id[span.span_id] = span
|
|
||||||
|
|
||||||
return spans_by_id
|
|
||||||
|
|
||||||
async def get_trace(self, trace_id: str) -> Trace:
|
|
||||||
query = """
|
|
||||||
SELECT *
|
|
||||||
FROM traces t
|
|
||||||
WHERE t.trace_id = ?
|
|
||||||
"""
|
|
||||||
async with aiosqlite.connect(self.conn_string) as conn:
|
|
||||||
conn.row_factory = aiosqlite.Row
|
|
||||||
async with conn.execute(query, (trace_id,)) as cursor:
|
|
||||||
row = await cursor.fetchone()
|
|
||||||
if row is None:
|
|
||||||
raise ValueError(f"Trace {trace_id} not found")
|
|
||||||
return Trace(**row)
|
|
||||||
|
|
||||||
async def get_span(self, trace_id: str, span_id: str) -> Span:
|
|
||||||
query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?"
|
|
||||||
async with aiosqlite.connect(self.conn_string) as conn:
|
|
||||||
conn.row_factory = aiosqlite.Row
|
|
||||||
async with conn.execute(query, (trace_id, span_id)) as cursor:
|
|
||||||
row = await cursor.fetchone()
|
|
||||||
if row is None:
|
|
||||||
raise ValueError(f"Span {span_id} not found")
|
|
||||||
return Span(**row)
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue