From 9225d317760734585fa8abfa52f676594aff22f6 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 7 Sep 2024 07:41:49 -0700 Subject: [PATCH 1/6] allow setting REDIS_CLUSTER_NODES in .env --- litellm/_redis.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/litellm/_redis.py b/litellm/_redis.py index 23f82ed1a..1c300afcb 100644 --- a/litellm/_redis.py +++ b/litellm/_redis.py @@ -8,6 +8,7 @@ # Thank you users! We ❤️ you! - Krrish & Ishaan import inspect +import json # s/o [@Frank Colson](https://www.linkedin.com/in/frank-colson-422b9b183/) for this redis implementation import os @@ -142,7 +143,14 @@ def get_redis_client(**env_overrides): return redis.Redis.from_url(**url_kwargs) - if "startup_nodes" in redis_kwargs: + if ( + "startup_nodes" in redis_kwargs + or litellm.get_secret("REDIS_CLUSTER_NODES") is not None + ): + _redis_cluster_nodes_in_env = litellm.get_secret("REDIS_CLUSTER_NODES") + if _redis_cluster_nodes_in_env is not None: + redis_kwargs["startup_nodes"] = json.loads(_redis_cluster_nodes_in_env) + from redis.cluster import ClusterNode args = _get_redis_cluster_kwargs() From 5c4f3a9a34cc97c5a71bdbd4f8fded018feaaf13 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 7 Sep 2024 08:54:40 -0700 Subject: [PATCH 2/6] fix allow using .env vars for redis cluster --- litellm/_redis.py | 59 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/litellm/_redis.py b/litellm/_redis.py index 1c300afcb..8d0fa4e18 100644 --- a/litellm/_redis.py +++ b/litellm/_redis.py @@ -19,6 +19,8 @@ import redis.asyncio as async_redis # type: ignore import litellm +from ._logging import verbose_logger + def _get_redis_kwargs(): arg_spec = inspect.getfullargspec(redis.Redis) @@ -121,17 +123,56 @@ def _get_redis_client_logic(**env_overrides): **env_overrides, } + _startup_nodes = redis_kwargs.get("startup_nodes", None) or litellm.get_secret( + "REDIS_CLUSTER_NODES" + ) + + if _startup_nodes is not None: + redis_kwargs["startup_nodes"] = json.loads(_startup_nodes) + if "url" in redis_kwargs and redis_kwargs["url"] is not None: redis_kwargs.pop("host", None) redis_kwargs.pop("port", None) redis_kwargs.pop("db", None) redis_kwargs.pop("password", None) + elif "startup_nodes" in redis_kwargs and redis_kwargs["startup_nodes"] is not None: + pass elif "host" not in redis_kwargs or redis_kwargs["host"] is None: raise ValueError("Either 'host' or 'url' must be specified for redis.") # litellm.print_verbose(f"redis_kwargs: {redis_kwargs}") return redis_kwargs +def init_redis_cluster(redis_kwargs) -> redis.RedisCluster: + _redis_cluster_nodes_in_env = litellm.get_secret("REDIS_CLUSTER_NODES") + if _redis_cluster_nodes_in_env is not None: + try: + redis_kwargs["startup_nodes"] = json.loads(_redis_cluster_nodes_in_env) + except json.JSONDecodeError: + raise ValueError( + "REDIS_CLUSTER_NODES environment variable is not valid JSON. Please ensure it's properly formatted." + ) + + verbose_logger.debug( + "init_redis_cluster: startup nodes: ", redis_kwargs["startup_nodes"] + ) + from redis.cluster import ClusterNode + + args = _get_redis_cluster_kwargs() + cluster_kwargs = {} + for arg in redis_kwargs: + if arg in args: + cluster_kwargs[arg] = redis_kwargs[arg] + + new_startup_nodes: List[ClusterNode] = [] + + for item in redis_kwargs["startup_nodes"]: + new_startup_nodes.append(ClusterNode(**item)) + + redis_kwargs.pop("startup_nodes") + return redis.RedisCluster(startup_nodes=new_startup_nodes, **cluster_kwargs) + + def get_redis_client(**env_overrides): redis_kwargs = _get_redis_client_logic(**env_overrides) if "url" in redis_kwargs and redis_kwargs["url"] is not None: @@ -147,24 +188,8 @@ def get_redis_client(**env_overrides): "startup_nodes" in redis_kwargs or litellm.get_secret("REDIS_CLUSTER_NODES") is not None ): - _redis_cluster_nodes_in_env = litellm.get_secret("REDIS_CLUSTER_NODES") - if _redis_cluster_nodes_in_env is not None: - redis_kwargs["startup_nodes"] = json.loads(_redis_cluster_nodes_in_env) + return init_redis_cluster(redis_kwargs) - from redis.cluster import ClusterNode - - args = _get_redis_cluster_kwargs() - cluster_kwargs = {} - for arg in redis_kwargs: - if arg in args: - cluster_kwargs[arg] = redis_kwargs[arg] - - new_startup_nodes: List[ClusterNode] = [] - - for item in redis_kwargs["startup_nodes"]: - new_startup_nodes.append(ClusterNode(**item)) - redis_kwargs.pop("startup_nodes") - return redis.RedisCluster(startup_nodes=new_startup_nodes, **cluster_kwargs) return redis.Redis(**redis_kwargs) From eaab0e761dc4759de5a3721101b31d0feefea3b9 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 7 Sep 2024 08:56:00 -0700 Subject: [PATCH 3/6] add test_redis_cache_cluster_init_with_env_vars_unit_test --- litellm/tests/test_caching.py | 46 +++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/litellm/tests/test_caching.py b/litellm/tests/test_caching.py index a272d2dcf..a82d7bc8e 100644 --- a/litellm/tests/test_caching.py +++ b/litellm/tests/test_caching.py @@ -837,6 +837,52 @@ async def test_redis_cache_cluster_init_unit_test(): raise e +@pytest.mark.asyncio +async def test_redis_cache_cluster_init_with_env_vars_unit_test(): + try: + import json + + from redis.asyncio import RedisCluster as AsyncRedisCluster + from redis.cluster import RedisCluster + + from litellm.caching import RedisCache + + litellm.set_verbose = True + + # List of startup nodes + startup_nodes = [ + {"host": "127.0.0.1", "port": "7001"}, + {"host": "127.0.0.1", "port": "7003"}, + {"host": "127.0.0.1", "port": "7004"}, + {"host": "127.0.0.1", "port": "7005"}, + {"host": "127.0.0.1", "port": "7006"}, + {"host": "127.0.0.1", "port": "7007"}, + ] + + # set startup nodes in environment variables + os.environ["REDIS_CLUSTER_NODES"] = json.dumps(startup_nodes) + + # unser REDIS_HOST, REDIS_PORT, REDIS_PASSWORD + os.environ.pop("REDIS_HOST", None) + os.environ.pop("REDIS_PORT", None) + os.environ.pop("REDIS_PASSWORD", None) + + resp = RedisCache() + print("response from redis cache", resp) + assert isinstance(resp.redis_client, RedisCluster) + assert isinstance(resp.init_async_client(), AsyncRedisCluster) + + resp = litellm.Cache(type="redis") + + assert isinstance(resp.cache, RedisCache) + assert isinstance(resp.cache.redis_client, RedisCluster) + assert isinstance(resp.cache.init_async_client(), AsyncRedisCluster) + + except Exception as e: + print(f"{str(e)}\n\n{traceback.format_exc()}") + raise e + + @pytest.mark.asyncio async def test_redis_cache_acompletion_stream(): try: From 9a9c0e42eb6792478996f8b3fda5fd9b94a36fa6 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 7 Sep 2024 09:25:13 -0700 Subject: [PATCH 4/6] allow setting password for redis cluster --- litellm/_redis.py | 1 + 1 file changed, 1 insertion(+) diff --git a/litellm/_redis.py b/litellm/_redis.py index 8d0fa4e18..faa98e648 100644 --- a/litellm/_redis.py +++ b/litellm/_redis.py @@ -67,6 +67,7 @@ def _get_redis_cluster_kwargs(client=None): exclude_args = {"self", "connection_pool", "retry", "host", "port", "startup_nodes"} available_args = [x for x in arg_spec.args if x not in exclude_args] + available_args.append("password") return available_args From 3bf2c06e066be36f756010dc31b76527f59cb52a Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 7 Sep 2024 09:37:23 -0700 Subject: [PATCH 5/6] add config for setting up redis cluster --- docs/my-website/docs/proxy/caching.md | 42 +++++++++++++++++++++++++++ litellm/tests/test_caching.py | 2 ++ 2 files changed, 44 insertions(+) diff --git a/docs/my-website/docs/proxy/caching.md b/docs/my-website/docs/proxy/caching.md index 6324f4243..220ef5c36 100644 --- a/docs/my-website/docs/proxy/caching.md +++ b/docs/my-website/docs/proxy/caching.md @@ -54,6 +54,10 @@ litellm_caching: #### Redis Cluster + + + + ```yaml model_list: - model_name: "*" @@ -68,6 +72,44 @@ litellm_settings: redis_startup_nodes: [{"host": "127.0.0.1", "port": "7001"}] ``` + + + + +You can configure redis cluster in your .env by setting `REDIS_CLUSTER_NODES` in your .env + +**Example `REDIS_CLUSTER_NODES`** value + +``` +REDIS_CLUSTER_NODES = "[{"host": "127.0.0.1", "port": "7001"}, {"host": "127.0.0.1", "port": "7003"}, {"host": "127.0.0.1", "port": "7004"}, {"host": "127.0.0.1", "port": "7005"}, {"host": "127.0.0.1", "port": "7006"}, {"host": "127.0.0.1", "port": "7007"}]" +``` + +:::note + +Example python script for setting redis cluster nodes in .env: + +```python +# List of startup nodes +startup_nodes = [ + {"host": "127.0.0.1", "port": "7001"}, + {"host": "127.0.0.1", "port": "7003"}, + {"host": "127.0.0.1", "port": "7004"}, + {"host": "127.0.0.1", "port": "7005"}, + {"host": "127.0.0.1", "port": "7006"}, + {"host": "127.0.0.1", "port": "7007"}, +] + +# set startup nodes in environment variables +os.environ["REDIS_CLUSTER_NODES"] = json.dumps(startup_nodes) +print("REDIS_CLUSTER_NODES", os.environ["REDIS_CLUSTER_NODES"]) +``` + +::: + + + + + #### TTL ```yaml diff --git a/litellm/tests/test_caching.py b/litellm/tests/test_caching.py index a82d7bc8e..a52f17847 100644 --- a/litellm/tests/test_caching.py +++ b/litellm/tests/test_caching.py @@ -838,6 +838,7 @@ async def test_redis_cache_cluster_init_unit_test(): @pytest.mark.asyncio +@pytest.mark.skip(reason="Local test. Requires running redis cluster locally.") async def test_redis_cache_cluster_init_with_env_vars_unit_test(): try: import json @@ -861,6 +862,7 @@ async def test_redis_cache_cluster_init_with_env_vars_unit_test(): # set startup nodes in environment variables os.environ["REDIS_CLUSTER_NODES"] = json.dumps(startup_nodes) + print("REDIS_CLUSTER_NODES", os.environ["REDIS_CLUSTER_NODES"]) # unser REDIS_HOST, REDIS_PORT, REDIS_PASSWORD os.environ.pop("REDIS_HOST", None) From 66eba43f29259026204439574bffa5540572b5ce Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sat, 7 Sep 2024 11:15:29 -0700 Subject: [PATCH 6/6] mark test_langfuse_masked_input_output --- litellm/tests/test_alangfuse.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litellm/tests/test_alangfuse.py b/litellm/tests/test_alangfuse.py index 11ac2a411..903b01411 100644 --- a/litellm/tests/test_alangfuse.py +++ b/litellm/tests/test_alangfuse.py @@ -290,7 +290,7 @@ async def test_langfuse_logging_audio_transcriptions(langfuse_client): @pytest.mark.asyncio -@pytest.mark.flaky(retries=3, delay=1) +@pytest.mark.flaky(retries=5, delay=1) async def test_langfuse_masked_input_output(langfuse_client): """ Test that creates a trace with masked input and output