updated the warning message

This commit is contained in:
Omar Abdelwahab 2025-09-15 17:07:42 -07:00
parent a67b9d7523
commit 5ce41fa85e

View file

@ -28,9 +28,7 @@ class DistributionRegistry(Protocol):
def get_cached(self, identifier: str) -> RoutableObjectWithProvider | None: ... def get_cached(self, identifier: str) -> RoutableObjectWithProvider | None: ...
async def update( async def update(self, obj: RoutableObjectWithProvider) -> RoutableObjectWithProvider: ...
self, obj: RoutableObjectWithProvider
) -> RoutableObjectWithProvider: ...
async def register(self, obj: RoutableObjectWithProvider) -> bool: ... async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
@ -56,9 +54,7 @@ def _parse_registry_values(values: list[str]) -> list[RoutableObjectWithProvider
obj = pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(value) obj = pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(value)
all_objects.append(obj) all_objects.append(obj)
except pydantic.ValidationError as e: except pydantic.ValidationError as e:
logger.error( logger.error(f"Error parsing registry value, raw value: {value}. Error: {e}")
f"Error parsing registry value, raw value: {value}. Error: {e}"
)
continue continue
return all_objects return all_objects
@ -71,9 +67,7 @@ class DiskDistributionRegistry(DistributionRegistry):
async def initialize(self) -> None: async def initialize(self) -> None:
pass pass
def get_cached( def get_cached(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
self, type: str, identifier: str
) -> RoutableObjectWithProvider | None:
# Disk registry does not have a cache # Disk registry does not have a cache
raise NotImplementedError("Disk registry does not have a cache") raise NotImplementedError("Disk registry does not have a cache")
@ -82,23 +76,15 @@ class DiskDistributionRegistry(DistributionRegistry):
values = await self.kvstore.values_in_range(start_key, end_key) values = await self.kvstore.values_in_range(start_key, end_key)
return _parse_registry_values(values) return _parse_registry_values(values)
async def get( async def get(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
self, type: str, identifier: str json_str = await self.kvstore.get(KEY_FORMAT.format(type=type, identifier=identifier))
) -> RoutableObjectWithProvider | None:
json_str = await self.kvstore.get(
KEY_FORMAT.format(type=type, identifier=identifier)
)
if not json_str: if not json_str:
return None return None
try: try:
return pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json( return pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(json_str)
json_str
)
except pydantic.ValidationError as e: except pydantic.ValidationError as e:
logger.error( logger.error(f"Error parsing registry value for {type}:{identifier}, raw value: {json_str}. Error: {e}")
f"Error parsing registry value for {type}:{identifier}, raw value: {json_str}. Error: {e}"
)
return None return None
async def update(self, obj: RoutableObjectWithProvider) -> None: async def update(self, obj: RoutableObjectWithProvider) -> None:
@ -111,9 +97,9 @@ class DiskDistributionRegistry(DistributionRegistry):
async def register(self, obj: RoutableObjectWithProvider) -> bool: async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_obj = await self.get(obj.type, obj.identifier) existing_obj = await self.get(obj.type, obj.identifier)
# warn if the object's providerid already exists but proceed with registration # warn if the object's providerid already exists but proceed with registration
if existing_obj and existing_obj.provider_id == obj.provider_id: if existing_obj and existing_obj.provider_id != obj.provider_id:
logger.warning( logger.warning(
f"Object {obj.type}:{obj.identifier} with provider_id {obj.provider_id} already exists, overwriting" f"Object {existing_obj.type}:{existing_obj.identifier} with provider_id {existing_obj.provider_id} is being replaced with {existing_obj.type}:{existing_obj.identifier} with provider_id {obj.provider_id}, overwriting"
) )
await self.kvstore.set( await self.kvstore.set(
@ -163,9 +149,7 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
async def initialize(self) -> None: async def initialize(self) -> None:
await self._ensure_initialized() await self._ensure_initialized()
def get_cached( def get_cached(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
self, type: str, identifier: str
) -> RoutableObjectWithProvider | None:
return self.cache.get((type, identifier), None) return self.cache.get((type, identifier), None)
async def get_all(self) -> list[RoutableObjectWithProvider]: async def get_all(self) -> list[RoutableObjectWithProvider]:
@ -173,9 +157,7 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
async with self._locked_cache() as cache: async with self._locked_cache() as cache:
return list(cache.values()) return list(cache.values())
async def get( async def get(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
self, type: str, identifier: str
) -> RoutableObjectWithProvider | None:
await self._ensure_initialized() await self._ensure_initialized()
cache_key = (type, identifier) cache_key = (type, identifier)
@ -217,9 +199,7 @@ async def create_dist_registry(
dist_kvstore = await kvstore_impl(metadata_store) dist_kvstore = await kvstore_impl(metadata_store)
else: else:
dist_kvstore = await kvstore_impl( dist_kvstore = await kvstore_impl(
SqliteKVStoreConfig( SqliteKVStoreConfig(db_path=(DISTRIBS_BASE_DIR / image_name / "kvstore.db").as_posix())
db_path=(DISTRIBS_BASE_DIR / image_name / "kvstore.db").as_posix()
)
) )
dist_registry = CachedDiskDistributionRegistry(dist_kvstore) dist_registry = CachedDiskDistributionRegistry(dist_kvstore)
await dist_registry.initialize() await dist_registry.initialize()