diff --git a/litellm/integrations/gcs_bucket.py b/litellm/integrations/gcs_bucket.py index 14aa3a9e0..4c04a6879 100644 --- a/litellm/integrations/gcs_bucket.py +++ b/litellm/integrations/gcs_bucket.py @@ -142,3 +142,65 @@ class GCSBucketLogger(CustomLogger): spend_logs_payload["output"] = output return spend_logs_payload + + async def download_gcs_object(self, object_name): + """ + Download an object from GCS. + + https://cloud.google.com/storage/docs/downloading-objects#download-object-json + """ + try: + headers = await self.construct_request_headers() + url = f"https://storage.googleapis.com/storage/v1/b/{self.BUCKET_NAME}/o/{object_name}?alt=media" + + # Send the GET request to download the object + response = await self.async_httpx_client.get(url=url, headers=headers) + + if response.status_code != 200: + verbose_logger.error( + "GCS object download error: %s", str(response.text) + ) + return None + + verbose_logger.debug( + "GCS object download response status code: %s", response.status_code + ) + + # Return the content of the downloaded object + return response.content + + except Exception as e: + verbose_logger.error("GCS object download error: %s", str(e)) + return None + + async def delete_gcs_object(self, object_name): + """ + Delete an object from GCS. + """ + try: + headers = await self.construct_request_headers() + url = f"https://storage.googleapis.com/storage/v1/b/{self.BUCKET_NAME}/o/{object_name}" + + # Send the DELETE request to delete the object + response = await self.async_httpx_client.delete(url=url, headers=headers) + + if (response.status_code != 200) or (response.status_code != 204): + verbose_logger.error( + "GCS object delete error: %s, status code: %s", + str(response.text), + response.status_code, + ) + return None + + verbose_logger.debug( + "GCS object delete response status code: %s, response: %s", + response.status_code, + response.text, + ) + + # Return the content of the downloaded object + return response.text + + except Exception as e: + verbose_logger.error("GCS object download error: %s", str(e)) + return None diff --git a/litellm/llms/custom_httpx/http_handler.py b/litellm/llms/custom_httpx/http_handler.py index e3a0a4f1c..1828a92d2 100644 --- a/litellm/llms/custom_httpx/http_handler.py +++ b/litellm/llms/custom_httpx/http_handler.py @@ -129,6 +129,50 @@ class AsyncHTTPHandler: except Exception as e: raise e + async def delete( + self, + url: str, + data: Optional[Union[dict, str]] = None, # type: ignore + json: Optional[dict] = None, + params: Optional[dict] = None, + headers: Optional[dict] = None, + timeout: Optional[Union[float, httpx.Timeout]] = None, + stream: bool = False, + ): + try: + if timeout is None: + timeout = self.timeout + req = self.client.build_request( + "DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore + ) + response = await self.client.send(req, stream=stream) + response.raise_for_status() + return response + except (httpx.RemoteProtocolError, httpx.ConnectError): + # Retry the request with a new session if there is a connection error + new_client = self.create_client(timeout=timeout, concurrent_limit=1) + try: + return await self.single_connection_post_request( + url=url, + client=new_client, + data=data, + json=json, + params=params, + headers=headers, + stream=stream, + ) + finally: + await new_client.aclose() + except httpx.HTTPStatusError as e: + setattr(e, "status_code", e.response.status_code) + if stream is True: + setattr(e, "message", await e.response.aread()) + else: + setattr(e, "message", e.response.text) + raise e + except Exception as e: + raise e + async def single_connection_post_request( self, url: str, diff --git a/litellm/tests/test_gcs_bucket.py b/litellm/tests/test_gcs_bucket.py index a85ef13e0..53ef96fda 100644 --- a/litellm/tests/test_gcs_bucket.py +++ b/litellm/tests/test_gcs_bucket.py @@ -80,3 +80,17 @@ async def test_basic_gcs_logger(): print("response", response) await asyncio.sleep(5) + + # Check if object landed on GCS + object_from_gcs = await gcs_logger.download_gcs_object(object_name=response.id) + # convert object_from_gcs from bytes to DICT + object_from_gcs = json.loads(object_from_gcs) + print("object_from_gcs", object_from_gcs) + + assert object_from_gcs["request_id"] == response.id + assert object_from_gcs["call_type"] == "acompletion" + assert object_from_gcs["model"] == "gpt-3.5-turbo" + + # Delete Object from GCS + print("deleting object from GCS") + await gcs_logger.delete_gcs_object(object_name=response.id)