diff --git a/docs/my-website/docs/proxy/config_settings.md b/docs/my-website/docs/proxy/config_settings.md
index fcde2e3ba8..5ba26031b8 100644
--- a/docs/my-website/docs/proxy/config_settings.md
+++ b/docs/my-website/docs/proxy/config_settings.md
@@ -367,6 +367,8 @@ router_settings:
 | GCS_PATH_SERVICE_ACCOUNT | Path to the Google Cloud service account JSON file
 | GCS_FLUSH_INTERVAL | Flush interval for GCS logging (in seconds). Specify how often you want a log to be sent to GCS. **Default is 20 seconds**
 | GCS_BATCH_SIZE | Batch size for GCS logging. Specify after how many logs you want to flush to GCS. If `BATCH_SIZE` is set to 10, logs are flushed every 10 logs. **Default is 2048**
+| GCS_PUBSUB_TOPIC_ID | PubSub Topic ID to send LiteLLM SpendLogs to.
+| GCS_PUBSUB_PROJECT_ID | PubSub Project ID to send LiteLLM SpendLogs to.
 | GENERIC_AUTHORIZATION_ENDPOINT | Authorization endpoint for generic OAuth providers
 | GENERIC_CLIENT_ID | Client ID for generic OAuth providers
 | GENERIC_CLIENT_SECRET | Client secret for generic OAuth providers
diff --git a/docs/my-website/docs/proxy/logging.md b/docs/my-website/docs/proxy/logging.md
index 3629cdd629..6992057a22 100644
--- a/docs/my-website/docs/proxy/logging.md
+++ b/docs/my-website/docs/proxy/logging.md
@@ -1025,6 +1025,74 @@ curl --location 'http://0.0.0.0:4000/chat/completions' \
 
 6. Save the JSON file and add the path to `GCS_PATH_SERVICE_ACCOUNT`
 
+
+## Google Cloud Storage - PubSub Topic
+
+Log LLM Logs/SpendLogs to a [Google Cloud Pub/Sub topic](https://cloud.google.com/pubsub/docs/reference/rest)
+
+:::info
+
+✨ This is an Enterprise only feature [Get Started with Enterprise here](https://calendly.com/d/4mp-gd3-k5k/litellm-1-1-onboarding-chat)
+
+:::
+
+
+| Property | Details |
+|----------|---------|
+| Description | Log LiteLLM `SpendLogs Table` to a Google Cloud Pub/Sub topic |
+
+When to use `gcs_pubsub`?
+
+- If your LiteLLM Database has crossed 1M+ spend logs and you want to send `SpendLogs` to a Pub/Sub topic that downstream consumers (e.g. BigQuery) can read from
+
+
+#### Usage
+
+1. Add `gcs_pubsub` to LiteLLM Config.yaml
+```yaml
+model_list:
+- litellm_params:
+    api_base: https://exampleopenaiendpoint-production.up.railway.app/
+    api_key: my-fake-key
+    model: openai/my-fake-model
+  model_name: fake-openai-endpoint
+
+litellm_settings:
+  callbacks: ["gcs_pubsub"] # 👈 KEY CHANGE
+```
+
+2. Set required env variables
+
+```shell
+GCS_PUBSUB_TOPIC_ID="litellmDB"
+GCS_PUBSUB_PROJECT_ID="reliableKeys"
+```
+
+3. Start Proxy
+
+```shell
+litellm --config /path/to/config.yaml
+```
+
+4. Test it!
+
+```bash
+curl --location 'http://0.0.0.0:4000/chat/completions' \
+--header 'Content-Type: application/json' \
+--data ' {
+    "model": "fake-openai-endpoint",
+    "messages": [
+      {
+        "role": "user",
+        "content": "what llm are you"
+      }
+    ]
+  }
+'
+```
+
+
 
 ## s3 Buckets
 
 We will use the `--config` to set
@@ -1301,7 +1369,7 @@ LiteLLM supports customizing the following Datadog environment variables
 
 ## Lunary
 
-### Step1: Install dependencies and set your environment variables
+#### Step 1: Install dependencies and set your environment variables
 Install the dependencies
 ```shell
 pip install litellm lunary
@@ -1312,7 +1380,7 @@ Get you Lunary public key from from https://app.lunary.ai/settings
 export LUNARY_PUBLIC_KEY=""
 ```
 
-### Step 2: Create a `config.yaml` and set `lunary` callbacks
+#### Step 2: Create a `config.yaml` and set `lunary` callbacks
 
 ```yaml
 model_list:
@@ -1324,12 +1392,12 @@ litellm_settings:
   failure_callback: ["lunary"]
 ```
 
-### Step 3: Start the LiteLLM proxy
+#### Step 3: Start the LiteLLM proxy
 ```shell
 litellm --config config.yaml
 ```
 
-### Step 4: Make a request
+#### Step 4: Make a request
 
 ```shell
 curl -X POST 'http://0.0.0.0:4000/chat/completions' \
@@ -1352,14 +1420,14 @@ curl -X POST 'http://0.0.0.0:4000/chat/completions' \
 
 ## MLflow
 
-### Step1: Install dependencies
+#### Step 1: Install dependencies
 
 Install the dependencies.
 
 ```shell
 pip install litellm mlflow
 ```
 
-### Step 2: Create a `config.yaml` with `mlflow` callback
+#### Step 2: Create a `config.yaml` with `mlflow` callback
 
 ```yaml
 model_list:
@@ -1371,12 +1439,12 @@ litellm_settings:
   failure_callback: ["mlflow"]
 ```
 
-### Step 3: Start the LiteLLM proxy
+#### Step 3: Start the LiteLLM proxy
 ```shell
 litellm --config config.yaml
 ```
 
-### Step 4: Make a request
+#### Step 4: Make a request
 
 ```shell
 curl -X POST 'http://0.0.0.0:4000/chat/completions' \
@@ -1392,7 +1460,7 @@ curl -X POST 'http://0.0.0.0:4000/chat/completions' \
 }'
 ```
 
-### Step 5: Review traces
+#### Step 5: Review traces
 
 Run the following command to start MLflow UI and review recorded traces.
diff --git a/litellm/__init__.py b/litellm/__init__.py
index fcef6bc56f..462dd07a8f 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -77,6 +77,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
     "langfuse",
     "pagerduty",
     "humanloop",
+    "gcs_pubsub",
 ]
 logged_real_time_event_types: Optional[Union[List[str], Literal["*"]]] = None
 _known_custom_logger_compatible_callbacks: List = list(
diff --git a/litellm/integrations/Readme.md b/litellm/integrations/Readme.md
new file mode 100644
index 0000000000..2b0b530ab8
--- /dev/null
+++ b/litellm/integrations/Readme.md
@@ -0,0 +1,5 @@
+# Integrations
+
+This folder contains logging integrations for LiteLLM,
+
+e.g. logging to Datadog, Langfuse, Prometheus, s3, GCS Bucket, etc.
\ No newline at end of file
diff --git a/litellm/integrations/gcs_pubsub/pub_sub.py b/litellm/integrations/gcs_pubsub/pub_sub.py
new file mode 100644
index 0000000000..8acab9fbbd
--- /dev/null
+++ b/litellm/integrations/gcs_pubsub/pub_sub.py
@@ -0,0 +1,202 @@
+"""
+BETA
+
+This is the GCS Pub/Sub logger. It sends LiteLLM SpendLogs payloads to a Google Cloud Pub/Sub topic.
+
+Users can use this instead of sending their SpendLogs to their Postgres database.
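+
+Configuration is read in __init__: the GCS_PUBSUB_PROJECT_ID and GCS_PUBSUB_TOPIC_ID
+env vars (or the matching constructor args), plus an optional GCS_PATH_SERVICE_ACCOUNT
+for service account auth.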
+"""
+
+import asyncio
+import json
+import os
+import traceback
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+if TYPE_CHECKING:
+    from litellm.proxy._types import SpendLogsPayload
+else:
+    SpendLogsPayload = Any
+
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.llms.custom_httpx.http_handler import (
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+
+
+class GcsPubSubLogger(CustomBatchLogger):
+    def __init__(
+        self,
+        project_id: Optional[str] = None,
+        topic_id: Optional[str] = None,
+        credentials_path: Optional[str] = None,
+        **kwargs,
+    ):
+        """
+        Initialize Google Cloud Pub/Sub publisher
+
+        Args:
+            project_id (str): Google Cloud project ID
+            topic_id (str): Pub/Sub topic ID
+            credentials_path (str, optional): Path to Google Cloud credentials JSON file
+        """
+        from litellm.proxy.utils import _premium_user_check
+
+        _premium_user_check()
+
+        self.async_httpx_client = get_async_httpx_client(
+            llm_provider=httpxSpecialProvider.LoggingCallback
+        )
+
+        self.project_id = project_id or os.getenv("GCS_PUBSUB_PROJECT_ID")
+        self.topic_id = topic_id or os.getenv("GCS_PUBSUB_TOPIC_ID")
+        self.path_service_account_json = credentials_path or os.getenv(
+            "GCS_PATH_SERVICE_ACCOUNT"
+        )
+
+        if not self.project_id or not self.topic_id:
+            raise ValueError("Both project_id and topic_id must be provided")
+
+        self.flush_lock = asyncio.Lock()
+        super().__init__(**kwargs, flush_lock=self.flush_lock)
+        self.log_queue: List[SpendLogsPayload] = []
+        asyncio.create_task(self.periodic_flush())
+
+    async def construct_request_headers(self) -> Dict[str, str]:
+        """Construct authorization headers using Vertex AI auth"""
+        from litellm import vertex_chat_completion
+
+        _auth_header, vertex_project = (
+            await vertex_chat_completion._ensure_access_token_async(
+                credentials=self.path_service_account_json,
+                project_id=None,
+                custom_llm_provider="vertex_ai",
+            )
+        )
+
+        auth_header, _ = vertex_chat_completion._get_token_and_url(
+            model="pub-sub",
+            auth_header=_auth_header,
+            vertex_credentials=self.path_service_account_json,
+            vertex_project=vertex_project,
+            vertex_location=None,
+            gemini_api_key=None,
+            stream=None,
+            custom_llm_provider="vertex_ai",
+            api_base=None,
+        )
+
+        headers = {
+            "Authorization": f"Bearer {auth_header}",
+            "Content-Type": "application/json",
+        }
+        return headers
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        """
+        Async Log success events to GCS PubSub Topic
+
+        - Creates a SpendLogsPayload
+        - Adds to batch queue
+        - Flushes based on CustomBatchLogger settings
+
+        Note:
+            Never raises to the caller; errors are logged via verbose_logger.exception
+        """
+        from litellm.proxy.spend_tracking.spend_tracking_utils import (
+            get_logging_payload,
+        )
+        from litellm.proxy.utils import _premium_user_check
+
+        _premium_user_check()
+
+        try:
+            verbose_logger.debug(
+                "PubSub: Logging - Enters logging function for model %s", kwargs
+            )
+            spend_logs_payload = get_logging_payload(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+            )
+            self.log_queue.append(spend_logs_payload)
+
+            if len(self.log_queue) >= self.batch_size:
+                await self.async_send_batch()
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"PubSub Layer Error - {str(e)}\n{traceback.format_exc()}"
+            )
+
+    async def async_send_batch(self):
+        """
+        Sends the batch of messages to Pub/Sub
+        """
+        try:
+            if not self.log_queue:
+                return
+
+            verbose_logger.debug(
+                f"PubSub - about to flush {len(self.log_queue)} events"
+            )
+
+            for message in self.log_queue:
+                await self.publish_message(message)
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"PubSub Error sending batch - {str(e)}\n{traceback.format_exc()}"
+            )
+        finally:
+            # Always clear the queue, even on failure, so stale logs are never re-published
+            self.log_queue.clear()
+
+    async def publish_message(
+        self, message: SpendLogsPayload
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Publish message to Google Cloud Pub/Sub using REST API
+
+        Args:
+            message: Message to publish (dict or string)
+
+        Returns:
+            dict: Published message response
+        """
+        try:
+            headers = await self.construct_request_headers()
+
+            # Prepare message data
+            if isinstance(message, str):
+                message_data = message
+            else:
+                message_data = json.dumps(message, default=str)
+
+            # Base64 encode the message
+            import base64
+
+            encoded_message = base64.b64encode(message_data.encode("utf-8")).decode(
+                "utf-8"
+            )
+
+            # Construct request body
+            request_body = {"messages": [{"data": encoded_message}]}
+
+            url = f"https://pubsub.googleapis.com/v1/projects/{self.project_id}/topics/{self.topic_id}:publish"
+
+            response = await self.async_httpx_client.post(
+                url=url, headers=headers, json=request_body
+            )
+
+            if response.status_code not in [200, 202]:
+                verbose_logger.error("Pub/Sub publish error: %s", str(response.text))
+                raise Exception(f"Failed to publish message: {response.text}")
+
+            verbose_logger.debug("Pub/Sub response: %s", response.text)
+            return response.json()
+
+        except Exception as e:
+            verbose_logger.error("Pub/Sub publish error: %s", str(e))
diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py
index 97884e9d29..957b73c923 100644
--- a/litellm/litellm_core_utils/litellm_logging.py
+++ b/litellm/litellm_core_utils/litellm_logging.py
@@ -77,6 +77,7 @@ from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger
 from ..integrations.dynamodb import DyanmoDBLogger
 from ..integrations.galileo import GalileoObserve
 from ..integrations.gcs_bucket.gcs_bucket import GCSBucketLogger
+from ..integrations.gcs_pubsub.pub_sub import GcsPubSubLogger
 from ..integrations.greenscale import GreenscaleLogger
 from ..integrations.helicone import HeliconeLogger
 from ..integrations.humanloop import HumanloopLogger
@@ -2571,6 +2572,13 @@ def _init_custom_logger_compatible_class(  # noqa: PLR0915
         pagerduty_logger = PagerDutyAlerting(**custom_logger_init_args)
         _in_memory_loggers.append(pagerduty_logger)
         return pagerduty_logger  # type: ignore
+    elif logging_integration == "gcs_pubsub":
+        for callback in _in_memory_loggers:
+            if isinstance(callback, GcsPubSubLogger):
+                return callback
+        _gcs_pubsub_logger = GcsPubSubLogger()
+        _in_memory_loggers.append(_gcs_pubsub_logger)
+        return _gcs_pubsub_logger  # type: ignore
     elif logging_integration == "humanloop":
         for callback in _in_memory_loggers:
             if isinstance(callback, HumanloopLogger):
@@ -2704,6 +2712,10 @@ def get_custom_logger_compatible_class(  # noqa: PLR0915
         for callback in _in_memory_loggers:
             if isinstance(callback, PagerDutyAlerting):
                 return callback
+    elif logging_integration == "gcs_pubsub":
+        for callback in _in_memory_loggers:
+            if isinstance(callback, GcsPubSubLogger):
+                return callback
 
         return None
     except Exception as e:
diff --git a/litellm/proxy/management_endpoints/team_endpoints.py b/litellm/proxy/management_endpoints/team_endpoints.py
index c5fb1da8db..7ead5ca0d4 100644
--- a/litellm/proxy/management_endpoints/team_endpoints.py
+++ b/litellm/proxy/management_endpoints/team_endpoints.py
@@ -58,7 +58,7 @@ from litellm.proxy.management_helpers.utils import (
     add_new_member,
     management_endpoint_wrapper,
 )
-from litellm.proxy.utils import PrismaClient
+from litellm.proxy.utils import PrismaClient, _premium_user_check
 
 router = APIRouter()
 
@@ -1527,15 +1527,3 @@ def _set_team_metadata_field(
     _premium_user_check()
     team_data.metadata = team_data.metadata or {}
     team_data.metadata[field_name] = value
-
-
-def _premium_user_check():
-    from litellm.proxy.proxy_server import premium_user
-
-    if not premium_user:
-        raise HTTPException(
-            status_code=403,
-            detail={
-                "error": f"This feature is only available for LiteLLM Enterprise users. {CommonProxyErrors.not_premium_user.value}"
-            },
-        )
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index cffad1210e..2f21949792 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -15,8 +15,7 @@ model_list:
 
 litellm_settings:
-  callbacks: ["prometheus"]
-
+  callbacks: ["gcs_pubsub"]
 
 guardrails:
   - guardrail_name: "bedrock-pre-guard"
@@ -26,3 +25,4 @@ guardrails:
       guardrailIdentifier: gf3sc1mzinjw
       guardrailVersion: "DRAFT"
       default_on: true
+
diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py
index 40080a107c..409f4bfa5c 100644
--- a/litellm/proxy/utils.py
+++ b/litellm/proxy/utils.py
@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union, overload
 from litellm.litellm_core_utils.duration_parser import duration_in_seconds
 from litellm.proxy._types import (
     DB_CONNECTION_ERROR_TYPES,
+    CommonProxyErrors,
     ProxyErrorTypes,
     ProxyException,
 )
@@ -2959,3 +2960,18 @@ def handle_exception_on_proxy(e: Exception) -> ProxyException:
         param=getattr(e, "param", "None"),
         code=status.HTTP_500_INTERNAL_SERVER_ERROR,
     )
+
+
+def _premium_user_check():
+    """
+    Raises an HTTPException if the user is not a premium user
+    """
+    from litellm.proxy.proxy_server import premium_user
+
+    if not premium_user:
+        raise HTTPException(
+            status_code=403,
+            detail={
+                "error": f"This feature is only available for LiteLLM Enterprise users. {CommonProxyErrors.not_premium_user.value}"
+            },
+        )
diff --git a/tests/logging_callback_tests/gcs_pub_sub_body/spend_logs_payload.json b/tests/logging_callback_tests/gcs_pub_sub_body/spend_logs_payload.json
new file mode 100644
index 0000000000..0e78e60b76
--- /dev/null
+++ b/tests/logging_callback_tests/gcs_pub_sub_body/spend_logs_payload.json
@@ -0,0 +1,27 @@
+{
+    "request_id": "chatcmpl-2283081b-dc89-41f6-93e6-d4f914774027",
+    "call_type": "acompletion",
+    "api_key": "",
+    "cache_hit": "None",
+    "startTime": "2025-01-24 09:20:46.847371",
+    "endTime": "2025-01-24 09:20:46.851954",
+    "completionStartTime": "2025-01-24 09:20:46.851954",
+    "model": "gpt-4o",
+    "user": "",
+    "team_id": "",
+    "metadata": "{\"additional_usage_values\": {\"completion_tokens_details\": null, \"prompt_tokens_details\": null}}",
+    "cache_key": "Cache OFF",
+    "spend": 0.00022500000000000002,
+    "total_tokens": 30,
+    "prompt_tokens": 10,
+    "completion_tokens": 20,
+    "request_tags": "[]",
+    "end_user": "",
+    "api_base": "",
+    "model_group": "",
+    "model_id": "",
+    "requester_ip_address": null,
+    "custom_llm_provider": "openai",
+    "messages": "{}",
+    "response": "{}"
+}
\ No newline at end of file
diff --git a/tests/logging_callback_tests/test_gcs_pub_sub.py b/tests/logging_callback_tests/test_gcs_pub_sub.py
new file mode 100644
index 0000000000..9bae76343f
--- /dev/null
+++ b/tests/logging_callback_tests/test_gcs_pub_sub.py
@@ -0,0 +1,113 @@
+import io
+import os
+import sys
+
+
+sys.path.insert(0, os.path.abspath("../.."))
+
+import asyncio
+import gzip
+import json
+import logging
+import time
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+import litellm
+from litellm import completion
+from litellm._logging import verbose_logger
+from litellm.integrations.gcs_pubsub.pub_sub import *
+from datetime import datetime, timedelta
+from litellm.types.utils import (
+    StandardLoggingPayload,
+    StandardLoggingModelInformation,
+    StandardLoggingMetadata,
+    StandardLoggingHiddenParams,
+)
+
+verbose_logger.setLevel(logging.DEBUG)
+
+
+def assert_gcs_pubsub_request_matches_expected(
+    actual_request_body: dict,
+    expected_file_name: str,
+):
+    """
+    Helper function to compare actual GCS PubSub request body with expected JSON file.
+ + Args: + actual_request_body (dict): The actual request body received from the API call + expected_file_name (str): Name of the JSON file containing expected request body + """ + # Get the current directory and read the expected request body + pwd = os.path.dirname(os.path.realpath(__file__)) + expected_body_path = os.path.join(pwd, "gcs_pub_sub_body", expected_file_name) + + with open(expected_body_path, "r") as f: + expected_request_body = json.load(f) + + # Replace dynamic values in actual request body + time_fields = ["startTime", "endTime", "completionStartTime", "request_id"] + for field in time_fields: + if field in actual_request_body: + actual_request_body[field] = expected_request_body[field] + + # Assert the entire request body matches + assert ( + actual_request_body == expected_request_body + ), f"Difference in request bodies: {json.dumps(actual_request_body, indent=2)} != {json.dumps(expected_request_body, indent=2)}" + + +@pytest.mark.asyncio +async def test_async_gcs_pub_sub(): + # Create a mock for the async_httpx_client's post method + mock_post = AsyncMock() + mock_post.return_value.status_code = 202 + mock_post.return_value.text = "Accepted" + + # Initialize the GcsPubSubLogger and set the mock + gcs_pub_sub_logger = GcsPubSubLogger(flush_interval=1) + gcs_pub_sub_logger.async_httpx_client.post = mock_post + + mock_construct_request_headers = AsyncMock() + mock_construct_request_headers.return_value = {"Authorization": "Bearer mock_token"} + gcs_pub_sub_logger.construct_request_headers = mock_construct_request_headers + litellm.callbacks = [gcs_pub_sub_logger] + + # Make the completion call + response = await litellm.acompletion( + model="gpt-4o", + messages=[{"role": "user", "content": "Hello, world!"}], + mock_response="hi", + ) + + await asyncio.sleep(3) # Wait for async flush + + # Assert httpx post was called + mock_post.assert_called_once() + + # Get the actual request body from the mock + actual_url = mock_post.call_args[1]["url"] + print("sent to url", actual_url) + assert ( + actual_url + == "https://pubsub.googleapis.com/v1/projects/reliableKeys/topics/litellmDB:publish" + ) + actual_request = mock_post.call_args[1]["json"] + + # Extract and decode the base64 encoded message + encoded_message = actual_request["messages"][0]["data"] + import base64 + + decoded_message = base64.b64decode(encoded_message).decode("utf-8") + + # Parse the JSON string into a dictionary + actual_request = json.loads(decoded_message) + print("##########\n") + print(json.dumps(actual_request, indent=4)) + print("##########\n") + # Verify the request body matches expected format + assert_gcs_pubsub_request_matches_expected( + actual_request, "spend_logs_payload.json" + )
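+
+
+# Sketch of a negative-path test (assumptions: pytest's built-in `monkeypatch`
+# fixture, and that the enterprise gate reads `litellm.proxy.proxy_server.premium_user`
+# at call time, as `_premium_user_check` does).
+def test_gcs_pub_sub_missing_config(monkeypatch):
+    """
+    GcsPubSubLogger should raise ValueError when neither constructor args nor
+    env vars supply the Pub/Sub project/topic.
+    """
+    # Bypass the premium-user gate so we exercise the config validation itself
+    monkeypatch.setattr("litellm.proxy.proxy_server.premium_user", True)
+    monkeypatch.delenv("GCS_PUBSUB_PROJECT_ID", raising=False)
+    monkeypatch.delenv("GCS_PUBSUB_TOPIC_ID", raising=False)
+
+    # The ValueError is raised before the background flush task is created,
+    # so this test can stay synchronous
+    with pytest.raises(ValueError, match="project_id and topic_id"):
+        GcsPubSubLogger()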