feat - galileo logging
parent 0eeea98f82
commit cb7d34c080
2 changed files with 139 additions and 4 deletions

litellm/integrations/galileo.py (new file, 134 additions)
@@ -0,0 +1,134 @@
import os
from datetime import datetime
from typing import List

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.types.galileo import LLMResponse
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler


class GalileoObserve(CustomLogger):
    def __init__(self) -> None:
        self.in_memory_records: List[dict] = []
        self.batch_size = 1
        self.base_url = os.getenv("GALILEO_BASE_URL", None)
        self.project_id = os.getenv("GALILEO_PROJECT_ID", None)
        self.headers = None
        self.async_httpx_handler = AsyncHTTPHandler()
        pass

    def set_galileo_headers(self):
        # following https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#logging-your-records

        headers = {
            "accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        galileo_login_response = self.async_httpx_handler.post(
            url=f"{self.base_url}/login",
            headers=headers,
            data={
                "username": os.getenv("GALILEO_USERNAME"),
                "password": os.getenv("GALILEO_PASSWORD"),
            },
        )

        access_token = galileo_login_response.json()["access_token"]

        self.headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }

    def get_output_str_from_response(self, response_obj, kwargs):
        output = None
        if response_obj is not None and (
            kwargs.get("call_type", None) == "embedding"
            or isinstance(response_obj, litellm.EmbeddingResponse)
        ):
            output = None
        elif response_obj is not None and isinstance(
            response_obj, litellm.ModelResponse
        ):
            output = response_obj["choices"][0]["message"].json()
        elif response_obj is not None and isinstance(
            response_obj, litellm.TextCompletionResponse
        ):
            output = response_obj.choices[0].text
        elif response_obj is not None and isinstance(
            response_obj, litellm.ImageResponse
        ):
            output = response_obj["data"]

        return output

    async def async_log_success_event(
        self,
        kwargs,
        start_time,
        end_time,
        response_obj,
    ):
        verbose_logger.debug(f"On Async Success")

        _latency_ms = int((end_time - start_time).total_seconds() * 1000)
        _call_type = kwargs.get("call_type", "litellm")
        input_text = litellm.utils.get_formatted_prompt(
            data=kwargs, call_type=_call_type
        )

        _usage = response_obj.get("usage", {}) or {}
        num_input_tokens = _usage.get("prompt_tokens", 0)
        num_output_tokens = _usage.get("completion_tokens", 0)

        output_text = self.get_output_str_from_response(
            response_obj=response_obj, kwargs=kwargs
        )

        request_record = LLMResponse(
            latency_ms=_latency_ms,
            status_code=200,
            input_text=input_text,
            output_text=output_text,
            node_type=_call_type,
            model=kwargs.get("model", "-"),
            num_input_tokens=num_input_tokens,
            num_output_tokens=num_output_tokens,
            created_at=start_time.strftime(
                "%Y-%m-%dT%H:%M:%S"
            ),  # timestamp str constructed in "%Y-%m-%dT%H:%M:%S" format
        )

        # dump to dict
        request_dict = request_record.model_dump()
        self.in_memory_records.append(request_dict)

        if len(self.in_memory_records) >= self.batch_size:
            await self.flush_in_memory_records()

    async def flush_in_memory_records(self):
        verbose_logger.debug("flushing in memory records")
        response = await self.async_httpx_handler.post(
            url=f"{self.base_url}/projects/{self.project_id}/observe/ingest",
            headers=self.headers,
            json={"records": self.in_memory_records},
        )

        if response.status_code == 200:
            verbose_logger.debug(
                "Galileo Logger:successfully flushed in memory records"
            )
            self.in_memory_records = []
        else:
            verbose_logger.debug("Galileo Logger: failed to flush in memory records")
            verbose_logger.debug(
                "Galileo Logger error=%s, status code=%s",
                response.text,
                response.status_code,
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        verbose_logger.debug(f"On Async Failure")
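For context, a minimal sketch of how this logger is meant to be wired up (not part of the diff). It assumes GALILEO_BASE_URL, GALILEO_PROJECT_ID, GALILEO_USERNAME, and GALILEO_PASSWORD are exported, and uses litellm's standard callbacks registration so async_log_success_event fires after each successful async call:

import asyncio

import litellm
from litellm.integrations.galileo import GalileoObserve

galileo_observe = GalileoObserve()
galileo_observe.set_galileo_headers()  # logs in with GALILEO_USERNAME/GALILEO_PASSWORD and stores the Bearer token headers
litellm.callbacks = [galileo_observe]  # register the custom logger with litellm


async def main():
    # with batch_size=1, each successful call is flushed to /observe/ingest immediately
    await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello"}],
    )


asyncio.run(main())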

litellm/integrations/types/galileo.py
@@ -15,10 +15,11 @@ class LLMResponse(BaseModel):
     num_input_tokens: int
     num_output_tokens: int
     output_logprobs: Optional[Dict[str, Any]] = Field(
-        description="Optional. When available, logprobs are used to compute Uncertainty."
+        default=None,
+        description="Optional. When available, logprobs are used to compute Uncertainty.",
     )
-    created_at: datetime = Field(
+    created_at: str = Field(
         ..., description='timestamp constructed in "%Y-%m-%dT%H:%M:%S" format'
     )
-    tags: List[str]
-    user_metadata: Dict[str, Any]
+    tags: Optional[List[str]] = None
+    user_metadata: Optional[Dict[str, Any]] = None
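With tags and user_metadata now optional and created_at typed as a plain string, the record built in async_log_success_event validates without those fields. A minimal sketch with illustrative values (not from the commit):

from datetime import datetime

from litellm.integrations.types.galileo import LLMResponse

record = LLMResponse(
    latency_ms=420,
    status_code=200,
    input_text="Hello",
    output_text="Hi there!",
    node_type="completion",
    model="gpt-3.5-turbo",
    num_input_tokens=5,
    num_output_tokens=4,
    created_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),  # string in the expected format
)
print(record.model_dump())  # same dict shape the logger appends to in_memory_records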