Litellm dev 11 02 2024 (#6561)

* fix(dual_cache.py): update in-memory check for redis batch get cache

Fixes latency delay for async_batch_redis_cache

* fix(service_logger.py): fix race condition causing otel service logging to be overwritten if service_callbacks set

* feat(user_api_key_auth.py): add parent otel component for auth

allows us to isolate how much latency is added by auth checks

* perf(parallel_request_limiter.py): move async_set_cache_pipeline (from max parallel request limiter) out of execution path (background task)

reduces latency by 200ms

* feat(user_api_key_auth.py): have user api key auth object return user tpm/rpm limits - reduces redis calls in downstream task (parallel_request_limiter)

Reduces latency by 400-800ms

* fix(parallel_request_limiter.py): use batch get cache to reduce user/key/team usage object calls

reduces latency by 50-100ms

* fix: fix linting error

* fix(_service_logger.py): fix import

* fix(user_api_key_auth.py): fix service logging

* fix(dual_cache.py): don't pass 'self'

* fix: fix python3.8 error

* fix: fix init]
This commit is contained in:
Krish Dholakia 2024-11-04 07:48:20 +05:30 committed by GitHub
parent e5b4a71c79
commit cc19a9f6a1
17 changed files with 303 additions and 157 deletions

View file

@ -8,8 +8,10 @@ Has 4 primary methods:
- async_get_cache
"""
import asyncio
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, List, Optional, Tuple
import litellm
@ -40,6 +42,7 @@ class LimitedSizeOrderedDict(OrderedDict):
self.popitem(last=False)
super().__setitem__(key, value)
class DualCache(BaseCache):
"""
DualCache is a cache implementation that updates both Redis and an in-memory cache simultaneously.
@ -53,7 +56,7 @@ class DualCache(BaseCache):
redis_cache: Optional[RedisCache] = None,
default_in_memory_ttl: Optional[float] = None,
default_redis_ttl: Optional[float] = None,
default_redis_batch_cache_expiry: float = 1,
default_redis_batch_cache_expiry: Optional[float] = None,
default_max_redis_batch_cache_size: int = 100,
) -> None:
super().__init__()
@ -64,7 +67,11 @@ class DualCache(BaseCache):
self.last_redis_batch_access_time = LimitedSizeOrderedDict(
max_size=default_max_redis_batch_cache_size
)
self.redis_batch_cache_expiry = default_redis_batch_cache_expiry
self.redis_batch_cache_expiry = (
default_redis_batch_cache_expiry
or litellm.default_redis_batch_cache_expiry
or 5
)
self.default_in_memory_ttl = (
default_in_memory_ttl or litellm.default_in_memory_ttl
)
@ -156,52 +163,33 @@ class DualCache(BaseCache):
local_only: bool = False,
**kwargs,
):
received_args = locals()
received_args.pop("self")
def run_in_new_loop():
"""Run the coroutine in a new event loop within this thread."""
new_loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(new_loop)
return new_loop.run_until_complete(
self.async_batch_get_cache(**received_args)
)
finally:
new_loop.close()
asyncio.set_event_loop(None)
try:
result = [None for _ in range(len(keys))]
if self.in_memory_cache is not None:
in_memory_result = self.in_memory_cache.batch_get_cache(keys, **kwargs)
# First, try to get the current event loop
_ = asyncio.get_running_loop()
# If we're already in an event loop, run in a separate thread
# to avoid nested event loop issues
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(run_in_new_loop)
return future.result()
if in_memory_result is not None:
result = in_memory_result
if None in result and self.redis_cache is not None and local_only is False:
"""
- for the none values in the result
- check the redis cache
"""
# Track the last access time for these keys
current_time = time.time()
key_tuple = tuple(keys)
# Only hit Redis if the last access time was more than 5 seconds ago
if (
key_tuple not in self.last_redis_batch_access_time
or current_time - self.last_redis_batch_access_time[key_tuple]
>= self.redis_batch_cache_expiry
):
sublist_keys = [
key for key, value in zip(keys, result) if value is None
]
# If not found in in-memory cache, try fetching from Redis
redis_result = self.redis_cache.batch_get_cache(
sublist_keys, parent_otel_span=parent_otel_span
)
if redis_result is not None:
# Update in-memory cache with the value from Redis
for key in redis_result:
self.in_memory_cache.set_cache(
key, redis_result[key], **kwargs
)
for key, value in redis_result.items():
result[keys.index(key)] = value
print_verbose(f"async batch get cache: cache result: {result}")
return result
except Exception:
verbose_logger.error(traceback.format_exc())
except RuntimeError:
# No running event loop, we can safely run in this thread
return run_in_new_loop()
async def async_get_cache(
self,
@ -244,6 +232,23 @@ class DualCache(BaseCache):
except Exception:
verbose_logger.error(traceback.format_exc())
def get_redis_batch_keys(
self,
current_time: float,
keys: List[str],
result: List[Any],
) -> List[str]:
sublist_keys = []
for key, value in zip(keys, result):
if value is None:
if (
key not in self.last_redis_batch_access_time
or current_time - self.last_redis_batch_access_time[key]
>= self.redis_batch_cache_expiry
):
sublist_keys.append(key)
return sublist_keys
async def async_batch_get_cache(
self,
keys: list,
@ -266,25 +271,16 @@ class DualCache(BaseCache):
- for the none values in the result
- check the redis cache
"""
# Track the last access time for these keys
current_time = time.time()
key_tuple = tuple(keys)
sublist_keys = self.get_redis_batch_keys(current_time, keys, result)
# Only hit Redis if the last access time was more than 5 seconds ago
if (
key_tuple not in self.last_redis_batch_access_time
or current_time - self.last_redis_batch_access_time[key_tuple]
>= self.redis_batch_cache_expiry
):
sublist_keys = [
key for key, value in zip(keys, result) if value is None
]
if len(sublist_keys) > 0:
# If not found in in-memory cache, try fetching from Redis
redis_result = await self.redis_cache.async_batch_get_cache(
sublist_keys, parent_otel_span=parent_otel_span
)
if redis_result is not None:
# Update in-memory cache with the value from Redis
for key, value in redis_result.items():
@ -292,6 +288,9 @@ class DualCache(BaseCache):
await self.in_memory_cache.async_set_cache(
key, redis_result[key], **kwargs
)
# Update the last access time for each key fetched from Redis
self.last_redis_batch_access_time[key] = current_time
for key, value in redis_result.items():
index = keys.index(key)
result[index] = value