use postgres to store traces and query

This commit is contained in:
Dinesh Yeduguru 2024-12-02 09:39:56 -08:00
parent 4dd08e5595
commit cb49d21a49
5 changed files with 235 additions and 9 deletions

View file

@@ -4,14 +4,12 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.providers.utils.telemetry.jaeger import JaegerTraceStore
from .config import OpenTelemetryConfig
async def get_adapter_impl(config: OpenTelemetryConfig, deps):
from .opentelemetry import OpenTelemetryAdapter
trace_store = JaegerTraceStore(config.jaeger_query_endpoint, config.service_name)
impl = OpenTelemetryAdapter(config, trace_store, deps)
impl = OpenTelemetryAdapter(config, deps)
await impl.initialize()
return impl

View file

@@ -18,10 +18,18 @@ class OpenTelemetryConfig(BaseModel):
default="llama-stack",
description="The service name to use for telemetry",
)
trace_store: str = Field(
default="postgres",
description="The trace store to use for telemetry",
)
jaeger_query_endpoint: str = Field(
default="http://localhost:16686/api/traces",
description="The Jaeger query endpoint URL",
)
postgres_conn_string: str = Field(
default="host=localhost dbname=llama_stack user=llama_stack password=llama_stack port=5432",
description="The PostgreSQL connection string to use for storing traces",
)
@classmethod
def sample_run_config(cls, **kwargs) -> Dict[str, Any]:

View file

@@ -18,6 +18,11 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from llama_stack.distribution.datatypes import Api
from llama_stack.providers.remote.telemetry.opentelemetry.postgres_processor import (
PostgresSpanProcessor,
)
from llama_stack.providers.utils.telemetry.jaeger import JaegerTraceStore
from llama_stack.providers.utils.telemetry.postgres import PostgresTraceStore
from llama_stack.apis.telemetry import * # noqa: F403
@@ -49,12 +54,18 @@ def is_tracing_enabled(tracer):
class OpenTelemetryAdapter(Telemetry):
def __init__(
self, config: OpenTelemetryConfig, trace_store: TraceStore, deps
) -> None:
def __init__(self, config: OpenTelemetryConfig, deps) -> None:
self.config = config
self.datasetio = deps[Api.datasetio]
self.trace_store = trace_store
if config.trace_store == "jaeger":
self.trace_store = JaegerTraceStore(
config.jaeger_query_endpoint, config.service_name
)
elif config.trace_store == "postgres":
self.trace_store = PostgresTraceStore(config.postgres_conn_string)
else:
raise ValueError(f"Invalid trace store: {config.trace_store}")
resource = Resource.create(
{
@@ -69,6 +80,9 @@ class OpenTelemetryAdapter(Telemetry):
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
trace.get_tracer_provider().add_span_processor(
PostgresSpanProcessor(self.config.postgres_conn_string)
)
# Set up metrics
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(
@@ -252,8 +266,8 @@ class OpenTelemetryAdapter(Telemetry):
results.append(
EvalTrace(
step=child.span.name,
input=child.span.attributes.get("input", ""),
output=child.span.attributes.get("output", ""),
input=str(child.span.attributes.get("input", "")),
output=str(child.span.attributes.get("output", "")),
session_id=session_id,
expected_output="",
)

View file

@@ -0,0 +1,92 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
import logging
from datetime import datetime

import psycopg2
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import Span
class PostgresSpanProcessor(SpanProcessor):
    """OpenTelemetry span processor that persists finished spans to a
    PostgreSQL ``traces`` table.

    A fresh connection is opened per database operation. NOTE: psycopg2's
    ``with connection`` block manages only the *transaction* (commit on
    success, rollback on error) — it does NOT close the connection. Each
    connection is therefore closed explicitly here; the original code leaked
    one connection per exported span.
    """

    def __init__(self, conn_string):
        """Initialize the processor and ensure the target table exists.

        :param conn_string: libpq-style connection string used for every
            database operation.
        """
        self.conn_string = conn_string
        # Kept for API compatibility / shutdown(); per-operation connections
        # are used for the actual work.
        self.conn = None
        self.setup_database()

    def setup_database(self):
        """Create the ``traces`` table if it doesn't exist."""
        conn = psycopg2.connect(self.conn_string)
        try:
            # `with conn` scopes the transaction and commits on success;
            # the connection itself is closed in `finally` below.
            with conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        CREATE TABLE IF NOT EXISTS traces (
                            trace_id TEXT,
                            span_id TEXT,
                            parent_span_id TEXT,
                            name TEXT,
                            start_time TIMESTAMP,
                            end_time TIMESTAMP,
                            attributes JSONB,
                            status TEXT,
                            kind TEXT,
                            service_name TEXT,
                            session_id TEXT
                        )
                        """
                    )
        finally:
            conn.close()

    def on_start(self, span: Span, parent_context=None):
        """Called when a span starts; nothing to record until the span ends."""
        pass

    def on_end(self, span: Span):
        """Export the finished span's data to PostgreSQL.

        Export is best-effort: any failure is logged (with traceback) and
        swallowed so telemetry never breaks the instrumented application.
        """
        conn = None
        try:
            conn = psycopg2.connect(self.conn_string)
            with conn:  # transaction scope; commits on success
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO traces (
                            trace_id, span_id, parent_span_id, name,
                            start_time, end_time, attributes, status,
                            kind, service_name, session_id
                        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        """,
                        (
                            # Canonical hex encodings of the OTel ids.
                            format(span.get_span_context().trace_id, "032x"),
                            format(span.get_span_context().span_id, "016x"),
                            (
                                format(span.parent.span_id, "016x")
                                if span.parent
                                else None
                            ),
                            span.name,
                            # OTel timestamps are epoch nanoseconds.
                            datetime.fromtimestamp(span.start_time / 1e9),
                            datetime.fromtimestamp(span.end_time / 1e9),
                            json.dumps(dict(span.attributes)),
                            span.status.status_code.name,
                            span.kind.name,
                            span.resource.attributes.get("service.name", "unknown"),
                            span.attributes.get("session_id", None),
                        ),
                    )
        except Exception:
            # Log through standard logging instead of print() so the failure
            # is visible under normal logging configuration.
            logging.getLogger(__name__).exception(
                "Error exporting span to PostgreSQL"
            )
        finally:
            if conn is not None:
                conn.close()

    def shutdown(self):
        """Release any cached connection on processor shutdown."""
        if self.conn:
            self.conn.close()

    def force_flush(self, timeout_millis=30000):
        """Force export of spans (no-op: spans are written synchronously)."""
        pass