From 84ebed9c9f731a41b8f750f4043707b547332526 Mon Sep 17 00:00:00 2001
From: Ashwin Bharambe
Date: Tue, 17 Sep 2024 11:15:18 -0700
Subject: [PATCH] opentelemetry -> jaeger

---
 .../providers/adapters/telemetry/__init__.py  |   5 +
 .../telemetry/opentelemetry/__init__.py       |  18 ++
 .../telemetry/opentelemetry/config.py         |  15 ++
 .../telemetry/opentelemetry/opentelemetry.py  | 240 ++++++++++++++++++
 llama_stack/providers/registry/telemetry.py   |  14 +
 5 files changed, 292 insertions(+)
 create mode 100644 llama_stack/providers/adapters/telemetry/__init__.py
 create mode 100644 llama_stack/providers/adapters/telemetry/opentelemetry/__init__.py
 create mode 100644 llama_stack/providers/adapters/telemetry/opentelemetry/config.py
 create mode 100644 llama_stack/providers/adapters/telemetry/opentelemetry/opentelemetry.py

diff --git a/llama_stack/providers/adapters/telemetry/__init__.py b/llama_stack/providers/adapters/telemetry/__init__.py
new file mode 100644
index 000000000..756f351d8
--- /dev/null
+++ b/llama_stack/providers/adapters/telemetry/__init__.py
@@ -0,0 +1,5 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
diff --git a/llama_stack/providers/adapters/telemetry/opentelemetry/__init__.py b/llama_stack/providers/adapters/telemetry/opentelemetry/__init__.py
new file mode 100644
index 000000000..0842afe2d
--- /dev/null
+++ b/llama_stack/providers/adapters/telemetry/opentelemetry/__init__.py
@@ -0,0 +1,18 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from .config import OpenTelemetryConfig
+
+
+async def get_adapter_impl(config: OpenTelemetryConfig, _deps):
+    """Build the OpenTelemetry adapter for `config`, initialize it, return it."""
+    # Function-scope import; presumably keeps the opentelemetry packages from
+    # being required just to import this package -- TODO confirm.
+    from .opentelemetry import OpenTelemetryAdapter
+
+    impl = OpenTelemetryAdapter(config)
+    await impl.initialize()
+    return impl
diff --git a/llama_stack/providers/adapters/telemetry/opentelemetry/config.py b/llama_stack/providers/adapters/telemetry/opentelemetry/config.py
new file mode 100644
index 000000000..71a82aed9
--- /dev/null
+++ b/llama_stack/providers/adapters/telemetry/opentelemetry/config.py
@@ -0,0 +1,15 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from pydantic import BaseModel
+
+
+class OpenTelemetryConfig(BaseModel):
+    """Address of the Jaeger agent that the adapter exports spans to."""
+
+    # Host and port handed to the Jaeger exporter by the adapter.
+    jaeger_host: str = "localhost"
+    jaeger_port: int = 6831
diff --git a/llama_stack/providers/adapters/telemetry/opentelemetry/opentelemetry.py b/llama_stack/providers/adapters/telemetry/opentelemetry/opentelemetry.py
new file mode 100644
index 000000000..44e49346e
--- /dev/null
+++ b/llama_stack/providers/adapters/telemetry/opentelemetry/opentelemetry.py
@@ -0,0 +1,240 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from datetime import datetime
+
+from opentelemetry import metrics, trace
+from opentelemetry.exporter.jaeger.thrift import JaegerExporter
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import (
+    ConsoleMetricExporter,
+    PeriodicExportingMetricReader,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.semconv.resource import ResourceAttributes
+
+from llama_stack.apis.telemetry import *  # noqa: F403
+
+from .config import OpenTelemetryConfig
+
+
+def _to_ns(timestamp: datetime) -> int:
+    """Convert a datetime to integer nanoseconds since the epoch."""
+    return int(timestamp.timestamp() * 1e9)
+
+
+def _remote_parent_context(trace_id: str, span_id: str):
+    """Build a context whose current span is a remote parent with these hex ids."""
+    parent = trace.NonRecordingSpan(
+        trace.SpanContext(
+            trace_id=int(trace_id, 16),
+            span_id=int(span_id, 16),
+            is_remote=True,
+            # Mark the parent as sampled; an unsampled remote parent makes the
+            # SDK's default parent-based sampler drop the child span.
+            trace_flags=trace.TraceFlags(trace.TraceFlags.SAMPLED),
+        )
+    )
+    return trace.set_span_in_context(parent)
+
+
+class OpenTelemetryAdapter(Telemetry):
+    """Telemetry provider that sends spans to Jaeger and metrics to the console."""
+
+    def __init__(self, config: OpenTelemetryConfig):
+        self.config = config
+        # Spans opened by a SpanStartPayload, keyed by the event's span_id, so
+        # the matching SpanEndPayload and log events can find them again.
+        self._active_spans = {}
+        # Instruments are created once per metric name and reused afterwards.
+        self._counters = {}
+        self._gauges = {}
+
+        self.resource = Resource.create(
+            {ResourceAttributes.SERVICE_NAME: "foobar-service"}
+        )
+
+        # Set up tracing with Jaeger exporter
+        jaeger_exporter = JaegerExporter(
+            agent_host_name=self.config.jaeger_host,
+            agent_port=self.config.jaeger_port,
+        )
+        trace_provider = TracerProvider(resource=self.resource)
+        trace_processor = BatchSpanProcessor(jaeger_exporter)
+        trace_provider.add_span_processor(trace_processor)
+        trace.set_tracer_provider(trace_provider)
+        self.tracer = trace.get_tracer(__name__)
+
+        # Set up metrics
+        metric_reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
+        metric_provider = MeterProvider(
+            resource=self.resource, metric_readers=[metric_reader]
+        )
+        metrics.set_meter_provider(metric_provider)
+        self.meter = metrics.get_meter(__name__)
+
+    async def initialize(self) -> None:
+        """Do nothing; all setup happens in the constructor."""
+        pass
+
+    async def shutdown(self) -> None:
+        """Shut down the global tracer and meter providers."""
+        trace.get_tracer_provider().shutdown()
+        metrics.get_meter_provider().shutdown()
+
+    async def log_event(self, event: Event) -> None:
+        """Route `event` to the handler for its type; other types are ignored."""
+        if isinstance(event, UnstructuredLogEvent):
+            self._log_unstructured(event)
+        elif isinstance(event, MetricEvent):
+            self._log_metric(event)
+        elif isinstance(event, StructuredLogEvent):
+            self._log_structured(event)
+
+    def _log_unstructured(self, event: UnstructuredLogEvent) -> None:
+        """Record a log message as an event on a span."""
+        # Prefer the span this adapter opened for the event's span_id and fall
+        # back to whatever span is current in the OpenTelemetry context.
+        span = self._active_spans.get(event.span_id)
+        if span is None:
+            span = trace.get_current_span()
+        span.add_event(
+            name=event.message,
+            attributes={
+                "severity": event.severity.value,
+                **(event.attributes or {}),
+            },
+            timestamp=_to_ns(event.timestamp),
+        )
+
+    def _log_metric(self, event: MetricEvent) -> None:
+        """Add an int value to a counter, or set a gauge to a float value."""
+        if isinstance(event.value, int):
+            counter = self._counters.get(event.metric)
+            if counter is None:
+                counter = self.meter.create_counter(
+                    name=event.metric,
+                    unit=event.unit,
+                    description=f"Counter for {event.metric}",
+                )
+                self._counters[event.metric] = counter
+            counter.add(event.value, attributes=event.attributes)
+        elif isinstance(event.value, float):
+            gauge = self._gauges.get(event.metric)
+            if gauge is None:
+                gauge = self.meter.create_gauge(
+                    name=event.metric,
+                    unit=event.unit,
+                    description=f"Gauge for {event.metric}",
+                )
+                self._gauges[event.metric] = gauge
+            gauge.set(event.value, attributes=event.attributes)
+
+    def _log_structured(self, event: StructuredLogEvent) -> None:
+        """Start a span on SpanStartPayload and end it on SpanEndPayload."""
+        if isinstance(event.payload, SpanStartPayload):
+            # A span's parent can only be given through the context at creation.
+            # Use the live parent span when this adapter opened it; otherwise
+            # synthesize a remote parent so the span keeps the event's trace_id.
+            parent_id = event.payload.parent_span_id
+            parent_span = self._active_spans.get(parent_id) if parent_id else None
+            if parent_span is not None:
+                context = trace.set_span_in_context(parent_span)
+            else:
+                context = _remote_parent_context(
+                    event.trace_id, parent_id or event.span_id
+                )
+            span = self.tracer.start_span(
+                name=event.payload.name,
+                context=context,
+                kind=trace.SpanKind.INTERNAL,
+                attributes=event.attributes,
+                start_time=_to_ns(event.timestamp),
+            )
+            self._active_spans[event.span_id] = span
+        elif isinstance(event.payload, SpanEndPayload):
+            span = self._active_spans.pop(event.span_id, None)
+            if span is None:
+                # No span was started for this id, so there is nothing to end.
+                return
+            span.set_status(
+                trace.Status(
+                    trace.StatusCode.OK
+                    if event.payload.status == SpanStatus.OK
+                    else trace.StatusCode.ERROR
+                )
+            )
+            span.end(end_time=_to_ns(event.timestamp))
+
+    async def get_trace(self, trace_id: str) -> Trace:
+        """Not implemented yet; always raises NotImplementedError."""
+        # we need to look up the root span id
+        raise NotImplementedError("get_trace is not yet implemented")
+
+
+# Usage example
+async def main():
+    """Send one event of each kind through an adapter with the default config."""
+    telemetry = OpenTelemetryAdapter(OpenTelemetryConfig())
+    await telemetry.initialize()
+
+    # Ids must be hexadecimal strings: span handling parses them with int(x, 16).
+    trace_id = "0af7651916cd43dd8448eb211c80319c"
+    log_span_id = "b7ad6b7169203331"
+    operation_span_id = "00f067aa0ba902b7"
+
+    # Log an unstructured event
+    await telemetry.log_event(
+        UnstructuredLogEvent(
+            trace_id=trace_id,
+            span_id=log_span_id,
+            timestamp=datetime.now(),
+            message="This is a log message",
+            severity=LogSeverity.INFO,
+        )
+    )
+
+    # Log a metric event
+    await telemetry.log_event(
+        MetricEvent(
+            trace_id=trace_id,
+            span_id=log_span_id,
+            timestamp=datetime.now(),
+            metric="my_metric",
+            value=42,
+            unit="count",
+        )
+    )
+
+    # Log a structured event (span start)
+    await telemetry.log_event(
+        StructuredLogEvent(
+            trace_id=trace_id,
+            span_id=operation_span_id,
+            timestamp=datetime.now(),
+            payload=SpanStartPayload(name="my_operation"),
+        )
+    )
+
+    # Log a structured event (span end)
+    await telemetry.log_event(
+        StructuredLogEvent(
+            trace_id=trace_id,
+            span_id=operation_span_id,
+            timestamp=datetime.now(),
+            payload=SpanEndPayload(status=SpanStatus.OK),
+        )
+    )
+
+    await telemetry.shutdown()
+
+
+if __name__ == "__main__":
+    import asyncio
+
+    asyncio.run(main())
diff --git a/llama_stack/providers/registry/telemetry.py b/llama_stack/providers/registry/telemetry.py
index 363578749..02b71077e 100644
--- a/llama_stack/providers/registry/telemetry.py
+++ b/llama_stack/providers/registry/telemetry.py
@@ -27,4 +27,18 @@ def available_providers() -> List[ProviderSpec]:
                 config_class="llama_stack.providers.adapters.telemetry.sample.SampleConfig",
             ),
         ),
+        remote_provider_spec(
+            api=Api.telemetry,
+            adapter=AdapterSpec(
+                adapter_id="opentelemetry-jaeger",
+                pip_packages=[
+                    "opentelemetry-api",
+                    "opentelemetry-sdk",
+                    "opentelemetry-exporter-jaeger",
+                    "opentelemetry-semantic-conventions",
+                ],
+                module="llama_stack.providers.adapters.telemetry.opentelemetry",
+                config_class="llama_stack.providers.adapters.telemetry.opentelemetry.OpenTelemetryConfig",
+            ),
+        ),
     ]