forked from phoenix-oss/llama-stack-mirror
Support model resource updates and deletes (#452)
# What does this PR do? * Changes the registry to store only one RoutableObject per identifier. Before it was a list, which is not really required. * Adds impl for updates and deletes * Updates routing table to handle updates correctly ## Test Plan ``` ❯ llama-stack-client models list +------------------------+---------------+------------------------------------+------------+ | identifier | provider_id | provider_resource_id | metadata | +========================+===============+====================================+============+ | Llama3.1-405B-Instruct | fireworks-0 | fireworks/llama-v3p1-405b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.1-8B-Instruct | fireworks-0 | fireworks/llama-v3p1-8b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.2-3B-Instruct | fireworks-0 | fireworks/llama-v3p2-1b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ ❯ llama-stack-client models register dineshyv-model --provider-model-id=fireworks/llama-v3p1-70b-instruct Successfully registered model dineshyv-model ❯ llama-stack-client models list +------------------------+---------------+------------------------------------+------------+ | identifier | provider_id | provider_resource_id | metadata | +========================+===============+====================================+============+ | Llama3.1-405B-Instruct | fireworks-0 | fireworks/llama-v3p1-405b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.1-8B-Instruct | fireworks-0 | fireworks/llama-v3p1-8b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.2-3B-Instruct | fireworks-0 | fireworks/llama-v3p2-1b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | dineshyv-model | fireworks-0 | fireworks/llama-v3p1-70b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ ❯ llama-stack-client models update dineshyv-model --provider-model-id=fireworks/llama-v3p1-405b-instruct Successfully updated model dineshyv-model ❯ llama-stack-client models list +------------------------+---------------+------------------------------------+------------+ | identifier | provider_id | provider_resource_id | metadata | +========================+===============+====================================+============+ | Llama3.1-405B-Instruct | fireworks-0 | fireworks/llama-v3p1-405b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.1-8B-Instruct | fireworks-0 | fireworks/llama-v3p1-8b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.2-3B-Instruct | fireworks-0 | fireworks/llama-v3p2-1b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | dineshyv-model | fireworks-0 | fireworks/llama-v3p1-405b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ llama-stack-client models delete dineshyv-model ❯ llama-stack-client models list +------------------------+---------------+------------------------------------+------------+ | identifier | provider_id | provider_resource_id | metadata | +========================+===============+====================================+============+ | Llama3.1-405B-Instruct | fireworks-0 | fireworks/llama-v3p1-405b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.1-8B-Instruct | fireworks-0 | fireworks/llama-v3p1-8b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ | Llama3.2-3B-Instruct | fireworks-0 | fireworks/llama-v3p2-1b-instruct | {} | +------------------------+---------------+------------------------------------+------------+ ``` --------- Co-authored-by: Dinesh Yeduguru <dineshyv@fb.com>
This commit is contained in:
parent
4253cfcd7f
commit
efe791bab7
7 changed files with 447 additions and 129 deletions
|
@ -26,19 +26,21 @@ class DistributionRegistry(Protocol):
|
|||
|
||||
async def initialize(self) -> None: ...
|
||||
|
||||
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
|
||||
async def get(self, identifier: str) -> Optional[RoutableObjectWithProvider]: ...
|
||||
|
||||
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
|
||||
def get_cached(self, identifier: str) -> Optional[RoutableObjectWithProvider]: ...
|
||||
|
||||
async def update(
|
||||
self, obj: RoutableObjectWithProvider
|
||||
) -> RoutableObjectWithProvider: ...
|
||||
|
||||
# The current data structure allows multiple objects with the same identifier but different providers.
|
||||
# This is not ideal - we should have a single object that can be served by multiple providers,
|
||||
# suggesting a data structure like (obj: Obj, providers: List[str]) rather than List[RoutableObjectWithProvider].
|
||||
# The current approach could lead to inconsistencies if the same logical object has different data across providers.
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
|
||||
|
||||
async def delete(self, type: str, identifier: str) -> None: ...
|
||||
|
||||
|
||||
REGISTER_PREFIX = "distributions:registry"
|
||||
KEY_VERSION = "v1"
|
||||
KEY_VERSION = "v2"
|
||||
KEY_FORMAT = f"{REGISTER_PREFIX}:{KEY_VERSION}::" + "{type}:{identifier}"
|
||||
|
||||
|
||||
|
@ -52,19 +54,11 @@ def _parse_registry_values(values: List[str]) -> List[RoutableObjectWithProvider
|
|||
"""Utility function to parse registry values into RoutableObjectWithProvider objects."""
|
||||
all_objects = []
|
||||
for value in values:
|
||||
try:
|
||||
objects_data = json.loads(value)
|
||||
objects = [
|
||||
pydantic.parse_obj_as(
|
||||
RoutableObjectWithProvider,
|
||||
json.loads(obj_str),
|
||||
)
|
||||
for obj_str in objects_data
|
||||
]
|
||||
all_objects.extend(objects)
|
||||
except Exception as e:
|
||||
print(f"Error parsing value: {e}")
|
||||
traceback.print_exc()
|
||||
obj = pydantic.parse_obj_as(
|
||||
RoutableObjectWithProvider,
|
||||
json.loads(value),
|
||||
)
|
||||
all_objects.append(obj)
|
||||
return all_objects
|
||||
|
||||
|
||||
|
@ -77,54 +71,60 @@ class DiskDistributionRegistry(DistributionRegistry):
|
|||
|
||||
def get_cached(
|
||||
self, type: str, identifier: str
|
||||
) -> List[RoutableObjectWithProvider]:
|
||||
) -> Optional[RoutableObjectWithProvider]:
|
||||
# Disk registry does not have a cache
|
||||
return []
|
||||
raise NotImplementedError("Disk registry does not have a cache")
|
||||
|
||||
async def get_all(self) -> List[RoutableObjectWithProvider]:
|
||||
start_key, end_key = _get_registry_key_range()
|
||||
values = await self.kvstore.range(start_key, end_key)
|
||||
return _parse_registry_values(values)
|
||||
|
||||
async def get(self, type: str, identifier: str) -> List[RoutableObjectWithProvider]:
|
||||
async def get(
|
||||
self, type: str, identifier: str
|
||||
) -> Optional[RoutableObjectWithProvider]:
|
||||
json_str = await self.kvstore.get(
|
||||
KEY_FORMAT.format(type=type, identifier=identifier)
|
||||
)
|
||||
if not json_str:
|
||||
return []
|
||||
return None
|
||||
|
||||
objects_data = json.loads(json_str)
|
||||
return [
|
||||
pydantic.parse_obj_as(
|
||||
# Return only the first object if any exist
|
||||
if objects_data:
|
||||
return pydantic.parse_obj_as(
|
||||
RoutableObjectWithProvider,
|
||||
json.loads(obj_str),
|
||||
json.loads(objects_data),
|
||||
)
|
||||
for obj_str in objects_data
|
||||
]
|
||||
return None
|
||||
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool:
|
||||
existing_objects = await self.get(obj.type, obj.identifier)
|
||||
# dont register if the object's providerid already exists
|
||||
for eobj in existing_objects:
|
||||
if eobj.provider_id == obj.provider_id:
|
||||
return False
|
||||
|
||||
existing_objects.append(obj)
|
||||
|
||||
objects_json = [
|
||||
obj.model_dump_json() for obj in existing_objects
|
||||
] # Fixed variable name
|
||||
async def update(self, obj: RoutableObjectWithProvider) -> None:
|
||||
await self.kvstore.set(
|
||||
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
|
||||
json.dumps(objects_json),
|
||||
obj.model_dump_json(),
|
||||
)
|
||||
return obj
|
||||
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool:
|
||||
existing_obj = await self.get(obj.type, obj.identifier)
|
||||
# dont register if the object's providerid already exists
|
||||
if existing_obj and existing_obj.provider_id == obj.provider_id:
|
||||
return False
|
||||
|
||||
await self.kvstore.set(
|
||||
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
|
||||
obj.model_dump_json(),
|
||||
)
|
||||
return True
|
||||
|
||||
async def delete(self, type: str, identifier: str) -> None:
|
||||
await self.kvstore.delete(KEY_FORMAT.format(type=type, identifier=identifier))
|
||||
|
||||
|
||||
class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
||||
def __init__(self, kvstore: KVStore):
|
||||
super().__init__(kvstore)
|
||||
self.cache: Dict[Tuple[str, str], List[RoutableObjectWithProvider]] = {}
|
||||
self.cache: Dict[Tuple[str, str], RoutableObjectWithProvider] = {}
|
||||
self._initialized = False
|
||||
self._initialize_lock = asyncio.Lock()
|
||||
self._cache_lock = asyncio.Lock()
|
||||
|
@ -151,13 +151,7 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
|||
async with self._locked_cache() as cache:
|
||||
for obj in objects:
|
||||
cache_key = (obj.type, obj.identifier)
|
||||
if cache_key not in cache:
|
||||
cache[cache_key] = []
|
||||
if not any(
|
||||
cached_obj.provider_id == obj.provider_id
|
||||
for cached_obj in cache[cache_key]
|
||||
):
|
||||
cache[cache_key].append(obj)
|
||||
cache[cache_key] = obj
|
||||
|
||||
self._initialized = True
|
||||
|
||||
|
@ -166,28 +160,22 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
|||
|
||||
def get_cached(
|
||||
self, type: str, identifier: str
|
||||
) -> List[RoutableObjectWithProvider]:
|
||||
return self.cache.get((type, identifier), [])[:] # Return a copy
|
||||
) -> Optional[RoutableObjectWithProvider]:
|
||||
return self.cache.get((type, identifier), None)
|
||||
|
||||
async def get_all(self) -> List[RoutableObjectWithProvider]:
|
||||
await self._ensure_initialized()
|
||||
async with self._locked_cache() as cache:
|
||||
return [item for sublist in cache.values() for item in sublist]
|
||||
return list(cache.values())
|
||||
|
||||
async def get(self, type: str, identifier: str) -> List[RoutableObjectWithProvider]:
|
||||
async def get(
|
||||
self, type: str, identifier: str
|
||||
) -> Optional[RoutableObjectWithProvider]:
|
||||
await self._ensure_initialized()
|
||||
cache_key = (type, identifier)
|
||||
|
||||
async with self._locked_cache() as cache:
|
||||
if cache_key in cache:
|
||||
return cache[cache_key][:]
|
||||
|
||||
objects = await super().get(type, identifier)
|
||||
if objects:
|
||||
async with self._locked_cache() as cache:
|
||||
cache[cache_key] = objects
|
||||
|
||||
return objects
|
||||
return cache.get(cache_key, None)
|
||||
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool:
|
||||
await self._ensure_initialized()
|
||||
|
@ -196,16 +184,24 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
|||
if success:
|
||||
cache_key = (obj.type, obj.identifier)
|
||||
async with self._locked_cache() as cache:
|
||||
if cache_key not in cache:
|
||||
cache[cache_key] = []
|
||||
if not any(
|
||||
cached_obj.provider_id == obj.provider_id
|
||||
for cached_obj in cache[cache_key]
|
||||
):
|
||||
cache[cache_key].append(obj)
|
||||
cache[cache_key] = obj
|
||||
|
||||
return success
|
||||
|
||||
async def update(self, obj: RoutableObjectWithProvider) -> None:
|
||||
await super().update(obj)
|
||||
cache_key = (obj.type, obj.identifier)
|
||||
async with self._locked_cache() as cache:
|
||||
cache[cache_key] = obj
|
||||
return obj
|
||||
|
||||
async def delete(self, type: str, identifier: str) -> None:
|
||||
await super().delete(type, identifier)
|
||||
cache_key = (type, identifier)
|
||||
async with self._locked_cache() as cache:
|
||||
if cache_key in cache:
|
||||
del cache[cache_key]
|
||||
|
||||
|
||||
async def create_dist_registry(
|
||||
metadata_store: Optional[KVStoreConfig],
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue