From cb7d34c080aaf86676057cd90a44b4ac8095d59a Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Fri, 5 Jul 2024 16:25:08 -0700 Subject: [PATCH] feat - galileo logging --- litellm/integrations/galileo.py | 134 ++++++++++++++++++++++++++ litellm/integrations/types/galileo.py | 9 +- 2 files changed, 139 insertions(+), 4 deletions(-) create mode 100644 litellm/integrations/galileo.py diff --git a/litellm/integrations/galileo.py b/litellm/integrations/galileo.py new file mode 100644 index 0000000000..ebf7998672 --- /dev/null +++ b/litellm/integrations/galileo.py @@ -0,0 +1,134 @@ +import os +from datetime import datetime +from typing import List + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_logger import CustomLogger +from litellm.integrations.types.galileo import LLMResponse +from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler + + +class GalileoObserve(CustomLogger): + def __init__(self) -> None: + self.in_memory_records: List[dict] = [] + self.batch_size = 1 + self.base_url = os.getenv("GALILEO_BASE_URL", None) + self.project_id = os.getenv("GALILEO_PROJECT_ID", None) + self.headers = None + self.async_httpx_handler = AsyncHTTPHandler() + pass + + def set_galileo_headers(self): + # following https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#logging-your-records + + headers = { + "accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + } + galileo_login_response = self.async_httpx_handler.post( + url=f"{self.base_url}/login", + headers=headers, + data={ + "username": os.getenv("GALILEO_USERNAME"), + "password": os.getenv("GALILEO_PASSWORD"), + }, + ) + + access_token = galileo_login_response.json()["access_token"] + + self.headers = { + "accept": "application/json", + "Content-Type": "application/json", + "Authorization": f"Bearer {access_token}", + } + + def get_output_str_from_response(self, response_obj, kwargs): + output = None + if response_obj is not None and ( + kwargs.get("call_type", None) == "embedding" + or isinstance(response_obj, litellm.EmbeddingResponse) + ): + output = None + elif response_obj is not None and isinstance( + response_obj, litellm.ModelResponse + ): + output = response_obj["choices"][0]["message"].json() + elif response_obj is not None and isinstance( + response_obj, litellm.TextCompletionResponse + ): + output = response_obj.choices[0].text + elif response_obj is not None and isinstance( + response_obj, litellm.ImageResponse + ): + output = response_obj["data"] + + return output + + async def async_log_success_event( + self, + kwargs, + start_time, + end_time, + response_obj, + ): + verbose_logger.debug(f"On Async Success") + + _latency_ms = int((end_time - start_time).total_seconds() * 1000) + _call_type = kwargs.get("call_type", "litellm") + input_text = litellm.utils.get_formatted_prompt( + data=kwargs, call_type=_call_type + ) + + _usage = response_obj.get("usage", {}) or {} + num_input_tokens = _usage.get("prompt_tokens", 0) + num_output_tokens = _usage.get("completion_tokens", 0) + + output_text = self.get_output_str_from_response( + response_obj=response_obj, kwargs=kwargs + ) + + request_record = LLMResponse( + latency_ms=_latency_ms, + status_code=200, + input_text=input_text, + output_text=output_text, + node_type=_call_type, + model=kwargs.get("model", "-"), + num_input_tokens=num_input_tokens, + num_output_tokens=num_output_tokens, + created_at=start_time.strftime( + "%Y-%m-%dT%H:%M:%S" + ), # timestamp str constructed in "%Y-%m-%dT%H:%M:%S" format + ) + + # dump to dict + request_dict = request_record.model_dump() + self.in_memory_records.append(request_dict) + + if len(self.in_memory_records) >= self.batch_size: + await self.flush_in_memory_records() + + async def flush_in_memory_records(self): + verbose_logger.debug("flushing in memory records") + response = await self.async_httpx_handler.post( + url=f"{self.base_url}/projects/{self.project_id}/observe/ingest", + headers=self.headers, + json={"records": self.in_memory_records}, + ) + + if response.status_code == 200: + verbose_logger.debug( + "Galileo Logger:successfully flushed in memory records" + ) + self.in_memory_records = [] + else: + verbose_logger.debug("Galileo Logger: failed to flush in memory records") + verbose_logger.debug( + "Galileo Logger error=%s, status code=%s", + response.text, + response.status_code, + ) + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + verbose_logger.debug(f"On Async Failure") diff --git a/litellm/integrations/types/galileo.py b/litellm/integrations/types/galileo.py index cbdf53da56..54d3072af4 100644 --- a/litellm/integrations/types/galileo.py +++ b/litellm/integrations/types/galileo.py @@ -15,10 +15,11 @@ class LLMResponse(BaseModel): num_input_tokens: int num_output_tokens: int output_logprobs: Optional[Dict[str, Any]] = Field( - description="Optional. When available, logprobs are used to compute Uncertainty." + default=None, + description="Optional. When available, logprobs are used to compute Uncertainty.", ) - created_at: datetime = Field( + created_at: str = Field( ..., description='timestamp constructed in "%Y-%m-%dT%H:%M:%S" format' ) - tags: List[str] - user_metadata: Dict[str, Any] + tags: Optional[List[str]] = None + user_metadata: Optional[Dict[str, Any]] = None