diff --git a/.circleci/config.yml b/.circleci/config.yml index 7a742afe0..4bb232421 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -645,6 +645,8 @@ jobs: pip install "pytest-cov==5.0.0" pip install "pytest-asyncio==0.21.1" pip install "respx==0.21.1" + pip install "google-generativeai==0.3.2" + pip install "google-cloud-aiplatform==1.43.0" # Run pytest and generate JUnit XML report - run: name: Run tests diff --git a/litellm/integrations/gcs_bucket/gcs_bucket.py b/litellm/integrations/gcs_bucket/gcs_bucket.py index 111730d1f..f7f36c124 100644 --- a/litellm/integrations/gcs_bucket/gcs_bucket.py +++ b/litellm/integrations/gcs_bucket/gcs_bucket.py @@ -76,28 +76,18 @@ class GCSBucketLogger(GCSBucketBase): if logging_payload is None: raise ValueError("standard_logging_object not found in kwargs") - json_logged_payload = json.dumps(logging_payload, default=str) - # Get the current date current_date = datetime.now().strftime("%Y-%m-%d") # Modify the object_name to include the date-based folder object_name = f"{current_date}/{response_obj['id']}" - try: - response = await self.async_httpx_client.post( - headers=headers, - url=f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name}", - data=json_logged_payload, - ) - except httpx.HTTPStatusError as e: - raise Exception(f"GCS Bucket logging error: {e.response.text}") - if response.status_code != 200: - verbose_logger.error("GCS Bucket logging error: %s", str(response.text)) - - verbose_logger.debug("GCS Bucket response %s", response) - verbose_logger.debug("GCS Bucket status code %s", response.status_code) - verbose_logger.debug("GCS Bucket response.text %s", response.text) + await self._log_json_data_on_gcs( + headers=headers, + bucket_name=bucket_name, + object_name=object_name, + logging_payload=logging_payload, + ) except Exception as e: verbose_logger.exception(f"GCS Bucket logging error: {str(e)}") @@ -134,8 +124,6 @@ class GCSBucketLogger(GCSBucketBase): _litellm_params = kwargs.get("litellm_params") or {} metadata = _litellm_params.get("metadata") or {} - json_logged_payload = json.dumps(logging_payload, default=str) - # Get the current date current_date = datetime.now().strftime("%Y-%m-%d") @@ -145,21 +133,67 @@ class GCSBucketLogger(GCSBucketBase): if "gcs_log_id" in metadata: object_name = metadata["gcs_log_id"] - response = await self.async_httpx_client.post( + await self._log_json_data_on_gcs( headers=headers, - url=f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name}", - data=json_logged_payload, + bucket_name=bucket_name, + object_name=object_name, + logging_payload=logging_payload, ) - if response.status_code != 200: - verbose_logger.error("GCS Bucket logging error: %s", str(response.text)) - - verbose_logger.debug("GCS Bucket response %s", response) - verbose_logger.debug("GCS Bucket status code %s", response.status_code) - verbose_logger.debug("GCS Bucket response.text %s", response.text) except Exception as e: verbose_logger.exception(f"GCS Bucket logging error: {str(e)}") + def _handle_folders_in_bucket_name( + self, + bucket_name: str, + object_name: str, + ) -> Tuple[str, str]: + """ + Handles when the user passes a bucket name with a folder postfix + + + Example: + - Bucket name: "my-bucket/my-folder/dev" + - Object name: "my-object" + - Returns: bucket_name="my-bucket", object_name="my-folder/dev/my-object" + + """ + if "/" in bucket_name: + bucket_name, prefix = bucket_name.split("/", 1) + object_name = f"{prefix}/{object_name}" + return bucket_name, object_name + return bucket_name, object_name + + async def _log_json_data_on_gcs( + self, + headers: Dict[str, str], + bucket_name: str, + object_name: str, + logging_payload: StandardLoggingPayload, + ): + """ + Helper function to make POST request to GCS Bucket in the specified bucket. + """ + json_logged_payload = json.dumps(logging_payload, default=str) + + bucket_name, object_name = self._handle_folders_in_bucket_name( + bucket_name=bucket_name, + object_name=object_name, + ) + + response = await self.async_httpx_client.post( + headers=headers, + url=f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name}", + data=json_logged_payload, + ) + + if response.status_code != 200: + verbose_logger.error("GCS Bucket logging error: %s", str(response.text)) + + verbose_logger.debug("GCS Bucket response %s", response) + verbose_logger.debug("GCS Bucket status code %s", response.status_code) + verbose_logger.debug("GCS Bucket response.text %s", response.text) + async def get_gcs_logging_config( self, kwargs: Optional[Dict[str, Any]] = {} ) -> GCSLoggingConfig: @@ -267,6 +301,11 @@ class GCSBucketLogger(GCSBucketBase): service_account_json=gcs_logging_config["path_service_account"], ) bucket_name = gcs_logging_config["bucket_name"] + bucket_name, object_name = self._handle_folders_in_bucket_name( + bucket_name=bucket_name, + object_name=object_name, + ) + url = f"https://storage.googleapis.com/storage/v1/b/{bucket_name}/o/{object_name}?alt=media" # Send the GET request to download the object @@ -302,6 +341,11 @@ class GCSBucketLogger(GCSBucketBase): service_account_json=gcs_logging_config["path_service_account"], ) bucket_name = gcs_logging_config["bucket_name"] + bucket_name, object_name = self._handle_folders_in_bucket_name( + bucket_name=bucket_name, + object_name=object_name, + ) + url = f"https://storage.googleapis.com/storage/v1/b/{bucket_name}/o/{object_name}" # Send the DELETE request to delete the object diff --git a/tests/local_testing/test_gcs_bucket.py b/tests/local_testing/test_gcs_bucket.py index fed287bd0..a01e839fa 100644 --- a/tests/local_testing/test_gcs_bucket.py +++ b/tests/local_testing/test_gcs_bucket.py @@ -528,6 +528,7 @@ async def test_get_gcs_logging_config_without_service_account(): 1. Key based logging without a service account 2. Default Callback without a service account """ + load_vertex_ai_credentials() _old_gcs_bucket_name = os.environ.get("GCS_BUCKET_NAME") os.environ.pop("GCS_BUCKET_NAME") @@ -572,3 +573,112 @@ async def test_get_gcs_logging_config_without_service_account(): if _old_gcs_service_acct is not None: os.environ["GCS_PATH_SERVICE_ACCOUNT"] = _old_gcs_service_acct + + +@pytest.mark.asyncio +async def test_basic_gcs_logger_with_folder_in_bucket_name(): + load_vertex_ai_credentials() + gcs_logger = GCSBucketLogger() + + bucket_name = "litellm-testing-bucket/test-folder-logs" + + old_bucket_name = os.environ.get("GCS_BUCKET_NAME") + os.environ["GCS_BUCKET_NAME"] = bucket_name + print("GCSBucketLogger", gcs_logger) + + litellm.callbacks = [gcs_logger] + response = await litellm.acompletion( + model="gpt-3.5-turbo", + temperature=0.7, + messages=[{"role": "user", "content": "This is a test"}], + max_tokens=10, + user="ishaan-2", + mock_response="Hi!", + metadata={ + "tags": ["model-anthropic-claude-v2.1", "app-ishaan-prod"], + "user_api_key": "88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b", + "user_api_key_alias": None, + "user_api_end_user_max_budget": None, + "litellm_api_version": "0.0.0", + "global_max_parallel_requests": None, + "user_api_key_user_id": "116544810872468347480", + "user_api_key_org_id": None, + "user_api_key_team_id": None, + "user_api_key_team_alias": None, + "user_api_key_metadata": {}, + "requester_ip_address": "127.0.0.1", + "requester_metadata": {"foo": "bar"}, + "spend_logs_metadata": {"hello": "world"}, + "headers": { + "content-type": "application/json", + "user-agent": "PostmanRuntime/7.32.3", + "accept": "*/*", + "postman-token": "92300061-eeaa-423b-a420-0b44896ecdc4", + "host": "localhost:4000", + "accept-encoding": "gzip, deflate, br", + "connection": "keep-alive", + "content-length": "163", + }, + "endpoint": "http://localhost:4000/chat/completions", + "model_group": "gpt-3.5-turbo", + "deployment": "azure/chatgpt-v-2", + "model_info": { + "id": "4bad40a1eb6bebd1682800f16f44b9f06c52a6703444c99c7f9f32e9de3693b4", + "db_model": False, + }, + "api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/", + "caching_groups": None, + "raw_request": "\n\nPOST Request Sent from LiteLLM:\ncurl -X POST \\\nhttps://openai-gpt-4-test-v-1.openai.azure.com//openai/ \\\n-H 'Authorization: *****' \\\n-d '{'model': 'chatgpt-v-2', 'messages': [{'role': 'system', 'content': 'you are a helpful assistant.\\n'}, {'role': 'user', 'content': 'bom dia'}], 'stream': False, 'max_tokens': 10, 'user': '116544810872468347480', 'extra_body': {}}'\n", + }, + ) + + print("response", response) + + await asyncio.sleep(5) + + # Get the current date + # Get the current date + current_date = datetime.now().strftime("%Y-%m-%d") + + # Modify the object_name to include the date-based folder + object_name = f"{current_date}%2F{response.id}" + + print("object_name", object_name) + + # Check if object landed on GCS + object_from_gcs = await gcs_logger.download_gcs_object(object_name=object_name) + print("object from gcs=", object_from_gcs) + # convert object_from_gcs from bytes to DICT + parsed_data = json.loads(object_from_gcs) + print("object_from_gcs as dict", parsed_data) + + print("type of object_from_gcs", type(parsed_data)) + + gcs_payload = StandardLoggingPayload(**parsed_data) + + print("gcs_payload", gcs_payload) + + assert gcs_payload["model"] == "gpt-3.5-turbo" + assert gcs_payload["messages"] == [{"role": "user", "content": "This is a test"}] + + assert gcs_payload["response"]["choices"][0]["message"]["content"] == "Hi!" + + assert gcs_payload["response_cost"] > 0.0 + + assert gcs_payload["status"] == "success" + + assert ( + gcs_payload["metadata"]["user_api_key_hash"] + == "88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b" + ) + assert gcs_payload["metadata"]["user_api_key_user_id"] == "116544810872468347480" + + assert gcs_payload["metadata"]["requester_metadata"] == {"foo": "bar"} + + # Delete Object from GCS + print("deleting object from GCS") + await gcs_logger.delete_gcs_object(object_name=object_name) + + # clean up + if old_bucket_name is not None: + os.environ["GCS_BUCKET_NAME"] = old_bucket_name