registery to handle updates and deletes

This commit is contained in:
Dinesh Yeduguru 2024-11-13 20:50:26 -08:00
parent 4b1b196251
commit 9e68ed3f36
5 changed files with 113 additions and 120 deletions

View file

@ -26,9 +26,13 @@ class DistributionRegistry(Protocol):
async def initialize(self) -> None: ...
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
async def get(self, identifier: str) -> Optional[RoutableObjectWithProvider]: ...
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
def get_cached(self, identifier: str) -> Optional[RoutableObjectWithProvider]: ...
async def update(
self, obj: RoutableObjectWithProvider
) -> RoutableObjectWithProvider: ...
# The current data structure allows multiple objects with the same identifier but different providers.
# This is not ideal - we should have a single object that can be served by multiple providers,
@ -40,7 +44,7 @@ class DistributionRegistry(Protocol):
REGISTER_PREFIX = "distributions:registry"
KEY_VERSION = "v1"
KEY_VERSION = "v2"
KEY_FORMAT = f"{REGISTER_PREFIX}:{KEY_VERSION}::" + "{type}:{identifier}"
@ -54,19 +58,11 @@ def _parse_registry_values(values: List[str]) -> List[RoutableObjectWithProvider
"""Utility function to parse registry values into RoutableObjectWithProvider objects."""
all_objects = []
for value in values:
try:
objects_data = json.loads(value)
objects = [
pydantic.parse_obj_as(
RoutableObjectWithProvider,
json.loads(obj_str),
)
for obj_str in objects_data
]
all_objects.extend(objects)
except Exception as e:
print(f"Error parsing value: {e}")
traceback.print_exc()
obj = pydantic.parse_obj_as(
RoutableObjectWithProvider,
json.loads(value),
)
all_objects.append(obj)
return all_objects
@ -79,46 +75,49 @@ class DiskDistributionRegistry(DistributionRegistry):
def get_cached(
self, type: str, identifier: str
) -> List[RoutableObjectWithProvider]:
) -> Optional[RoutableObjectWithProvider]:
# Disk registry does not have a cache
return []
raise NotImplementedError("Disk registry does not have a cache")
async def get_all(self) -> List[RoutableObjectWithProvider]:
start_key, end_key = _get_registry_key_range()
values = await self.kvstore.range(start_key, end_key)
return _parse_registry_values(values)
async def get(self, type: str, identifier: str) -> List[RoutableObjectWithProvider]:
async def get(
self, type: str, identifier: str
) -> Optional[RoutableObjectWithProvider]:
json_str = await self.kvstore.get(
KEY_FORMAT.format(type=type, identifier=identifier)
)
if not json_str:
return []
return None
objects_data = json.loads(json_str)
return [
pydantic.parse_obj_as(
# Return only the first object if any exist
if objects_data:
return pydantic.parse_obj_as(
RoutableObjectWithProvider,
json.loads(obj_str),
json.loads(objects_data),
)
for obj_str in objects_data
]
return None
async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_objects = await self.get(obj.type, obj.identifier)
# dont register if the object's providerid already exists
for eobj in existing_objects:
if eobj.provider_id == obj.provider_id:
return False
existing_objects.append(obj)
objects_json = [
obj.model_dump_json() for obj in existing_objects
] # Fixed variable name
async def update(self, obj: RoutableObjectWithProvider) -> None:
await self.kvstore.set(
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
json.dumps(objects_json),
obj.model_dump_json(),
)
return obj
async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_obj = await self.get(obj.type, obj.identifier)
# dont register if the object's providerid already exists
if existing_obj and existing_obj.provider_id == obj.provider_id:
return False
await self.kvstore.set(
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
obj.model_dump_json(),
)
return True
@ -129,7 +128,7 @@ class DiskDistributionRegistry(DistributionRegistry):
class CachedDiskDistributionRegistry(DiskDistributionRegistry):
def __init__(self, kvstore: KVStore):
super().__init__(kvstore)
self.cache: Dict[Tuple[str, str], List[RoutableObjectWithProvider]] = {}
self.cache: Dict[Tuple[str, str], RoutableObjectWithProvider] = {}
self._initialized = False
self._initialize_lock = asyncio.Lock()
self._cache_lock = asyncio.Lock()
@ -156,13 +155,7 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
async with self._locked_cache() as cache:
for obj in objects:
cache_key = (obj.type, obj.identifier)
if cache_key not in cache:
cache[cache_key] = []
if not any(
cached_obj.provider_id == obj.provider_id
for cached_obj in cache[cache_key]
):
cache[cache_key].append(obj)
cache[cache_key] = obj
self._initialized = True
@ -171,28 +164,22 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
def get_cached(
self, type: str, identifier: str
) -> List[RoutableObjectWithProvider]:
return self.cache.get((type, identifier), [])[:] # Return a copy
) -> Optional[RoutableObjectWithProvider]:
return self.cache.get((type, identifier), None)
async def get_all(self) -> List[RoutableObjectWithProvider]:
await self._ensure_initialized()
async with self._locked_cache() as cache:
return [item for sublist in cache.values() for item in sublist]
return list(cache.values())
async def get(self, type: str, identifier: str) -> List[RoutableObjectWithProvider]:
async def get(
self, type: str, identifier: str
) -> Optional[RoutableObjectWithProvider]:
await self._ensure_initialized()
cache_key = (type, identifier)
async with self._locked_cache() as cache:
if cache_key in cache:
return cache[cache_key][:]
objects = await super().get(type, identifier)
if objects:
async with self._locked_cache() as cache:
cache[cache_key] = objects
return objects
return cache.get(cache_key, None)
async def register(self, obj: RoutableObjectWithProvider) -> bool:
await self._ensure_initialized()
@ -201,16 +188,17 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
if success:
cache_key = (obj.type, obj.identifier)
async with self._locked_cache() as cache:
if cache_key not in cache:
cache[cache_key] = []
if not any(
cached_obj.provider_id == obj.provider_id
for cached_obj in cache[cache_key]
):
cache[cache_key].append(obj)
cache[cache_key] = obj
return success
async def update(self, obj: RoutableObjectWithProvider) -> None:
await super().update(obj)
cache_key = (obj.type, obj.identifier)
async with self._locked_cache() as cache:
cache[cache_key] = obj
return obj
async def delete(self, type: str, identifier: str) -> None:
await super().delete(type, identifier)
cache_key = (type, identifier)