(feat) Add support for logging to GCS Buckets with folder paths (#6675)

* use helper to log

* gcs _handle_folders_in_bucket_name

* add test_basic_gcs_logger_with_folder_in_bucket_name

* run gcs testing in logging callback tests

* include correct deps

* fix gcs bucket logging test

* fix test_basic_gcs_logger_with_folder_in_bucket_name

* fix test_get_gcs_logging_config_without_service_account

* fix test gcs bucket

* remove unused file
This commit is contained in:
Ishaan Jaff 2024-11-08 19:24:18 -08:00 committed by GitHub
parent 3d1c305401
commit 61026e189d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 183 additions and 27 deletions

View file

@ -645,6 +645,8 @@ jobs:
pip install "pytest-cov==5.0.0"
pip install "pytest-asyncio==0.21.1"
pip install "respx==0.21.1"
pip install "google-generativeai==0.3.2"
pip install "google-cloud-aiplatform==1.43.0"
# Run pytest and generate JUnit XML report
- run:
name: Run tests

View file

@ -76,28 +76,18 @@ class GCSBucketLogger(GCSBucketBase):
if logging_payload is None:
raise ValueError("standard_logging_object not found in kwargs")
json_logged_payload = json.dumps(logging_payload, default=str)
# Get the current date
current_date = datetime.now().strftime("%Y-%m-%d")
# Modify the object_name to include the date-based folder
object_name = f"{current_date}/{response_obj['id']}"
try:
response = await self.async_httpx_client.post(
await self._log_json_data_on_gcs(
headers=headers,
url=f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name}",
data=json_logged_payload,
bucket_name=bucket_name,
object_name=object_name,
logging_payload=logging_payload,
)
except httpx.HTTPStatusError as e:
raise Exception(f"GCS Bucket logging error: {e.response.text}")
if response.status_code != 200:
verbose_logger.error("GCS Bucket logging error: %s", str(response.text))
verbose_logger.debug("GCS Bucket response %s", response)
verbose_logger.debug("GCS Bucket status code %s", response.status_code)
verbose_logger.debug("GCS Bucket response.text %s", response.text)
except Exception as e:
verbose_logger.exception(f"GCS Bucket logging error: {str(e)}")
@ -134,8 +124,6 @@ class GCSBucketLogger(GCSBucketBase):
_litellm_params = kwargs.get("litellm_params") or {}
metadata = _litellm_params.get("metadata") or {}
json_logged_payload = json.dumps(logging_payload, default=str)
# Get the current date
current_date = datetime.now().strftime("%Y-%m-%d")
@ -145,6 +133,54 @@ class GCSBucketLogger(GCSBucketBase):
if "gcs_log_id" in metadata:
object_name = metadata["gcs_log_id"]
await self._log_json_data_on_gcs(
headers=headers,
bucket_name=bucket_name,
object_name=object_name,
logging_payload=logging_payload,
)
except Exception as e:
verbose_logger.exception(f"GCS Bucket logging error: {str(e)}")
def _handle_folders_in_bucket_name(
self,
bucket_name: str,
object_name: str,
) -> Tuple[str, str]:
"""
Handles when the user passes a bucket name with a folder postfix
Example:
- Bucket name: "my-bucket/my-folder/dev"
- Object name: "my-object"
- Returns: bucket_name="my-bucket", object_name="my-folder/dev/my-object"
"""
if "/" in bucket_name:
bucket_name, prefix = bucket_name.split("/", 1)
object_name = f"{prefix}/{object_name}"
return bucket_name, object_name
return bucket_name, object_name
async def _log_json_data_on_gcs(
self,
headers: Dict[str, str],
bucket_name: str,
object_name: str,
logging_payload: StandardLoggingPayload,
):
"""
Helper function to make POST request to GCS Bucket in the specified bucket.
"""
json_logged_payload = json.dumps(logging_payload, default=str)
bucket_name, object_name = self._handle_folders_in_bucket_name(
bucket_name=bucket_name,
object_name=object_name,
)
response = await self.async_httpx_client.post(
headers=headers,
url=f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name}",
@ -157,8 +193,6 @@ class GCSBucketLogger(GCSBucketBase):
verbose_logger.debug("GCS Bucket response %s", response)
verbose_logger.debug("GCS Bucket status code %s", response.status_code)
verbose_logger.debug("GCS Bucket response.text %s", response.text)
except Exception as e:
verbose_logger.exception(f"GCS Bucket logging error: {str(e)}")
async def get_gcs_logging_config(
self, kwargs: Optional[Dict[str, Any]] = {}
@ -267,6 +301,11 @@ class GCSBucketLogger(GCSBucketBase):
service_account_json=gcs_logging_config["path_service_account"],
)
bucket_name = gcs_logging_config["bucket_name"]
bucket_name, object_name = self._handle_folders_in_bucket_name(
bucket_name=bucket_name,
object_name=object_name,
)
url = f"https://storage.googleapis.com/storage/v1/b/{bucket_name}/o/{object_name}?alt=media"
# Send the GET request to download the object
@ -302,6 +341,11 @@ class GCSBucketLogger(GCSBucketBase):
service_account_json=gcs_logging_config["path_service_account"],
)
bucket_name = gcs_logging_config["bucket_name"]
bucket_name, object_name = self._handle_folders_in_bucket_name(
bucket_name=bucket_name,
object_name=object_name,
)
url = f"https://storage.googleapis.com/storage/v1/b/{bucket_name}/o/{object_name}"
# Send the DELETE request to delete the object

