diff --git a/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py b/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py deleted file mode 100644 index 04eb71ce0..000000000 --- a/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py +++ /dev/null @@ -1,259 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import threading -from typing import List, Optional - -from llama_stack.distribution.datatypes import Api -from llama_stack.providers.remote.telemetry.opentelemetry.console_span_processor import ( - ConsoleSpanProcessor, -) -from llama_stack.providers.remote.telemetry.opentelemetry.sqlite_span_processor import ( - SQLiteSpanProcessor, -) -from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore - -from opentelemetry import metrics, trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.semconv.resource import ResourceAttributes - - -from llama_stack.apis.telemetry import * # noqa: F403 - -from .config import OpenTelemetryConfig, TelemetrySink - -_GLOBAL_STORAGE = { - "active_spans": {}, - "counters": {}, - "gauges": {}, - "up_down_counters": {}, -} -_global_lock = threading.Lock() - - -def string_to_trace_id(s: str) -> int: - # Convert the string to bytes and then to an integer - return int.from_bytes(s.encode(), byteorder="big", signed=False) - - -def string_to_span_id(s: str) -> int: - # Use only the first 8 bytes (64 bits) for span ID - return int.from_bytes(s.encode()[:8], byteorder="big", signed=False) - - -def is_tracing_enabled(tracer): - with tracer.start_as_current_span("check_tracing") as span: - return span.is_recording() - - -class OpenTelemetryAdapter(Telemetry): - def __init__(self, config: OpenTelemetryConfig, deps) -> None: - self.config = config - self.datasetio = deps[Api.datasetio] - - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: self.config.service_name, - } - ) - - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) - if TelemetrySink.JAEGER in self.config.sinks: - otlp_exporter = OTLPSpanExporter( - endpoint=self.config.otel_endpoint, - ) - span_processor = BatchSpanProcessor(otlp_exporter) - trace.get_tracer_provider().add_span_processor(span_processor) - metric_reader = PeriodicExportingMetricReader( - OTLPMetricExporter( - endpoint=self.config.otel_endpoint, - ) - ) - metric_provider = MeterProvider( - resource=resource, metric_readers=[metric_reader] - ) - metrics.set_meter_provider(metric_provider) - self.meter = metrics.get_meter(__name__) - if TelemetrySink.SQLITE in self.config.sinks: - trace.get_tracer_provider().add_span_processor( - SQLiteSpanProcessor(self.config.sqlite_db_path) - ) - self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path) - if TelemetrySink.CONSOLE in self.config.sinks: - trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor()) - self._lock = _global_lock - - async def initialize(self) -> None: - pass - - async def shutdown(self) -> None: - trace.get_tracer_provider().force_flush() - trace.get_tracer_provider().shutdown() - metrics.get_meter_provider().shutdown() - - async def log_event(self, event: Event, ttl_seconds: int = 604800) -> None: - if isinstance(event, UnstructuredLogEvent): - self._log_unstructured(event, ttl_seconds) - elif isinstance(event, MetricEvent): - self._log_metric(event) - elif isinstance(event, StructuredLogEvent): - self._log_structured(event, ttl_seconds) - else: - raise ValueError(f"Unknown event type: {event}") - - def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None: - with self._lock: - # Use global storage instead of instance storage - span_id = string_to_span_id(event.span_id) - span = _GLOBAL_STORAGE["active_spans"].get(span_id) - - if span: - timestamp_ns = int(event.timestamp.timestamp() * 1e9) - span.add_event( - name=event.type, - attributes={ - "message": event.message, - "severity": event.severity.value, - "__ttl__": ttl_seconds, - **event.attributes, - }, - timestamp=timestamp_ns, - ) - else: - print( - f"Warning: No active span found for span_id {span_id}. Dropping event: {event}" - ) - - def _get_or_create_counter(self, name: str, unit: str) -> metrics.Counter: - if name not in _GLOBAL_STORAGE["counters"]: - _GLOBAL_STORAGE["counters"][name] = self.meter.create_counter( - name=name, - unit=unit, - description=f"Counter for {name}", - ) - return _GLOBAL_STORAGE["counters"][name] - - def _get_or_create_gauge(self, name: str, unit: str) -> metrics.ObservableGauge: - if name not in _GLOBAL_STORAGE["gauges"]: - _GLOBAL_STORAGE["gauges"][name] = self.meter.create_gauge( - name=name, - unit=unit, - description=f"Gauge for {name}", - ) - return _GLOBAL_STORAGE["gauges"][name] - - def _log_metric(self, event: MetricEvent) -> None: - if isinstance(event.value, int): - counter = self._get_or_create_counter(event.metric, event.unit) - counter.add(event.value, attributes=event.attributes) - elif isinstance(event.value, float): - up_down_counter = self._get_or_create_up_down_counter( - event.metric, event.unit - ) - up_down_counter.add(event.value, attributes=event.attributes) - - def _get_or_create_up_down_counter( - self, name: str, unit: str - ) -> metrics.UpDownCounter: - if name not in _GLOBAL_STORAGE["up_down_counters"]: - _GLOBAL_STORAGE["up_down_counters"][name] = ( - self.meter.create_up_down_counter( - name=name, - unit=unit, - description=f"UpDownCounter for {name}", - ) - ) - return _GLOBAL_STORAGE["up_down_counters"][name] - - def _log_structured(self, event: StructuredLogEvent, ttl_seconds: int) -> None: - with self._lock: - span_id = string_to_span_id(event.span_id) - trace_id = string_to_trace_id(event.trace_id) - tracer = trace.get_tracer(__name__) - if event.attributes is None: - event.attributes = {} - event.attributes["__ttl__"] = ttl_seconds - - if isinstance(event.payload, SpanStartPayload): - # Check if span already exists to prevent duplicates - if span_id in _GLOBAL_STORAGE["active_spans"]: - return - - parent_span = None - if event.payload.parent_span_id: - parent_span_id = string_to_span_id(event.payload.parent_span_id) - parent_span = _GLOBAL_STORAGE["active_spans"].get(parent_span_id) - - context = trace.Context(trace_id=trace_id) - if parent_span: - context = trace.set_span_in_context(parent_span, context) - - span = tracer.start_span( - name=event.payload.name, - context=context, - attributes=event.attributes or {}, - ) - _GLOBAL_STORAGE["active_spans"][span_id] = span - - elif isinstance(event.payload, SpanEndPayload): - span = _GLOBAL_STORAGE["active_spans"].get(span_id) - if span: - if event.attributes: - span.set_attributes(event.attributes) - - status = ( - trace.Status(status_code=trace.StatusCode.OK) - if event.payload.status == SpanStatus.OK - else trace.Status(status_code=trace.StatusCode.ERROR) - ) - span.set_status(status) - span.end() - _GLOBAL_STORAGE["active_spans"].pop(span_id, None) - else: - raise ValueError(f"Unknown structured log event: {event}") - - async def query_traces( - self, - attribute_conditions: Optional[List[QueryCondition]] = None, - attribute_keys_to_return: Optional[List[str]] = None, - limit: Optional[int] = 100, - offset: Optional[int] = 0, - order_by: Optional[List[str]] = None, - ) -> List[Trace]: - return await self.trace_store.query_traces( - attribute_conditions=attribute_conditions, - attribute_keys_to_return=attribute_keys_to_return, - limit=limit, - offset=offset, - order_by=order_by, - ) - - async def get_spans( - self, - span_id: str, - attribute_conditions: Optional[List[QueryCondition]] = None, - attribute_keys_to_return: Optional[List[str]] = None, - max_depth: Optional[int] = None, - limit: Optional[int] = 100, - offset: Optional[int] = 0, - order_by: Optional[List[str]] = None, - ) -> SpanWithChildren: - return await self.trace_store.get_spans( - span_id=span_id, - attribute_conditions=attribute_conditions, - attribute_keys_to_return=attribute_keys_to_return, - max_depth=max_depth, - limit=limit, - offset=offset, - order_by=order_by, - ) diff --git a/llama_stack/providers/utils/telemetry/sqlite.py b/llama_stack/providers/utils/telemetry/sqlite.py deleted file mode 100644 index e7161fffa..000000000 --- a/llama_stack/providers/utils/telemetry/sqlite.py +++ /dev/null @@ -1,177 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import json -from datetime import datetime -from typing import List, Optional - -import aiosqlite - -from llama_stack.apis.telemetry import ( - QueryCondition, - SpanWithChildren, - Trace, - TraceStore, -) - - -class SQLiteTraceStore(TraceStore): - def __init__(self, conn_string: str): - self.conn_string = conn_string - - async def query_traces( - self, - attribute_filters: Optional[List[QueryCondition]] = None, - attributes_to_return: Optional[List[str]] = None, - limit: Optional[int] = 100, - offset: Optional[int] = 0, - order_by: Optional[List[str]] = None, - ) -> List[Trace]: - print(attribute_filters, attributes_to_return, limit, offset, order_by) - - def build_attribute_select() -> str: - if not attributes_to_return: - return "" - return "".join( - f", json_extract(s.attributes, '$.{key}') as attr_{key}" - for key in attributes_to_return - ) - - def build_where_clause() -> tuple[str, list]: - if not attribute_filters: - return "", [] - - conditions = [ - f"json_extract(s.attributes, '$.{condition.key}') {condition.op} ?" - for condition in attribute_filters - ] - params = [condition.value for condition in attribute_filters] - where_clause = " WHERE " + " AND ".join(conditions) - return where_clause, params - - def build_order_clause() -> str: - if not order_by: - return "" - - order_clauses = [] - for field in order_by: - desc = field.startswith("-") - clean_field = field[1:] if desc else field - order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}") - return " ORDER BY " + ", ".join(order_clauses) - - # Build the main query - base_query = """ - WITH matching_traces AS ( - SELECT DISTINCT t.trace_id - FROM traces t - JOIN spans s ON t.trace_id = s.trace_id - {where_clause} - ), - filtered_traces AS ( - SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time - {attribute_select} - FROM matching_traces mt - JOIN traces t ON mt.trace_id = t.trace_id - LEFT JOIN spans s ON t.trace_id = s.trace_id - {order_clause} - ) - SELECT DISTINCT trace_id, root_span_id, start_time, end_time - FROM filtered_traces - LIMIT {limit} OFFSET {offset} - """ - - where_clause, params = build_where_clause() - query = base_query.format( - attribute_select=build_attribute_select(), - where_clause=where_clause, - order_clause=build_order_clause(), - limit=limit, - offset=offset, - ) - - # Execute query and return results - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, params) as cursor: - rows = await cursor.fetchall() - return [ - Trace( - trace_id=row["trace_id"], - root_span_id=row["root_span_id"], - start_time=datetime.fromisoformat(row["start_time"]), - end_time=datetime.fromisoformat(row["end_time"]), - ) - for row in rows - ] - - async def get_materialized_span( - self, - span_id: str, - attributes_to_return: Optional[List[str]] = None, - max_depth: Optional[int] = None, - ) -> SpanWithChildren: - # Build the attributes selection - attributes_select = "s.attributes" - if attributes_to_return: - json_object = ", ".join( - f"'{key}', json_extract(s.attributes, '$.{key}')" - for key in attributes_to_return - ) - attributes_select = f"json_object({json_object})" - - # SQLite CTE query with filtered attributes - query = f""" - WITH RECURSIVE span_tree AS ( - SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes - FROM spans s - WHERE s.span_id = ? - - UNION ALL - - SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes - FROM spans s - JOIN span_tree st ON s.parent_span_id = st.span_id - WHERE (? IS NULL OR st.depth < ?) - ) - SELECT * - FROM span_tree - ORDER BY depth, start_time - """ - - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor: - rows = await cursor.fetchall() - - if not rows: - raise ValueError(f"Span {span_id} not found") - - # Build span tree - spans_by_id = {} - root_span = None - - for row in rows: - span = SpanWithChildren( - span_id=row["span_id"], - trace_id=row["trace_id"], - parent_span_id=row["parent_span_id"], - name=row["name"], - start_time=datetime.fromisoformat(row["start_time"]), - end_time=datetime.fromisoformat(row["end_time"]), - attributes=json.loads(row["filtered_attributes"]), - status=row["status"].lower(), - children=[], - ) - - spans_by_id[span.span_id] = span - - if span.span_id == span_id: - root_span = span - elif span.parent_span_id in spans_by_id: - spans_by_id[span.parent_span_id].children.append(span) - - return root_span