fix(base_routing_strategy.py): refactor for cleaner code

This commit is contained in:
Krrish Dholakia 2025-03-18 22:57:05 -07:00
parent 084e8c425c
commit 2c69b5d221

View file

@ -3,6 +3,7 @@ Base class across routing strategies to abstract commmon functions like batch in
"""
import asyncio
import threading
from abc import ABC
from typing import List, Optional, Set, Union
@ -22,18 +23,20 @@ class BaseRoutingStrategy(ABC):
self.dual_cache = dual_cache
self.redis_increment_operation_queue: List[RedisPipelineIncrementOperation] = []
if should_batch_redis_writes:
import threading
thread = threading.Thread(
target=asyncio.run,
args=(
self.periodic_sync_in_memory_spend_with_redis(
default_sync_interval=default_sync_interval
),
),
daemon=True,
)
thread.start()
try:
# Try to get existing event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop exists and is running, create task in existing loop
loop.create_task(
self.periodic_sync_in_memory_spend_with_redis(
default_sync_interval=default_sync_interval
)
)
else:
self._create_sync_thread(default_sync_interval)
except RuntimeError: # No event loop in current thread
self._create_sync_thread(default_sync_interval)
self.in_memory_keys_to_update: set[str] = (
set()
@ -172,3 +175,16 @@ class BaseRoutingStrategy(ABC):
verbose_router_logger.exception(
f"Error syncing in-memory cache with Redis: {str(e)}"
)
def _create_sync_thread(self, default_sync_interval):
"""Helper method to create a new thread for periodic sync"""
thread = threading.Thread(
target=asyncio.run,
args=(
self.periodic_sync_in_memory_spend_with_redis(
default_sync_interval=default_sync_interval
),
),
daemon=True,
)
thread.start()