diff --git a/litellm/router_strategy/base_routing_strategy.py b/litellm/router_strategy/base_routing_strategy.py index 10969a809b..a39d17e386 100644 --- a/litellm/router_strategy/base_routing_strategy.py +++ b/litellm/router_strategy/base_routing_strategy.py @@ -3,6 +3,7 @@ Base class across routing strategies to abstract commmon functions like batch in """ import asyncio +import threading from abc import ABC from typing import List, Optional, Set, Union @@ -22,18 +23,20 @@ class BaseRoutingStrategy(ABC): self.dual_cache = dual_cache self.redis_increment_operation_queue: List[RedisPipelineIncrementOperation] = [] if should_batch_redis_writes: - import threading - - thread = threading.Thread( - target=asyncio.run, - args=( - self.periodic_sync_in_memory_spend_with_redis( - default_sync_interval=default_sync_interval - ), - ), - daemon=True, - ) - thread.start() + try: + # Try to get existing event loop + loop = asyncio.get_event_loop() + if loop.is_running(): + # If loop exists and is running, create task in existing loop + loop.create_task( + self.periodic_sync_in_memory_spend_with_redis( + default_sync_interval=default_sync_interval + ) + ) + else: + self._create_sync_thread(default_sync_interval) + except RuntimeError: # No event loop in current thread + self._create_sync_thread(default_sync_interval) self.in_memory_keys_to_update: set[str] = ( set() @@ -172,3 +175,16 @@ class BaseRoutingStrategy(ABC): verbose_router_logger.exception( f"Error syncing in-memory cache with Redis: {str(e)}" ) + + def _create_sync_thread(self, default_sync_interval): + """Helper method to create a new thread for periodic sync""" + thread = threading.Thread( + target=asyncio.run, + args=( + self.periodic_sync_in_memory_spend_with_redis( + default_sync_interval=default_sync_interval + ), + ), + daemon=True, + ) + thread.start()