View file

@ -528,6 +528,7 @@ async def test_get_gcs_logging_config_without_service_account():
1. Key based logging without a service account
2. Default Callback without a service account
"""
load_vertex_ai_credentials()
_old_gcs_bucket_name = os.environ.get("GCS_BUCKET_NAME")
os.environ.pop("GCS_BUCKET_NAME")
@ -572,3 +573,112 @@ async def test_get_gcs_logging_config_without_service_account():
if _old_gcs_service_acct is not None:
os.environ["GCS_PATH_SERVICE_ACCOUNT"] = _old_gcs_service_acct
@pytest.mark.asyncio
async def test_basic_gcs_logger_with_folder_in_bucket_name():
load_vertex_ai_credentials()
gcs_logger = GCSBucketLogger()
bucket_name = "litellm-testing-bucket/test-folder-logs"
old_bucket_name = os.environ.get("GCS_BUCKET_NAME")
os.environ["GCS_BUCKET_NAME"] = bucket_name
print("GCSBucketLogger", gcs_logger)
litellm.callbacks = [gcs_logger]
response = await litellm.acompletion(
model="gpt-3.5-turbo",
temperature=0.7,
messages=[{"role": "user", "content": "This is a test"}],
max_tokens=10,
user="ishaan-2",
mock_response="Hi!",
metadata={
"tags": ["model-anthropic-claude-v2.1", "app-ishaan-prod"],
"user_api_key": "88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",
"user_api_key_alias": None,
"user_api_end_user_max_budget": None,
"litellm_api_version": "0.0.0",
"global_max_parallel_requests": None,
"user_api_key_user_id": "116544810872468347480",
"user_api_key_org_id": None,
"user_api_key_team_id": None,
"user_api_key_team_alias": None,
"user_api_key_metadata": {},
"requester_ip_address": "127.0.0.1",
"requester_metadata": {"foo": "bar"},
"spend_logs_metadata": {"hello": "world"},
"headers": {
"content-type": "application/json",
"user-agent": "PostmanRuntime/7.32.3",
"accept": "*/*",
"postman-token": "92300061-eeaa-423b-a420-0b44896ecdc4",
"host": "localhost:4000",
"accept-encoding": "gzip, deflate, br",
"connection": "keep-alive",
"content-length": "163",
},
"endpoint": "http://localhost:4000/chat/completions",
"model_group": "gpt-3.5-turbo",
"deployment": "azure/chatgpt-v-2",
"model_info": {
"id": "4bad40a1eb6bebd1682800f16f44b9f06c52a6703444c99c7f9f32e9de3693b4",
"db_model": False,
},
"api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/",
"caching_groups": None,
"raw_request": "\n\nPOST Request Sent from LiteLLM:\ncurl -X POST \\\nhttps://openai-gpt-4-test-v-1.openai.azure.com//openai/ \\\n-H 'Authorization: *****' \\\n-d '{'model': 'chatgpt-v-2', 'messages': [{'role': 'system', 'content': 'you are a helpful assistant.\\n'}, {'role': 'user', 'content': 'bom dia'}], 'stream': False, 'max_tokens': 10, 'user': '116544810872468347480', 'extra_body': {}}'\n",
},
)
print("response", response)
await asyncio.sleep(5)
# Get the current date
# Get the current date
current_date = datetime.now().strftime("%Y-%m-%d")
# Modify the object_name to include the date-based folder
object_name = f"{current_date}%2F{response.id}"
print("object_name", object_name)
# Check if object landed on GCS
object_from_gcs = await gcs_logger.download_gcs_object(object_name=object_name)
print("object from gcs=", object_from_gcs)
# convert object_from_gcs from bytes to DICT
parsed_data = json.loads(object_from_gcs)
print("object_from_gcs as dict", parsed_data)
print("type of object_from_gcs", type(parsed_data))
gcs_payload = StandardLoggingPayload(**parsed_data)
print("gcs_payload", gcs_payload)
assert gcs_payload["model"] == "gpt-3.5-turbo"
assert gcs_payload["messages"] == [{"role": "user", "content": "This is a test"}]
assert gcs_payload["response"]["choices"][0]["message"]["content"] == "Hi!"
assert gcs_payload["response_cost"] > 0.0
assert gcs_payload["status"] == "success"
assert (
gcs_payload["metadata"]["user_api_key_hash"]
== "88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b"
)
assert gcs_payload["metadata"]["user_api_key_user_id"] == "116544810872468347480"
assert gcs_payload["metadata"]["requester_metadata"] == {"foo": "bar"}
# Delete Object from GCS
print("deleting object from GCS")
await gcs_logger.delete_gcs_object(object_name=object_name)
# clean up
if old_bucket_name is not None:
os.environ["GCS_BUCKET_NAME"] = old_bucket_name