llama-stack/llama_stack/distribution/store/registry.py
Sébastien Han c91e3552a3
feat: implementation for agent/session list and describe (#1606)
Create a new agent:

```
curl --request POST \
  --url http://localhost:8321/v1/agents \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '{
  "agent_config": {
    "sampling_params": {
      "strategy": {
        "type": "greedy"
      },
      "max_tokens": 0,
      "repetition_penalty": 1
    },
    "input_shields": [
      "string"
    ],
    "output_shields": [
      "string"
    ],
    "toolgroups": [
      "string"
    ],
    "client_tools": [
      {
        "name": "string",
        "description": "string",
        "parameters": [
          {
            "name": "string",
            "parameter_type": "string",
            "description": "string",
            "required": true,
            "default": null
          }
        ],
        "metadata": {
          "property1": null,
          "property2": null
        }
      }
    ],
    "tool_choice": "auto",
    "tool_prompt_format": "json",
    "tool_config": {
      "tool_choice": "auto",
      "tool_prompt_format": "json",
      "system_message_behavior": "append"
    },
    "max_infer_iters": 10,
    "model": "string",
    "instructions": "string",
    "enable_session_persistence": false,
    "response_format": {
      "type": "json_schema",
      "json_schema": {
        "property1": null,
        "property2": null
      }
    }
  }
}'
```

Get agent:

```
curl http://127.0.0.1:8321/v1/agents/9abad4ab-2c77-45f9-9d16-46b79d2bea1f
{"agent_id":"9abad4ab-2c77-45f9-9d16-46b79d2bea1f","agent_config":{"sampling_params":{"strategy":{"type":"greedy"},"max_tokens":0,"repetition_penalty":1.0},"input_shields":["string"],"output_shields":["string"],"toolgroups":["string"],"client_tools":[{"name":"string","description":"string","parameters":[{"name":"string","parameter_type":"string","description":"string","required":true,"default":null}],"metadata":{"property1":null,"property2":null}}],"tool_choice":"auto","tool_prompt_format":"json","tool_config":{"tool_choice":"auto","tool_prompt_format":"json","system_message_behavior":"append"},"max_infer_iters":10,"model":"string","instructions":"string","enable_session_persistence":false,"response_format":{"type":"json_schema","json_schema":{"property1":null,"property2":null}}},"created_at":"2025-03-12T16:18:28.369144Z"}%
```

List agents:

```
curl http://127.0.0.1:8321/v1/agents|jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1680  100  1680    0     0   498k      0 --:--:-- --:--:-- --:--:--  546k
{
  "data": [
    {
      "agent_id": "9abad4ab-2c77-45f9-9d16-46b79d2bea1f",
      "agent_config": {
        "sampling_params": {
          "strategy": {
            "type": "greedy"
          },
          "max_tokens": 0,
          "repetition_penalty": 1.0
        },
        "input_shields": [
          "string"
        ],
        "output_shields": [
          "string"
        ],
        "toolgroups": [
          "string"
        ],
        "client_tools": [
          {
            "name": "string",
            "description": "string",
            "parameters": [
              {
                "name": "string",
                "parameter_type": "string",
                "description": "string",
                "required": true,
                "default": null
              }
            ],
            "metadata": {
              "property1": null,
              "property2": null
            }
          }
        ],
        "tool_choice": "auto",
        "tool_prompt_format": "json",
        "tool_config": {
          "tool_choice": "auto",
          "tool_prompt_format": "json",
          "system_message_behavior": "append"
        },
        "max_infer_iters": 10,
        "model": "string",
        "instructions": "string",
        "enable_session_persistence": false,
        "response_format": {
          "type": "json_schema",
          "json_schema": {
            "property1": null,
            "property2": null
          }
        }
      },
      "created_at": "2025-03-12T16:18:28.369144Z"
    },
    {
      "agent_id": "a6643aaa-96dd-46db-a405-333dc504b168",
      "agent_config": {
        "sampling_params": {
          "strategy": {
            "type": "greedy"
          },
          "max_tokens": 0,
          "repetition_penalty": 1.0
        },
        "input_shields": [
          "string"
        ],
        "output_shields": [
          "string"
        ],
        "toolgroups": [
          "string"
        ],
        "client_tools": [
          {
            "name": "string",
            "description": "string",
            "parameters": [
              {
                "name": "string",
                "parameter_type": "string",
                "description": "string",
                "required": true,
                "default": null
              }
            ],
            "metadata": {
              "property1": null,
              "property2": null
            }
          }
        ],
        "tool_choice": "auto",
        "tool_prompt_format": "json",
        "tool_config": {
          "tool_choice": "auto",
          "tool_prompt_format": "json",
          "system_message_behavior": "append"
        },
        "max_infer_iters": 10,
        "model": "string",
        "instructions": "string",
        "enable_session_persistence": false,
        "response_format": {
          "type": "json_schema",
          "json_schema": {
            "property1": null,
            "property2": null
          }
        }
      },
      "created_at": "2025-03-12T16:17:12.811273Z"
    }
  ]
}
```

Create sessions:

```
curl --request POST \
  --url http://localhost:8321/v1/agents/{agent_id}/session \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '{
  "session_name": "string"
}'
```

List sessions:

```
 curl http://127.0.0.1:8321/v1/agents/9abad4ab-2c77-45f9-9d16-46b79d2bea1f/sessions|jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   263  100   263    0     0  90099      0 --:--:-- --:--:-- --:--:--  128k
[
  {
    "session_id": "2b15c4fc-e348-46c1-ae32-f6d424441ac1",
    "session_name": "string",
    "turns": [],
    "started_at": "2025-03-12T17:19:17.784328"
  },
  {
    "session_id": "9432472d-d483-4b73-b682-7b1d35d64111",
    "session_name": "string",
    "turns": [],
    "started_at": "2025-03-12T17:19:19.885834"
  }
]
```

Signed-off-by: Sébastien Han <seb@redhat.com>
2025-05-07 14:49:23 +02:00

204 lines
7.4 KiB
Python

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
from contextlib import asynccontextmanager
from typing import Protocol
import pydantic
from llama_stack.distribution.datatypes import KVStoreConfig, RoutableObjectWithProvider
from llama_stack.distribution.utils.config_dirs import DISTRIBS_BASE_DIR
from llama_stack.log import get_logger
from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
logger = get_logger(__name__, category="core")
class DistributionRegistry(Protocol):
async def get_all(self) -> list[RoutableObjectWithProvider]: ...
async def initialize(self) -> None: ...
async def get(self, identifier: str) -> RoutableObjectWithProvider | None: ...
def get_cached(self, identifier: str) -> RoutableObjectWithProvider | None: ...
async def update(self, obj: RoutableObjectWithProvider) -> RoutableObjectWithProvider: ...
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
async def delete(self, type: str, identifier: str) -> None: ...
REGISTER_PREFIX = "distributions:registry"
KEY_VERSION = "v8"
KEY_FORMAT = f"{REGISTER_PREFIX}:{KEY_VERSION}::" + "{type}:{identifier}"
def _get_registry_key_range() -> tuple[str, str]:
"""Returns the start and end keys for the registry range query."""
start_key = f"{REGISTER_PREFIX}:{KEY_VERSION}"
return start_key, f"{start_key}\xff"
def _parse_registry_values(values: list[str]) -> list[RoutableObjectWithProvider]:
"""Utility function to parse registry values into RoutableObjectWithProvider objects."""
all_objects = []
for value in values:
try:
obj = pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(value)
all_objects.append(obj)
except pydantic.ValidationError as e:
logger.error(f"Error parsing registry value, raw value: {value}. Error: {e}")
continue
return all_objects
class DiskDistributionRegistry(DistributionRegistry):
def __init__(self, kvstore: KVStore):
self.kvstore = kvstore
async def initialize(self) -> None:
pass
def get_cached(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
# Disk registry does not have a cache
raise NotImplementedError("Disk registry does not have a cache")
async def get_all(self) -> list[RoutableObjectWithProvider]:
start_key, end_key = _get_registry_key_range()
values = await self.kvstore.values_in_range(start_key, end_key)
return _parse_registry_values(values)
async def get(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
json_str = await self.kvstore.get(KEY_FORMAT.format(type=type, identifier=identifier))
if not json_str:
return None
try:
return pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(json_str)
except pydantic.ValidationError as e:
logger.error(f"Error parsing registry value for {type}:{identifier}, raw value: {json_str}. Error: {e}")
return None
async def update(self, obj: RoutableObjectWithProvider) -> None:
await self.kvstore.set(
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
obj.model_dump_json(),
)
return obj
async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_obj = await self.get(obj.type, obj.identifier)
# dont register if the object's providerid already exists
if existing_obj and existing_obj.provider_id == obj.provider_id:
return False
await self.kvstore.set(
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),
obj.model_dump_json(),
)
return True
async def delete(self, type: str, identifier: str) -> None:
await self.kvstore.delete(KEY_FORMAT.format(type=type, identifier=identifier))
class CachedDiskDistributionRegistry(DiskDistributionRegistry):
def __init__(self, kvstore: KVStore):
super().__init__(kvstore)
self.cache: dict[tuple[str, str], RoutableObjectWithProvider] = {}
self._initialized = False
self._initialize_lock = asyncio.Lock()
self._cache_lock = asyncio.Lock()
@asynccontextmanager
async def _locked_cache(self):
"""Context manager for safely accessing the cache with a lock."""
async with self._cache_lock:
yield self.cache
async def _ensure_initialized(self):
"""Ensures the registry is initialized before operations."""
if self._initialized:
return
async with self._initialize_lock:
if self._initialized:
return
start_key, end_key = _get_registry_key_range()
values = await self.kvstore.values_in_range(start_key, end_key)
objects = _parse_registry_values(values)
async with self._locked_cache() as cache:
for obj in objects:
cache_key = (obj.type, obj.identifier)
cache[cache_key] = obj
self._initialized = True
async def initialize(self) -> None:
await self._ensure_initialized()
def get_cached(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
return self.cache.get((type, identifier), None)
async def get_all(self) -> list[RoutableObjectWithProvider]:
await self._ensure_initialized()
async with self._locked_cache() as cache:
return list(cache.values())
async def get(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
await self._ensure_initialized()
cache_key = (type, identifier)
async with self._locked_cache() as cache:
return cache.get(cache_key, None)
async def register(self, obj: RoutableObjectWithProvider) -> bool:
await self._ensure_initialized()
success = await super().register(obj)
if success:
cache_key = (obj.type, obj.identifier)
async with self._locked_cache() as cache:
cache[cache_key] = obj
return success
async def update(self, obj: RoutableObjectWithProvider) -> None:
await super().update(obj)
cache_key = (obj.type, obj.identifier)
async with self._locked_cache() as cache:
cache[cache_key] = obj
return obj
async def delete(self, type: str, identifier: str) -> None:
await super().delete(type, identifier)
cache_key = (type, identifier)
async with self._locked_cache() as cache:
if cache_key in cache:
del cache[cache_key]
async def create_dist_registry(
metadata_store: KVStoreConfig | None,
image_name: str,
) -> tuple[CachedDiskDistributionRegistry, KVStore]:
# instantiate kvstore for storing and retrieving distribution metadata
if metadata_store:
dist_kvstore = await kvstore_impl(metadata_store)
else:
dist_kvstore = await kvstore_impl(
SqliteKVStoreConfig(db_path=(DISTRIBS_BASE_DIR / image_name / "kvstore.db").as_posix())
)
dist_registry = CachedDiskDistributionRegistry(dist_kvstore)
await dist_registry.initialize()
return dist_registry, dist_kvstore