fix(base_routing_strategy.py): refactor for cleaner code

This commit is contained in:
Krrish Dholakia 2025-03-18 22:57:05 -07:00
parent 9a00e0a008
commit 381b220e60

View file

@ -3,6 +3,7 @@ Base class across routing strategies to abstract commmon functions like batch in
""" """
import asyncio import asyncio
import threading
from abc import ABC from abc import ABC
from typing import List, Optional, Set, Union from typing import List, Optional, Set, Union
@ -22,18 +23,20 @@ class BaseRoutingStrategy(ABC):
self.dual_cache = dual_cache self.dual_cache = dual_cache
self.redis_increment_operation_queue: List[RedisPipelineIncrementOperation] = [] self.redis_increment_operation_queue: List[RedisPipelineIncrementOperation] = []
if should_batch_redis_writes: if should_batch_redis_writes:
import threading try:
# Try to get existing event loop
thread = threading.Thread( loop = asyncio.get_event_loop()
target=asyncio.run, if loop.is_running():
args=( # If loop exists and is running, create task in existing loop
loop.create_task(
self.periodic_sync_in_memory_spend_with_redis( self.periodic_sync_in_memory_spend_with_redis(
default_sync_interval=default_sync_interval default_sync_interval=default_sync_interval
),
),
daemon=True,
) )
thread.start() )
else:
self._create_sync_thread(default_sync_interval)
except RuntimeError: # No event loop in current thread
self._create_sync_thread(default_sync_interval)
self.in_memory_keys_to_update: set[str] = ( self.in_memory_keys_to_update: set[str] = (
set() set()
@ -172,3 +175,16 @@ class BaseRoutingStrategy(ABC):
verbose_router_logger.exception( verbose_router_logger.exception(
f"Error syncing in-memory cache with Redis: {str(e)}" f"Error syncing in-memory cache with Redis: {str(e)}"
) )
def _create_sync_thread(self, default_sync_interval):
"""Helper method to create a new thread for periodic sync"""
thread = threading.Thread(
target=asyncio.run,
args=(
self.periodic_sync_in_memory_spend_with_redis(
default_sync_interval=default_sync_interval
),
),
daemon=True,
)
thread.start()