forked from phoenix/litellm-mirror
(feat) add s3 Bucket as Cache
This commit is contained in:
parent
14e501845f
commit
00364da993
1 changed file with 96 additions and 3 deletions
|
@ -115,6 +115,97 @@ class RedisCache(BaseCache):
|
|||
self.redis_client.flushall()
|
||||
|
||||
|
||||
class S3Cache(BaseCache):
    """Cache backend that stores each entry as one object in an S3 bucket.

    Values are written with ``str(value)`` and read back by trying
    ``json.loads`` first and ``ast.literal_eval`` second, so both JSON text
    and Python-literal text round-trip to a ``dict``.

    All S3 failures are logged through ``print_verbose`` and swallowed:
    caching is best-effort and must never break the calling request.
    """

    # Fallbacks used when the caller does not supply a value. They match the
    # values this class used before the constructor arguments were honoured,
    # so ``S3Cache()`` keeps talking to the same bucket/region.
    DEFAULT_BUCKET_NAME = "cache-bucket-litellm"
    DEFAULT_REGION_NAME = "us-west-2"

    def __init__(
        self,
        bucket_name=None,
        aws_access_key_id=None,
        aws_secret_access_key=None,
        region_name=None,
        endpoint_url=None,
        **kwargs,
    ):
        """Create the boto3 S3 client used by this cache.

        Args:
            bucket_name: Bucket that holds the cache objects. Falls back to
                ``DEFAULT_BUCKET_NAME`` when omitted.
            aws_access_key_id: Optional explicit credential; ``None`` lets
                boto3 resolve credentials itself.
            aws_secret_access_key: Optional explicit credential; ``None``
                lets boto3 resolve credentials itself.
            region_name: AWS region. Falls back to ``DEFAULT_REGION_NAME``
                when omitted.
            endpoint_url: Optional custom endpoint (for S3-compatible
                stores); ``None`` uses the default AWS endpoint.
            **kwargs: Forwarded unchanged to ``boto3.client``.
        """
        # Imported lazily so boto3 is only required when the S3 backend is
        # actually selected.
        import boto3

        self.bucket_name = bucket_name or self.DEFAULT_BUCKET_NAME
        self.region_name = region_name or self.DEFAULT_REGION_NAME
        self.endpoint_url = endpoint_url

        self.s3_client = boto3.client(
            "s3",
            region_name=self.region_name,
            endpoint_url=self.endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            **kwargs,
        )

    def set_cache(self, key, value, **kwargs):
        """Store ``value`` under ``key`` in the bucket.

        Args:
            key: S3 object key.
            value: Object to cache; stored as ``str(value)``.
            **kwargs: ``ttl`` (optional) - lifetime as a number of seconds
                or a ``datetime.timedelta``. It is sent as the object's
                ``Expires`` value; ``get_cache`` in this class does not
                check it on read.

        Returns:
            None. Any exception is logged and suppressed.
        """
        import datetime

        try:
            print_verbose(f"LiteLLM SET Cache - S3. Key={key}. Value={value}")
            ttl = kwargs.get("ttl", None)

            # NOTE: this is the Python string form, not json.dumps();
            # get_cache() handles both JSON and Python-literal payloads.
            serialized_value = str(value)

            put_kwargs = {
                "Bucket": self.bucket_name,
                "Key": key,
                "Body": serialized_value,
            }
            if ttl is not None:
                # A bare number cannot be added to a datetime; treat it as
                # seconds. timedelta values are used as-is.
                if not isinstance(ttl, datetime.timedelta):
                    ttl = datetime.timedelta(seconds=ttl)
                # Timezone-aware so the timestamp is unambiguous regardless
                # of the host's local timezone.
                put_kwargs["Expires"] = (
                    datetime.datetime.now(datetime.timezone.utc) + ttl
                )

            self.s3_client.put_object(**put_kwargs)
        except Exception as e:
            # NON blocking - notify users S3 is throwing an exception
            print_verbose(f"S3 Caching: set_cache() - Got exception from S3: {e}")

    def get_cache(self, key, **kwargs):
        """Fetch and deserialize the cached value stored under ``key``.

        Args:
            key: S3 object key.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The cached payload as a ``dict``, or ``None`` when the key does
            not exist or any error occurs (errors are logged, not raised).
        """
        import botocore.exceptions

        try:
            print_verbose(f"Get S3 Cache: key: {key}")
            # Download the data from S3
            cached_response = self.s3_client.get_object(
                Bucket=self.bucket_name, Key=key
            )

            if cached_response is not None:
                # The body is a byte stream; decode it to text first.
                cached_response = cached_response["Body"].read().decode("utf-8")
                try:
                    cached_response = json.loads(cached_response)
                except ValueError:
                    # Not JSON (e.g. written via str(dict), which uses
                    # single quotes): parse it as a Python literal instead.
                    cached_response = ast.literal_eval(cached_response)
                if not isinstance(cached_response, dict):
                    cached_response = dict(cached_response)
            print_verbose(
                f"Got S3 Cache: key: {key}, cached_response {cached_response}. Type Response {type(cached_response)}"
            )

            return cached_response
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                # A plain cache miss.
                print_verbose(
                    f"S3 Cache: The specified key '{key}' does not exist in the S3 bucket."
                )
            else:
                # Other client errors (access denied, missing bucket, ...)
                # were previously dropped without a trace; surface them.
                print_verbose(
                    f"S3 Caching: get_cache() - Got exception from S3: {e}"
                )
            return None
        except Exception as e:
            # NON blocking - notify users S3 is throwing an exception
            traceback.print_exc()
            print_verbose(f"S3 Caching: get_cache() - Got exception from S3: {e}")
            return None

    def flush_cache(self):
        """No-op: flushing is not implemented for the S3 backend."""
        pass
|
||||
|
||||
|
||||
class DualCache(BaseCache):
|
||||
"""
|
||||
This updates both Redis and an in-memory cache simultaneously.
|
||||
|
@ -183,7 +274,7 @@ class DualCache(BaseCache):
|
|||
class Cache:
|
||||
def __init__(
|
||||
self,
|
||||
type: Optional[Literal["local", "redis"]] = "local",
|
||||
type: Optional[Literal["local", "redis", "s3"]] = "local",
|
||||
host: Optional[str] = None,
|
||||
port: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
|
@ -213,6 +304,8 @@ class Cache:
|
|||
self.cache: BaseCache = RedisCache(host, port, password, **kwargs)
|
||||
if type == "local":
|
||||
self.cache = InMemoryCache()
|
||||
if type == "s3":
|
||||
self.cache = S3Cache()
|
||||
if "cache" not in litellm.input_callback:
|
||||
litellm.input_callback.append("cache")
|
||||
if "cache" not in litellm.success_callback:
|
||||
|
@ -376,7 +469,7 @@ class Cache:
|
|||
return cached_response
|
||||
return cached_result
|
||||
except Exception as e:
|
||||
logging.debug(f"An exception occurred: {traceback.format_exc()}")
|
||||
print_verbose(f"An exception occurred: {traceback.format_exc()}")
|
||||
return None
|
||||
|
||||
def add_cache(self, result, *args, **kwargs):
|
||||
|
@ -418,7 +511,7 @@ class Cache:
|
|||
|
||||
|
||||
def enable_cache(
|
||||
type: Optional[Literal["local", "redis"]] = "local",
|
||||
type: Optional[Literal["local", "redis", "s3"]] = "local",
|
||||
host: Optional[str] = None,
|
||||
port: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue