forked from phoenix/litellm-mirror
delete object from gcs
This commit is contained in:
parent
877836afcf
commit
5962267bf5
3 changed files with 120 additions and 0 deletions
|
@ -142,3 +142,65 @@ class GCSBucketLogger(CustomLogger):
|
||||||
|
|
||||||
spend_logs_payload["output"] = output
|
spend_logs_payload["output"] = output
|
||||||
return spend_logs_payload
|
return spend_logs_payload
|
||||||
|
|
||||||
|
async def download_gcs_object(self, object_name):
|
||||||
|
"""
|
||||||
|
Download an object from GCS.
|
||||||
|
|
||||||
|
https://cloud.google.com/storage/docs/downloading-objects#download-object-json
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
headers = await self.construct_request_headers()
|
||||||
|
url = f"https://storage.googleapis.com/storage/v1/b/{self.BUCKET_NAME}/o/{object_name}?alt=media"
|
||||||
|
|
||||||
|
# Send the GET request to download the object
|
||||||
|
response = await self.async_httpx_client.get(url=url, headers=headers)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
verbose_logger.error(
|
||||||
|
"GCS object download error: %s", str(response.text)
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
verbose_logger.debug(
|
||||||
|
"GCS object download response status code: %s", response.status_code
|
||||||
|
)
|
||||||
|
|
||||||
|
# Return the content of the downloaded object
|
||||||
|
return response.content
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
verbose_logger.error("GCS object download error: %s", str(e))
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def delete_gcs_object(self, object_name):
|
||||||
|
"""
|
||||||
|
Delete an object from GCS.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
headers = await self.construct_request_headers()
|
||||||
|
url = f"https://storage.googleapis.com/storage/v1/b/{self.BUCKET_NAME}/o/{object_name}"
|
||||||
|
|
||||||
|
# Send the DELETE request to delete the object
|
||||||
|
response = await self.async_httpx_client.delete(url=url, headers=headers)
|
||||||
|
|
||||||
|
if (response.status_code != 200) or (response.status_code != 204):
|
||||||
|
verbose_logger.error(
|
||||||
|
"GCS object delete error: %s, status code: %s",
|
||||||
|
str(response.text),
|
||||||
|
response.status_code,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
verbose_logger.debug(
|
||||||
|
"GCS object delete response status code: %s, response: %s",
|
||||||
|
response.status_code,
|
||||||
|
response.text,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Return the content of the downloaded object
|
||||||
|
return response.text
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
verbose_logger.error("GCS object download error: %s", str(e))
|
||||||
|
return None
|
||||||
|
|
|
@ -129,6 +129,50 @@ class AsyncHTTPHandler:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
async def delete(
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
data: Optional[Union[dict, str]] = None, # type: ignore
|
||||||
|
json: Optional[dict] = None,
|
||||||
|
params: Optional[dict] = None,
|
||||||
|
headers: Optional[dict] = None,
|
||||||
|
timeout: Optional[Union[float, httpx.Timeout]] = None,
|
||||||
|
stream: bool = False,
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
if timeout is None:
|
||||||
|
timeout = self.timeout
|
||||||
|
req = self.client.build_request(
|
||||||
|
"DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
|
||||||
|
)
|
||||||
|
response = await self.client.send(req, stream=stream)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response
|
||||||
|
except (httpx.RemoteProtocolError, httpx.ConnectError):
|
||||||
|
# Retry the request with a new session if there is a connection error
|
||||||
|
new_client = self.create_client(timeout=timeout, concurrent_limit=1)
|
||||||
|
try:
|
||||||
|
return await self.single_connection_post_request(
|
||||||
|
url=url,
|
||||||
|
client=new_client,
|
||||||
|
data=data,
|
||||||
|
json=json,
|
||||||
|
params=params,
|
||||||
|
headers=headers,
|
||||||
|
stream=stream,
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
await new_client.aclose()
|
||||||
|
except httpx.HTTPStatusError as e:
|
||||||
|
setattr(e, "status_code", e.response.status_code)
|
||||||
|
if stream is True:
|
||||||
|
setattr(e, "message", await e.response.aread())
|
||||||
|
else:
|
||||||
|
setattr(e, "message", e.response.text)
|
||||||
|
raise e
|
||||||
|
except Exception as e:
|
||||||
|
raise e
|
||||||
|
|
||||||
async def single_connection_post_request(
|
async def single_connection_post_request(
|
||||||
self,
|
self,
|
||||||
url: str,
|
url: str,
|
||||||
|
|
|
@ -80,3 +80,17 @@ async def test_basic_gcs_logger():
|
||||||
print("response", response)
|
print("response", response)
|
||||||
|
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
# Check if object landed on GCS
|
||||||
|
object_from_gcs = await gcs_logger.download_gcs_object(object_name=response.id)
|
||||||
|
# convert object_from_gcs from bytes to DICT
|
||||||
|
object_from_gcs = json.loads(object_from_gcs)
|
||||||
|
print("object_from_gcs", object_from_gcs)
|
||||||
|
|
||||||
|
assert object_from_gcs["request_id"] == response.id
|
||||||
|
assert object_from_gcs["call_type"] == "acompletion"
|
||||||
|
assert object_from_gcs["model"] == "gpt-3.5-turbo"
|
||||||
|
|
||||||
|
# Delete Object from GCS
|
||||||
|
print("deleting object from GCS")
|
||||||
|
await gcs_logger.delete_gcs_object(object_name=response.id)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue