Simplified the Telemetry API and tied it to the logger

This commit is contained in:
Ashwin Bharambe 2024-09-07 15:25:35 -07:00
parent 741310f78e
commit 6ccb0a4c1f
15 changed files with 496 additions and 161 deletions

View file

@@ -6,170 +6,91 @@
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Protocol, Union
from typing import Any, Dict, Literal, Optional, Protocol, Union
from llama_models.schema_utils import json_schema_type, webmethod
from pydantic import BaseModel
from pydantic import BaseModel, Field
from typing_extensions import Annotated
# Status values for an experiment. Each member's value is the lowercase
# string form of its name. Used as the annotation of `status` fields
# elsewhere in this listing (e.g. UpdateExperimentRequest.status).
@json_schema_type
class ExperimentStatus(Enum):
NOT_STARTED = "not_started"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
# Two-valued outcome of a span; used as the type of SpanEndEvent.status.
# NOTE(review): no @json_schema_type decorator directly precedes this class
# in this listing, unlike most sibling classes. This text looks like a diff
# rendering with its +/- markers stripped, so the decorator may simply have
# been displaced — confirm against the repository.
class SpanStatus(Enum):
OK = "ok"
ERROR = "error"
# Pydantic model header for an experiment record. Only the `id` field
# follows it before the next class header.
# NOTE(review): this listing looks like a commit diff with its +/- markers
# stripped, so further fields of this model may be displayed after the
# following `class Span` header — confirm against the repository.
@json_schema_type
class Experiment(BaseModel):
id: str
# Pydantic model for a span: a span_id inside a trace_id, an optional
# parent_span_id (None by default), a name, a start_time and an optional
# end_time (None by default).
# NOTE(review): `status`, `created_at`, `updated_at` and `metadata` sit
# between `name` and `start_time`, and `status` is typed ExperimentStatus
# rather than SpanStatus. That suggests these four lines belong to the
# Experiment model from the other side of a diff — confirm before relying
# on them as Span fields.
class Span(BaseModel):
span_id: str
trace_id: str
parent_span_id: Optional[str] = None
name: str
status: ExperimentStatus
created_at: datetime
updated_at: datetime
metadata: Dict[str, Any]
start_time: datetime
end_time: Optional[datetime] = None
# default_factory=dict gives an empty dict when no attributes are supplied.
attributes: Optional[Dict[str, Any]] = Field(default_factory=dict)
# Pydantic model for a run that references its experiment by experiment_id.
# `status` is a free-form str here, not an enum.
# NOTE(review): `ended_at` is Optional but has no explicit default; whether
# it may be omitted depends on the pydantic major version in use — confirm.
@json_schema_type
class Run(BaseModel):
id: str
experiment_id: str
status: str
started_at: datetime
ended_at: Optional[datetime]
metadata: Dict[str, Any]
# Pydantic model for a trace: its trace_id, the id of its root span, a
# start_time and an optional end_time (None by default). This is the
# return type of Telemetry.get_trace.
# NOTE(review): no @json_schema_type decorator directly precedes this class
# in this listing; that may be an artifact of a diff rendering — confirm.
class Trace(BaseModel):
trace_id: str
root_span_id: str
start_time: datetime
end_time: Optional[datetime] = None
# Pydantic model for a named metric whose value may be a float, int, str
# or bool.
# NOTE(review): this listing looks like a diff with its +/- markers
# stripped; additional Metric fields may be displayed under a later class
# header — confirm against the repository.
@json_schema_type
class Metric(BaseModel):
name: str
value: Union[float, int, str, bool]
# Kinds of telemetry event. The `.value` strings are used as the Literal
# discriminator values of LoggingEvent, SpanStartEvent and SpanEndEvent.
class EventType(Enum):
LOG = "log"
SPAN_START = "span_start"
SPAN_END = "span_end"
# Severity levels for a log event; used as the type of the `severity`
# field. Each member's value is the lowercase string form of its name.
@json_schema_type
class LogSeverity(Enum):
VERBOSE = "verbose"
DEBUG = "debug"
INFO = "info"
WARN = "warn"
ERROR = "error"
CRITICAL = "critical"
# Base model subclassed by LoggingEvent, SpanStartEvent and SpanEndEvent.
# It carries the trace and span ids, a timestamp and optional attributes.
# NOTE(review): `run_id` appears between `timestamp` and `attributes`. This
# listing looks like a diff with its +/- markers stripped, so `run_id` may
# belong to a different model from the other revision — confirm.
class EventCommon(BaseModel):
trace_id: str
span_id: str
timestamp: datetime
run_id: str
# default_factory=dict gives an empty dict when no attributes are supplied.
attributes: Optional[Dict[str, Any]] = Field(default_factory=dict)
# Two class headers appear back to back: `Log(BaseModel)` with no body of
# its own, then `LoggingEvent(EventCommon)`.
# LoggingEvent is the log member of the `Event` union. Its `type` field is
# fixed to EventType.LOG.value and serves as the union discriminator.
# NOTE(review): this listing looks like a diff with its +/- markers
# stripped. `level`, `timestamp` and `additional_info` are presumably the
# fields of the `Log` model (EventCommon already declares `timestamp`),
# while `type`, `message` and `severity` are presumably LoggingEvent's —
# confirm against the repository.
@json_schema_type
class Log(BaseModel):
class LoggingEvent(EventCommon):
type: Literal[EventType.LOG.value] = EventType.LOG.value
message: str
level: str
timestamp: datetime
additional_info: Dict[str, Any]
severity: LogSeverity
# Categories of artifact. Each member's value is the lowercase string form
# of its name; OTHER is the catch-all member.
@json_schema_type
class ArtifactType(Enum):
MODEL = "model"
DATASET = "dataset"
CHECKPOINT = "checkpoint"
PLOT = "plot"
METRIC = "metric"
CONFIG = "config"
CODE = "code"
OTHER = "other"
# Pydantic model header for an artifact record. Only the `id` field follows
# it before the next class header.
# NOTE(review): this listing looks like a commit diff with its +/- markers
# stripped, so further fields of this model may be displayed after the
# following `class SpanStartEvent` header — confirm against the repository.
@json_schema_type
class Artifact(BaseModel):
id: str
# Span-start member of the `Event` union. Its `type` field is fixed to
# EventType.SPAN_START.value and serves as the union discriminator. It
# carries the span name and an optional parent_span_id (None by default).
# NOTE(review): `type` is annotated twice, first as a Literal and then as
# ArtifactType. The second annotation, together with `size`, `created_at`
# and `metadata`, presumably belongs to the Artifact model from the other
# side of a diff — confirm. If this is taken literally as Python, the later
# annotation replaces the Literal one.
class SpanStartEvent(EventCommon):
type: Literal[EventType.SPAN_START.value] = EventType.SPAN_START.value
name: str
type: ArtifactType
size: int
created_at: datetime
metadata: Dict[str, Any]
parent_span_id: Optional[str] = None
# Request body for Telemetry.create_experiment: a required name plus
# optional metadata (None by default).
@json_schema_type
class CreateExperimentRequest(BaseModel):
name: str
metadata: Optional[Dict[str, Any]] = None
# Span-end member of the `Event` union. Its `type` field is fixed to
# EventType.SPAN_END.value and serves as the union discriminator; `status`
# records the span outcome as a SpanStatus.
class SpanEndEvent(EventCommon):
type: Literal[EventType.SPAN_END.value] = EventType.SPAN_END.value
status: SpanStatus
# Request body for Telemetry.update_experiment. experiment_id is required;
# status and metadata are optional and default to None.
@json_schema_type
class UpdateExperimentRequest(BaseModel):
experiment_id: str
status: Optional[ExperimentStatus] = None
metadata: Optional[Dict[str, Any]] = None
# Request body for Telemetry.create_run: the id of the owning experiment
# plus optional metadata (None by default).
@json_schema_type
class CreateRunRequest(BaseModel):
experiment_id: str
metadata: Optional[Dict[str, Any]] = None
# Request body for Telemetry.update_run. run_id is required; status,
# ended_at and metadata are optional and default to None. `status` is a
# plain str, matching Run.status.
@json_schema_type
class UpdateRunRequest(BaseModel):
run_id: str
status: Optional[str] = None
ended_at: Optional[datetime] = None
metadata: Optional[Dict[str, Any]] = None
# Request body for Telemetry.log_metrics: a run_id and a list of Metric
# objects.
@json_schema_type
class LogMetricsRequest(BaseModel):
run_id: str
metrics: List[Metric]
# Request body for Telemetry.log_messages: a list of Log entries and an
# optional run_id (None by default).
@json_schema_type
class LogMessagesRequest(BaseModel):
logs: List[Log]
run_id: Optional[str] = None
# Request body for Telemetry.upload_artifact: the owning experiment_id, an
# artifact name, its type, the raw content bytes and optional metadata
# (None by default).
# NOTE(review): `artifact_type` is a plain str although an ArtifactType
# enum is declared in this listing — confirm whether that is intended.
@json_schema_type
class UploadArtifactRequest(BaseModel):
experiment_id: str
name: str
artifact_type: str
content: bytes
metadata: Optional[Dict[str, Any]] = None
# Request body for Telemetry.get_logs: a query string plus optional
# filters (None by default). The accepted filter keys are not visible
# in this listing.
@json_schema_type
class LogSearchRequest(BaseModel):
query: str
filters: Optional[Dict[str, Any]] = None
# Tagged union of the three event models. Pydantic selects the concrete
# model from the `type` field, which each member fixes to a distinct
# EventType value. This is the parameter type of Telemetry.log_event.
Event = Annotated[
Union[LoggingEvent, SpanStartEvent, SpanEndEvent],
Field(discriminator="type"),
]
# Structural interface (typing.Protocol) for the telemetry service. Every
# method body is `...`; each method is tagged with @webmethod, which
# carries its route and, for two of them, method="GET".
# NOTE(review): synchronous experiment/run/logging/artifact methods and
# asynchronous `log_event` / `get_trace` methods appear side by side. This
# listing looks like a diff with its +/- markers stripped, so the two
# groups presumably come from different revisions of the interface —
# confirm against the repository before implementing it.
class Telemetry(Protocol):
@webmethod(route="/experiments/create")
def create_experiment(self, request: CreateExperimentRequest) -> Experiment: ...
# Accepts any member of the Event union; no return annotation is declared.
@webmethod(route="/telemetry/log_event")
async def log_event(self, event: Event): ...
@webmethod(route="/experiments/list")
def list_experiments(self) -> List[Experiment]: ...
@webmethod(route="/experiments/get")
def get_experiment(self, experiment_id: str) -> Experiment: ...
@webmethod(route="/experiments/update")
def update_experiment(self, request: UpdateExperimentRequest) -> Experiment: ...
@webmethod(route="/experiments/create_run")
def create_run(self, request: CreateRunRequest) -> Run: ...
@webmethod(route="/runs/update")
def update_run(self, request: UpdateRunRequest) -> Run: ...
@webmethod(route="/runs/log_metrics")
def log_metrics(self, request: LogMetricsRequest) -> None: ...
@webmethod(route="/runs/metrics", method="GET")
def get_metrics(self, run_id: str) -> List[Metric]: ...
@webmethod(route="/logging/log_messages")
def log_messages(self, request: LogMessagesRequest) -> None: ...
@webmethod(route="/logging/get_logs")
def get_logs(self, request: LogSearchRequest) -> List[Log]: ...
@webmethod(route="/experiments/artifacts/upload")
def upload_artifact(self, request: UploadArtifactRequest) -> Artifact: ...
# Note: the route ends in "/get" although the method lists artifacts.
@webmethod(route="/experiments/artifacts/get")
def list_artifacts(self, experiment_id: str) -> List[Artifact]: ...
@webmethod(route="/artifacts/get")
def get_artifact(self, artifact_id: str) -> Artifact: ...
# Looks up a single Trace by trace_id.
@webmethod(route="/telemetry/get_trace", method="GET")
async def get_trace(self, trace_id: str) -> Trace: ...