Fix Registry so it scopes keys by object types

This commit is contained in:
Ashwin Bharambe 2024-11-12 12:25:43 -08:00
parent b1c3a95485
commit b94e6c0bd4
7 changed files with 63 additions and 47 deletions

View file

@ -5,7 +5,7 @@
# the root directory of this source tree.
import json
from typing import Dict, List, Optional, Protocol
from typing import Dict, List, Optional, Protocol, Tuple
import pydantic
@ -35,7 +35,8 @@ class DistributionRegistry(Protocol):
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
KEY_FORMAT = "distributions:registry:v1::{}"
KEY_VERSION = "v1"
KEY_FORMAT = f"distributions:registry:{KEY_VERSION}::" + "{type}:{identifier}"
class DiskDistributionRegistry(DistributionRegistry):
@ -45,18 +46,24 @@ class DiskDistributionRegistry(DistributionRegistry):
async def initialize(self) -> None:
pass
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]:
def get_cached(
self, type: str, identifier: str
) -> List[RoutableObjectWithProvider]:
# Disk registry does not have a cache
return []
async def get_all(self) -> List[RoutableObjectWithProvider]:
start_key = KEY_FORMAT.format("")
end_key = KEY_FORMAT.format("\xff")
start_key = KEY_FORMAT.format(type="", identifier="")
end_key = KEY_FORMAT.format(type="", identifier="\xff")
keys = await self.kvstore.range(start_key, end_key)
return [await self.get(key.split(":")[-1]) for key in keys]
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]:
json_str = await self.kvstore.get(KEY_FORMAT.format(identifier))
tuples = [(key.split(":")[-2], key.split(":")[-1]) for key in keys]
return [await self.get(type, identifier) for type, identifier in tuples]
async def get(self, type: str, identifier: str) -> List[RoutableObjectWithProvider]:
json_str = await self.kvstore.get(
KEY_FORMAT.format(type=type, identifier=identifier)
)
if not json_str:
return []
@ -70,7 +77,7 @@ class DiskDistributionRegistry(DistributionRegistry):
]
async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_objects = await self.get(obj.identifier)
existing_objects = await self.get(obj.type, obj.identifier)
# dont register if the object's providerid already exists
for eobj in existing_objects:
if eobj.provider_id == obj.provider_id:
@ -82,7 +89,8 @@ class DiskDistributionRegistry(DistributionRegistry):
obj.model_dump_json() for obj in existing_objects
] # Fixed variable name
await self.kvstore.set(
KEY_FORMAT.format(obj.identifier), json.dumps(objects_json)
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
json.dumps(objects_json),
)
return True
@ -90,33 +98,36 @@ class DiskDistributionRegistry(DistributionRegistry):
class CachedDiskDistributionRegistry(DiskDistributionRegistry):
def __init__(self, kvstore: KVStore):
super().__init__(kvstore)
self.cache: Dict[str, List[RoutableObjectWithProvider]] = {}
self.cache: Dict[Tuple[str, str], List[RoutableObjectWithProvider]] = {}
async def initialize(self) -> None:
start_key = KEY_FORMAT.format("")
end_key = KEY_FORMAT.format("\xff")
start_key = KEY_FORMAT.format(type="", identifier="")
end_key = KEY_FORMAT.format(type="", identifier="\xff")
keys = await self.kvstore.range(start_key, end_key)
for key in keys:
identifier = key.split(":")[-1]
objects = await super().get(identifier)
type, identifier = key.split(":")[-2:]
objects = await super().get(type, identifier)
if objects:
self.cache[identifier] = objects
self.cache[type, identifier] = objects
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]:
return self.cache.get(identifier, [])
def get_cached(
self, type: str, identifier: str
) -> List[RoutableObjectWithProvider]:
return self.cache.get((type, identifier), [])
async def get_all(self) -> List[RoutableObjectWithProvider]:
return [item for sublist in self.cache.values() for item in sublist]
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]:
if identifier in self.cache:
return self.cache[identifier]
async def get(self, type: str, identifier: str) -> List[RoutableObjectWithProvider]:
cachekey = (type, identifier)
if cachekey in self.cache:
return self.cache[cachekey]
objects = await super().get(identifier)
objects = await super().get(type, identifier)
if objects:
self.cache[identifier] = objects
self.cache[cachekey] = objects
return objects
@ -126,16 +137,17 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
if success:
# Then update cache
if obj.identifier not in self.cache:
self.cache[obj.identifier] = []
cachekey = (obj.type, obj.identifier)
if cachekey not in self.cache:
self.cache[cachekey] = []
# Check if provider already exists in cache
for cached_obj in self.cache[obj.identifier]:
for cached_obj in self.cache[cachekey]:
if cached_obj.provider_id == obj.provider_id:
return success
# If not, update cache
self.cache[obj.identifier].append(obj)
self.cache[cachekey].append(obj)
return success