Merge branch 'main' into santiagxf/azure-ai-inference

This commit is contained in:
Facundo Santiago 2024-11-07 12:43:55 -05:00 committed by GitHub
commit 5c429b0b67
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
273 changed files with 5491 additions and 5418 deletions

View file

@ -4,7 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Dict, List, Optional, Protocol
from typing import Any, Dict, List, Literal, Optional, Protocol
from llama_models.llama3.api.datatypes import URL
@ -32,6 +32,7 @@ class DatasetDef(BaseModel):
@json_schema_type
class DatasetDefWithProvider(DatasetDef):
type: Literal["dataset"] = "dataset"
provider_id: str = Field(
description="ID of the provider which serves this dataset",
)

View file

@ -4,7 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
from typing import Any, Dict, List, Literal, Optional, Protocol, runtime_checkable
from llama_models.schema_utils import json_schema_type, webmethod
from pydantic import BaseModel, Field
@ -25,6 +25,7 @@ class ModelDef(BaseModel):
@json_schema_type
class ModelDefWithProvider(ModelDef):
type: Literal["model"] = "model"
provider_id: str = Field(
description="The provider ID for this model",
)

View file

@ -39,7 +39,7 @@ class RunShieldResponse(BaseModel):
class ShieldStore(Protocol):
def get_shield(self, identifier: str) -> ShieldDef: ...
async def get_shield(self, identifier: str) -> ShieldDef: ...
@runtime_checkable
@ -48,5 +48,5 @@ class Safety(Protocol):
@webmethod(route="/safety/run_shield")
async def run_shield(
self, shield_type: str, messages: List[Message], params: Dict[str, Any] = None
self, identifier: str, messages: List[Message], params: Dict[str, Any] = None
) -> RunShieldResponse: ...

View file

@ -4,7 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
from typing import Any, Dict, List, Literal, Optional, Protocol, runtime_checkable
from llama_models.schema_utils import json_schema_type, webmethod
from pydantic import BaseModel, Field
@ -53,6 +53,7 @@ class ScoringFnDef(BaseModel):
@json_schema_type
class ScoringFnDefWithProvider(ScoringFnDef):
type: Literal["scoring_fn"] = "scoring_fn"
provider_id: str = Field(
description="ID of the provider which serves this dataset",
)

View file

@ -5,7 +5,7 @@
# the root directory of this source tree.
from enum import Enum
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
from typing import Any, Dict, List, Literal, Optional, Protocol, runtime_checkable
from llama_models.schema_utils import json_schema_type, webmethod
from pydantic import BaseModel, Field
@ -23,7 +23,7 @@ class ShieldDef(BaseModel):
identifier: str = Field(
description="A unique identifier for the shield type",
)
type: str = Field(
shield_type: str = Field(
description="The type of shield this is; the value is one of the ShieldType enum"
)
params: Dict[str, Any] = Field(
@ -34,6 +34,7 @@ class ShieldDef(BaseModel):
@json_schema_type
class ShieldDefWithProvider(ShieldDef):
type: Literal["shield"] = "shield"
provider_id: str = Field(
description="The provider ID for this shield type",
)
@ -45,7 +46,7 @@ class Shields(Protocol):
async def list_shields(self) -> List[ShieldDefWithProvider]: ...
@webmethod(route="/shields/get", method="GET")
async def get_shield(self, shield_type: str) -> Optional[ShieldDefWithProvider]: ...
async def get_shield(self, identifier: str) -> Optional[ShieldDefWithProvider]: ...
@webmethod(route="/shields/register", method="POST")
async def register_shield(self, shield: ShieldDefWithProvider) -> None: ...

View file

@ -12,6 +12,10 @@ import os
from functools import lru_cache
from pathlib import Path
from llama_stack.distribution.distribution import get_provider_registry
from llama_stack.distribution.utils.dynamic import instantiate_class_type
TEMPLATES_PATH = Path(os.path.relpath(__file__)).parent.parent.parent / "templates"
@ -176,6 +180,66 @@ class StackBuild(Subcommand):
return
self._run_stack_build_command_from_build_config(build_config)
def _generate_run_config(self, build_config: BuildConfig, build_dir: Path) -> None:
"""
Generate a run.yaml template file for user to edit from a build.yaml file
"""
import json
import yaml
from termcolor import cprint
from llama_stack.distribution.build import ImageType
apis = list(build_config.distribution_spec.providers.keys())
run_config = StackRunConfig(
built_at=datetime.now(),
docker_image=(
build_config.name
if build_config.image_type == ImageType.docker.value
else None
),
image_name=build_config.name,
conda_env=(
build_config.name
if build_config.image_type == ImageType.conda.value
else None
),
apis=apis,
providers={},
)
# build providers dict
provider_registry = get_provider_registry()
for api in apis:
run_config.providers[api] = []
provider_types = build_config.distribution_spec.providers[api]
if isinstance(provider_types, str):
provider_types = [provider_types]
for i, provider_type in enumerate(provider_types):
p_spec = Provider(
provider_id=f"{provider_type}-{i}",
provider_type=provider_type,
config={},
)
config_type = instantiate_class_type(
provider_registry[Api(api)][provider_type].config_class
)
p_spec.config = config_type()
run_config.providers[api].append(p_spec)
os.makedirs(build_dir, exist_ok=True)
run_config_file = build_dir / f"{build_config.name}-run.yaml"
with open(run_config_file, "w") as f:
to_write = json.loads(run_config.model_dump_json())
f.write(yaml.dump(to_write, sort_keys=False))
cprint(
f"You can now edit {run_config_file} and run `llama stack run {run_config_file}`",
color="green",
)
def _run_stack_build_command_from_build_config(
self, build_config: BuildConfig
) -> None:
@ -183,48 +247,24 @@ class StackBuild(Subcommand):
import os
import yaml
from termcolor import cprint
from llama_stack.distribution.build import build_image, ImageType
from llama_stack.distribution.build import build_image
from llama_stack.distribution.utils.config_dirs import DISTRIBS_BASE_DIR
from llama_stack.distribution.utils.serialize import EnumEncoder
# save build.yaml spec for building same distribution again
if build_config.image_type == ImageType.docker.value:
# docker needs build file to be in the llama-stack repo dir to be able to copy over to the image
llama_stack_path = Path(
os.path.abspath(__file__)
).parent.parent.parent.parent
build_dir = llama_stack_path / "tmp/configs/"
else:
build_dir = DISTRIBS_BASE_DIR / f"llamastack-{build_config.name}"
build_dir = DISTRIBS_BASE_DIR / f"llamastack-{build_config.name}"
os.makedirs(build_dir, exist_ok=True)
build_file_path = build_dir / f"{build_config.name}-build.yaml"
with open(build_file_path, "w") as f:
to_write = json.loads(json.dumps(build_config.dict(), cls=EnumEncoder))
to_write = json.loads(build_config.model_dump_json())
f.write(yaml.dump(to_write, sort_keys=False))
return_code = build_image(build_config, build_file_path)
if return_code != 0:
return
configure_name = (
build_config.name
if build_config.image_type == "conda"
else (f"llamastack-{build_config.name}")
)
if build_config.image_type == "conda":
cprint(
f"You can now run `llama stack configure {configure_name}`",
color="green",
)
else:
cprint(
f"You can now edit your run.yaml file and run `docker run -it -p 5000:5000 {build_config.name}`. See full command in llama-stack/distributions/",
color="green",
)
self._generate_run_config(build_config, build_dir)
def _run_template_list_cmd(self, args: argparse.Namespace) -> None:
import json

View file

@ -7,8 +7,6 @@
import argparse
from llama_stack.cli.subcommand import Subcommand
from llama_stack.distribution.utils.config_dirs import BUILDS_BASE_DIR
from llama_stack.distribution.datatypes import * # noqa: F403
class StackConfigure(Subcommand):
@ -39,123 +37,10 @@ class StackConfigure(Subcommand):
)
def _run_stack_configure_cmd(self, args: argparse.Namespace) -> None:
import json
import os
import subprocess
from pathlib import Path
import pkg_resources
import yaml
from termcolor import cprint
from llama_stack.distribution.build import ImageType
from llama_stack.distribution.utils.exec import run_with_pty
docker_image = None
build_config_file = Path(args.config)
if build_config_file.exists():
with open(build_config_file, "r") as f:
build_config = BuildConfig(**yaml.safe_load(f))
self._configure_llama_distribution(build_config, args.output_dir)
return
conda_dir = (
Path(os.path.expanduser("~/.conda/envs")) / f"llamastack-{args.config}"
)
output = subprocess.check_output(["bash", "-c", "conda info --json"])
conda_envs = json.loads(output.decode("utf-8"))["envs"]
for x in conda_envs:
if x.endswith(f"/llamastack-{args.config}"):
conda_dir = Path(x)
break
build_config_file = Path(conda_dir) / f"{args.config}-build.yaml"
if build_config_file.exists():
with open(build_config_file, "r") as f:
build_config = BuildConfig(**yaml.safe_load(f))
cprint(f"Using {build_config_file}...", "green")
self._configure_llama_distribution(build_config, args.output_dir)
return
docker_image = args.config
builds_dir = BUILDS_BASE_DIR / ImageType.docker.value
if args.output_dir:
builds_dir = Path(output_dir)
os.makedirs(builds_dir, exist_ok=True)
script = pkg_resources.resource_filename(
"llama_stack", "distribution/configure_container.sh"
)
script_args = [script, docker_image, str(builds_dir)]
return_code = run_with_pty(script_args)
if return_code != 0:
self.parser.error(
f"Failed to configure container {docker_image} with return code {return_code}. Please run `llama stack build` first. "
)
def _configure_llama_distribution(
self,
build_config: BuildConfig,
output_dir: Optional[str] = None,
):
import json
import os
from pathlib import Path
import yaml
from termcolor import cprint
from llama_stack.distribution.configure import (
configure_api_providers,
parse_and_maybe_upgrade_config,
)
from llama_stack.distribution.utils.serialize import EnumEncoder
builds_dir = BUILDS_BASE_DIR / build_config.image_type
if output_dir:
builds_dir = Path(output_dir)
os.makedirs(builds_dir, exist_ok=True)
image_name = build_config.name.replace("::", "-")
run_config_file = builds_dir / f"{image_name}-run.yaml"
if run_config_file.exists():
cprint(
f"Configuration already exists at `{str(run_config_file)}`. Will overwrite...",
"yellow",
attrs=["bold"],
)
config_dict = yaml.safe_load(run_config_file.read_text())
config = parse_and_maybe_upgrade_config(config_dict)
else:
config = StackRunConfig(
built_at=datetime.now(),
image_name=image_name,
apis=list(build_config.distribution_spec.providers.keys()),
providers={},
)
config = configure_api_providers(config, build_config.distribution_spec)
config.docker_image = (
image_name if build_config.image_type == "docker" else None
)
config.conda_env = image_name if build_config.image_type == "conda" else None
with open(run_config_file, "w") as f:
to_write = json.loads(json.dumps(config.dict(), cls=EnumEncoder))
f.write(yaml.dump(to_write, sort_keys=False))
cprint(
f"> YAML configuration has been written to `{run_config_file}`.",
color="blue",
)
cprint(
f"You can now run `llama stack run {image_name} --port PORT`",
color="green",
self.parser.error(
"""
DEPRECATED! llama stack configure has been deprecated.
Please use llama stack run --config <path/to/run.yaml> instead.
Please see example run.yaml in /distributions folder.
"""
)

View file

@ -45,7 +45,6 @@ class StackRun(Subcommand):
import pkg_resources
import yaml
from termcolor import cprint
from llama_stack.distribution.build import ImageType
from llama_stack.distribution.configure import parse_and_maybe_upgrade_config
@ -71,14 +70,12 @@ class StackRun(Subcommand):
if not config_file.exists():
self.parser.error(
f"File {str(config_file)} does not exist. Please run `llama stack build` and `llama stack configure <name>` to generate a run.yaml file"
f"File {str(config_file)} does not exist. Please run `llama stack build` to generate (and optionally edit) a run.yaml file"
)
return
cprint(f"Using config `{config_file}`", "green")
with open(config_file, "r") as f:
config_dict = yaml.safe_load(config_file.read_text())
config = parse_and_maybe_upgrade_config(config_dict)
config_dict = yaml.safe_load(config_file.read_text())
config = parse_and_maybe_upgrade_config(config_dict)
if config.docker_image:
script = pkg_resources.resource_filename(

View file

@ -25,6 +25,7 @@ from llama_stack.distribution.utils.config_dirs import BUILDS_BASE_DIR
# These are the dependencies needed by the distribution server.
# `llama-stack` is automatically installed by the installation script.
SERVER_DEPENDENCIES = [
"aiosqlite",
"fastapi",
"fire",
"httpx",

View file

@ -36,7 +36,6 @@ SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
REPO_DIR=$(dirname $(dirname "$SCRIPT_DIR"))
DOCKER_BINARY=${DOCKER_BINARY:-docker}
DOCKER_OPTS=${DOCKER_OPTS:-}
REPO_CONFIGS_DIR="$REPO_DIR/tmp/configs"
TEMP_DIR=$(mktemp -d)
@ -115,8 +114,6 @@ ENTRYPOINT ["python", "-m", "llama_stack.distribution.server.server"]
EOF
add_to_docker "ADD tmp/configs/$(basename "$build_file_path") ./llamastack-build.yaml"
printf "Dockerfile created successfully in $TEMP_DIR/Dockerfile"
cat $TEMP_DIR/Dockerfile
printf "\n"
@ -138,7 +135,6 @@ set -x
$DOCKER_BINARY build $DOCKER_OPTS -t $image_name -f "$TEMP_DIR/Dockerfile" "$REPO_DIR" $mounts
# clean up tmp/configs
rm -rf $REPO_CONFIGS_DIR
set +x
echo "Success!"

View file

@ -83,6 +83,7 @@ def create_api_client_class(protocol, additional_protocol) -> Type:
j = response.json()
if j is None:
return None
# print(f"({protocol.__name__}) Returning {j}, type {return_type}")
return parse_obj_as(return_type, j)
async def _call_streaming(self, method_name: str, *args, **kwargs) -> Any:
@ -102,14 +103,15 @@ def create_api_client_class(protocol, additional_protocol) -> Type:
if line.startswith("data:"):
data = line[len("data: ") :]
try:
data = json.loads(data)
if "error" in data:
cprint(data, "red")
continue
yield parse_obj_as(return_type, json.loads(data))
yield parse_obj_as(return_type, data)
except Exception as e:
print(data)
print(f"Error with parsing or validation: {e}")
print(data)
def httpx_request_params(self, method_name: str, *args, **kwargs) -> dict:
webmethod, sig = self.routes[method_name]
@ -141,14 +143,21 @@ def create_api_client_class(protocol, additional_protocol) -> Type:
else:
data.update(convert(kwargs))
return dict(
ret = dict(
method=webmethod.method or "POST",
url=url,
headers={"Content-Type": "application/json"},
params=params,
json=data,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=30,
)
if params:
ret["params"] = params
if data:
ret["json"] = data
return ret
# Add protocol methods to the wrapper
for p in protocols:

View file

@ -21,6 +21,7 @@ from llama_stack.apis.inference import Inference
from llama_stack.apis.memory import Memory
from llama_stack.apis.safety import Safety
from llama_stack.apis.scoring import Scoring
from llama_stack.providers.utils.kvstore.config import KVStoreConfig
LLAMA_STACK_BUILD_CONFIG_VERSION = "2"
LLAMA_STACK_RUN_CONFIG_VERSION = "2"
@ -37,12 +38,16 @@ RoutableObject = Union[
ScoringFnDef,
]
RoutableObjectWithProvider = Union[
ModelDefWithProvider,
ShieldDefWithProvider,
MemoryBankDefWithProvider,
DatasetDefWithProvider,
ScoringFnDefWithProvider,
RoutableObjectWithProvider = Annotated[
Union[
ModelDefWithProvider,
ShieldDefWithProvider,
MemoryBankDefWithProvider,
DatasetDefWithProvider,
ScoringFnDefWithProvider,
],
Field(discriminator="type"),
]
RoutedProtocol = Union[
@ -134,6 +139,12 @@ One or more providers to use for each API. The same provider_type (e.g., meta-re
can be instantiated multiple times (with different configs) if necessary.
""",
)
metadata_store: Optional[KVStoreConfig] = Field(
default=None,
description="""
Configuration for the persistence store used by the distribution registry. If not specified,
a default SQLite store will be used.""",
)
class BuildConfig(BaseModel):

View file

@ -26,6 +26,7 @@ from llama_stack.apis.scoring_functions import ScoringFunctions
from llama_stack.apis.shields import Shields
from llama_stack.apis.telemetry import Telemetry
from llama_stack.distribution.distribution import builtin_automatically_routed_apis
from llama_stack.distribution.store import DistributionRegistry
from llama_stack.distribution.utils.dynamic import instantiate_class_type
@ -65,7 +66,9 @@ class ProviderWithSpec(Provider):
# TODO: this code is not very straightforward to follow and needs one more round of refactoring
async def resolve_impls(
run_config: StackRunConfig, provider_registry: Dict[Api, Dict[str, ProviderSpec]]
run_config: StackRunConfig,
provider_registry: Dict[Api, Dict[str, ProviderSpec]],
dist_registry: DistributionRegistry,
) -> Dict[Api, Any]:
"""
Does two things:
@ -189,6 +192,7 @@ async def resolve_impls(
provider,
deps,
inner_impls,
dist_registry,
)
# TODO: ugh slightly redesign this shady looking code
if "inner-" in api_str:
@ -237,6 +241,7 @@ async def instantiate_provider(
provider: ProviderWithSpec,
deps: Dict[str, Any],
inner_impls: Dict[str, Any],
dist_registry: DistributionRegistry,
):
protocols = api_protocol_map()
additional_protocols = additional_protocols_map()
@ -270,7 +275,7 @@ async def instantiate_provider(
method = "get_routing_table_impl"
config = None
args = [provider_spec.api, inner_impls, deps]
args = [provider_spec.api, inner_impls, deps, dist_registry]
else:
method = "get_provider_impl"

View file

@ -7,6 +7,9 @@
from typing import Any
from llama_stack.distribution.datatypes import * # noqa: F403
from llama_stack.distribution.store import DistributionRegistry
from .routing_tables import (
DatasetsRoutingTable,
MemoryBanksRoutingTable,
@ -20,6 +23,7 @@ async def get_routing_table_impl(
api: Api,
impls_by_provider_id: Dict[str, RoutedProtocol],
_deps,
dist_registry: DistributionRegistry,
) -> Any:
api_to_tables = {
"memory_banks": MemoryBanksRoutingTable,
@ -32,7 +36,7 @@ async def get_routing_table_impl(
if api.value not in api_to_tables:
raise ValueError(f"API {api.value} not found in router map")
impl = api_to_tables[api.value](impls_by_provider_id)
impl = api_to_tables[api.value](impls_by_provider_id, dist_registry)
await impl.initialize()
return impl

View file

@ -154,12 +154,12 @@ class SafetyRouter(Safety):
async def run_shield(
self,
shield_type: str,
identifier: str,
messages: List[Message],
params: Dict[str, Any] = None,
) -> RunShieldResponse:
return await self.routing_table.get_provider_impl(shield_type).run_shield(
shield_type=shield_type,
return await self.routing_table.get_provider_impl(identifier).run_shield(
identifier=identifier,
messages=messages,
params=params,
)

View file

@ -13,6 +13,7 @@ from llama_stack.apis.shields import * # noqa: F403
from llama_stack.apis.memory_banks import * # noqa: F403
from llama_stack.apis.datasets import * # noqa: F403
from llama_stack.distribution.store import DistributionRegistry
from llama_stack.distribution.datatypes import * # noqa: F403
@ -46,25 +47,23 @@ async def register_object_with_provider(obj: RoutableObject, p: Any) -> None:
Registry = Dict[str, List[RoutableObjectWithProvider]]
# TODO: this routing table maintains state in memory purely. We need to
# add persistence to it when we add dynamic registration of objects.
class CommonRoutingTableImpl(RoutingTable):
def __init__(
self,
impls_by_provider_id: Dict[str, RoutedProtocol],
dist_registry: DistributionRegistry,
) -> None:
self.impls_by_provider_id = impls_by_provider_id
self.dist_registry = dist_registry
async def initialize(self) -> None:
self.registry: Registry = {}
# Initialize the registry if not already done
await self.dist_registry.initialize()
def add_objects(
async def add_objects(
objs: List[RoutableObjectWithProvider], provider_id: str, cls
) -> None:
for obj in objs:
if obj.identifier not in self.registry:
self.registry[obj.identifier] = []
if cls is None:
obj.provider_id = provider_id
else:
@ -74,34 +73,35 @@ class CommonRoutingTableImpl(RoutingTable):
obj.provider_id = provider_id
else:
obj = cls(**obj.model_dump(), provider_id=provider_id)
self.registry[obj.identifier].append(obj)
await self.dist_registry.register(obj)
# Register all objects from providers
for pid, p in self.impls_by_provider_id.items():
api = get_impl_api(p)
if api == Api.inference:
p.model_store = self
models = await p.list_models()
add_objects(models, pid, ModelDefWithProvider)
await add_objects(models, pid, ModelDefWithProvider)
elif api == Api.safety:
p.shield_store = self
shields = await p.list_shields()
add_objects(shields, pid, ShieldDefWithProvider)
await add_objects(shields, pid, ShieldDefWithProvider)
elif api == Api.memory:
p.memory_bank_store = self
memory_banks = await p.list_memory_banks()
add_objects(memory_banks, pid, None)
await add_objects(memory_banks, pid, None)
elif api == Api.datasetio:
p.dataset_store = self
datasets = await p.list_datasets()
add_objects(datasets, pid, DatasetDefWithProvider)
await add_objects(datasets, pid, DatasetDefWithProvider)
elif api == Api.scoring:
p.scoring_function_store = self
scoring_functions = await p.list_scoring_functions()
add_objects(scoring_functions, pid, ScoringFnDefWithProvider)
await add_objects(scoring_functions, pid, ScoringFnDefWithProvider)
async def shutdown(self) -> None:
for p in self.impls_by_provider_id.values():
@ -124,39 +124,49 @@ class CommonRoutingTableImpl(RoutingTable):
else:
raise ValueError("Unknown routing table type")
if routing_key not in self.registry:
# Get objects from disk registry
objects = self.dist_registry.get_cached(routing_key)
if not objects:
apiname, objname = apiname_object()
provider_ids = list(self.impls_by_provider_id.keys())
if len(provider_ids) > 1:
provider_ids_str = f"any of the providers: {', '.join(provider_ids)}"
else:
provider_ids_str = f"provider: `{provider_ids[0]}`"
raise ValueError(
f"`{routing_key}` not registered. Make sure there is an {apiname} provider serving this {objname}."
f"{objname.capitalize()} `{routing_key}` not served by {provider_ids_str}. Make sure there is an {apiname} provider serving this {objname}."
)
objs = self.registry[routing_key]
for obj in objs:
for obj in objects:
if not provider_id or provider_id == obj.provider_id:
return self.impls_by_provider_id[obj.provider_id]
raise ValueError(f"Provider not found for `{routing_key}`")
def get_object_by_identifier(
async def get_object_by_identifier(
self, identifier: str
) -> Optional[RoutableObjectWithProvider]:
objs = self.registry.get(identifier, [])
if not objs:
# Get from disk registry
objects = await self.dist_registry.get(identifier)
if not objects:
return None
# kind of ill-defined behavior here, but we'll just return the first one
return objs[0]
return objects[0]
async def register_object(self, obj: RoutableObjectWithProvider):
entries = self.registry.get(obj.identifier, [])
for entry in entries:
if entry.provider_id == obj.provider_id or not obj.provider_id:
# Get existing objects from registry
existing_objects = await self.dist_registry.get(obj.identifier)
# Check for existing registration
for existing_obj in existing_objects:
if existing_obj.provider_id == obj.provider_id or not obj.provider_id:
print(
f"`{obj.identifier}` already registered with `{entry.provider_id}`"
f"`{obj.identifier}` already registered with `{existing_obj.provider_id}`"
)
return
# if provider_id is not specified, we'll pick an arbitrary one from existing entries
# if provider_id is not specified, pick an arbitrary one from existing entries
if not obj.provider_id and len(self.impls_by_provider_id) > 0:
obj.provider_id = list(self.impls_by_provider_id.keys())[0]
@ -166,23 +176,25 @@ class CommonRoutingTableImpl(RoutingTable):
p = self.impls_by_provider_id[obj.provider_id]
await register_object_with_provider(obj, p)
await self.dist_registry.register(obj)
if obj.identifier not in self.registry:
self.registry[obj.identifier] = []
self.registry[obj.identifier].append(obj)
async def get_all_with_type(self, type: str) -> List[RoutableObjectWithProvider]:
objs = await self.dist_registry.get_all()
return [obj for obj in objs if obj.type == type]
# TODO: persist this to a store
async def get_all_with_types(
self, types: List[str]
) -> List[RoutableObjectWithProvider]:
objs = await self.dist_registry.get_all()
return [obj for obj in objs if obj.type in types]
class ModelsRoutingTable(CommonRoutingTableImpl, Models):
async def list_models(self) -> List[ModelDefWithProvider]:
objects = []
for objs in self.registry.values():
objects.extend(objs)
return objects
return await self.get_all_with_type("model")
async def get_model(self, identifier: str) -> Optional[ModelDefWithProvider]:
return self.get_object_by_identifier(identifier)
return await self.get_object_by_identifier(identifier)
async def register_model(self, model: ModelDefWithProvider) -> None:
await self.register_object(model)
@ -190,13 +202,10 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models):
class ShieldsRoutingTable(CommonRoutingTableImpl, Shields):
async def list_shields(self) -> List[ShieldDef]:
objects = []
for objs in self.registry.values():
objects.extend(objs)
return objects
return await self.get_all_with_type("shield")
async def get_shield(self, shield_type: str) -> Optional[ShieldDefWithProvider]:
return self.get_object_by_identifier(shield_type)
async def get_shield(self, identifier: str) -> Optional[ShieldDefWithProvider]:
return await self.get_object_by_identifier(identifier)
async def register_shield(self, shield: ShieldDefWithProvider) -> None:
await self.register_object(shield)
@ -204,15 +213,19 @@ class ShieldsRoutingTable(CommonRoutingTableImpl, Shields):
class MemoryBanksRoutingTable(CommonRoutingTableImpl, MemoryBanks):
async def list_memory_banks(self) -> List[MemoryBankDefWithProvider]:
objects = []
for objs in self.registry.values():
objects.extend(objs)
return objects
return await self.get_all_with_types(
[
MemoryBankType.vector.value,
MemoryBankType.keyvalue.value,
MemoryBankType.keyword.value,
MemoryBankType.graph.value,
]
)
async def get_memory_bank(
self, identifier: str
) -> Optional[MemoryBankDefWithProvider]:
return self.get_object_by_identifier(identifier)
return await self.get_object_by_identifier(identifier)
async def register_memory_bank(
self, memory_bank: MemoryBankDefWithProvider
@ -222,15 +235,12 @@ class MemoryBanksRoutingTable(CommonRoutingTableImpl, MemoryBanks):
class DatasetsRoutingTable(CommonRoutingTableImpl, Datasets):
async def list_datasets(self) -> List[DatasetDefWithProvider]:
objects = []
for objs in self.registry.values():
objects.extend(objs)
return objects
return await self.get_all_with_type("dataset")
async def get_dataset(
self, dataset_identifier: str
) -> Optional[DatasetDefWithProvider]:
return self.get_object_by_identifier(dataset_identifier)
return await self.get_object_by_identifier(dataset_identifier)
async def register_dataset(self, dataset_def: DatasetDefWithProvider) -> None:
await self.register_object(dataset_def)
@ -238,15 +248,12 @@ class DatasetsRoutingTable(CommonRoutingTableImpl, Datasets):
class ScoringFunctionsRoutingTable(CommonRoutingTableImpl, Scoring):
async def list_scoring_functions(self) -> List[ScoringFnDefWithProvider]:
objects = []
for objs in self.registry.values():
objects.extend(objs)
return objects
return await self.get_all_with_type("scoring_function")
async def get_scoring_function(
self, name: str
) -> Optional[ScoringFnDefWithProvider]:
return self.get_object_by_identifier(name)
return await self.get_object_by_identifier(name)
async def register_scoring_function(
self, function_def: ScoringFnDefWithProvider

View file

@ -31,6 +31,8 @@ from llama_stack.distribution.distribution import (
get_provider_registry,
)
from llama_stack.distribution.utils.config_dirs import DISTRIBS_BASE_DIR
from llama_stack.providers.utils.telemetry.tracing import (
end_trace,
setup_logger,
@ -38,9 +40,10 @@ from llama_stack.providers.utils.telemetry.tracing import (
start_trace,
)
from llama_stack.distribution.datatypes import * # noqa: F403
from llama_stack.distribution.request_headers import set_request_provider_data
from llama_stack.distribution.resolver import resolve_impls
from llama_stack.distribution.store import CachedDiskDistributionRegistry
from llama_stack.providers.utils.kvstore import kvstore_impl, SqliteKVStoreConfig
from .endpoints import get_all_api_endpoints
@ -206,7 +209,8 @@ async def maybe_await(value):
async def sse_generator(event_gen):
try:
async for item in await event_gen:
event_gen = await event_gen
async for item in event_gen:
yield create_sse_event(item)
await asyncio.sleep(0.01)
except asyncio.CancelledError:
@ -226,7 +230,6 @@ async def sse_generator(event_gen):
def create_dynamic_typed_route(func: Any, method: str):
async def endpoint(request: Request, **kwargs):
await start_trace(func.__name__)
@ -278,8 +281,23 @@ def main(
config = StackRunConfig(**yaml.safe_load(fp))
app = FastAPI()
# instantiate kvstore for storing and retrieving distribution metadata
if config.metadata_store:
dist_kvstore = asyncio.run(kvstore_impl(config.metadata_store))
else:
dist_kvstore = asyncio.run(
kvstore_impl(
SqliteKVStoreConfig(
db_path=(
DISTRIBS_BASE_DIR / config.image_name / "kvstore.db"
).as_posix()
)
)
)
impls = asyncio.run(resolve_impls(config, get_provider_registry()))
dist_registry = CachedDiskDistributionRegistry(dist_kvstore)
impls = asyncio.run(resolve_impls(config, get_provider_registry(), dist_registry))
if Api.telemetry in impls:
setup_logger(impls[Api.telemetry])

View file

@ -0,0 +1,7 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .registry import * # noqa: F401 F403

View file

@ -0,0 +1,135 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
from typing import Dict, List, Protocol
import pydantic
from llama_stack.distribution.datatypes import RoutableObjectWithProvider
from llama_stack.providers.utils.kvstore import KVStore
class DistributionRegistry(Protocol):
async def get_all(self) -> List[RoutableObjectWithProvider]: ...
async def initialize(self) -> None: ...
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
# The current data structure allows multiple objects with the same identifier but different providers.
# This is not ideal - we should have a single object that can be served by multiple providers,
# suggesting a data structure like (obj: Obj, providers: List[str]) rather than List[RoutableObjectWithProvider].
# The current approach could lead to inconsistencies if the same logical object has different data across providers.
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
KEY_FORMAT = "distributions:registry:{}"
class DiskDistributionRegistry(DistributionRegistry):
def __init__(self, kvstore: KVStore):
self.kvstore = kvstore
async def initialize(self) -> None:
pass
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]:
# Disk registry does not have a cache
return []
async def get_all(self) -> List[RoutableObjectWithProvider]:
start_key = KEY_FORMAT.format("")
end_key = KEY_FORMAT.format("\xff")
keys = await self.kvstore.range(start_key, end_key)
return [await self.get(key.split(":")[-1]) for key in keys]
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]:
json_str = await self.kvstore.get(KEY_FORMAT.format(identifier))
if not json_str:
return []
objects_data = json.loads(json_str)
return [
pydantic.parse_obj_as(
RoutableObjectWithProvider,
json.loads(obj_str),
)
for obj_str in objects_data
]
async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_objects = await self.get(obj.identifier)
# dont register if the object's providerid already exists
for eobj in existing_objects:
if eobj.provider_id == obj.provider_id:
return False
existing_objects.append(obj)
objects_json = [
obj.model_dump_json() for obj in existing_objects
] # Fixed variable name
await self.kvstore.set(
KEY_FORMAT.format(obj.identifier), json.dumps(objects_json)
)
return True
class CachedDiskDistributionRegistry(DiskDistributionRegistry):
def __init__(self, kvstore: KVStore):
super().__init__(kvstore)
self.cache: Dict[str, List[RoutableObjectWithProvider]] = {}
async def initialize(self) -> None:
start_key = KEY_FORMAT.format("")
end_key = KEY_FORMAT.format("\xff")
keys = await self.kvstore.range(start_key, end_key)
for key in keys:
identifier = key.split(":")[-1]
objects = await super().get(identifier)
if objects:
self.cache[identifier] = objects
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]:
return self.cache.get(identifier, [])
async def get_all(self) -> List[RoutableObjectWithProvider]:
return [item for sublist in self.cache.values() for item in sublist]
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]:
if identifier in self.cache:
return self.cache[identifier]
objects = await super().get(identifier)
if objects:
self.cache[identifier] = objects
return objects
async def register(self, obj: RoutableObjectWithProvider) -> bool:
# First update disk
success = await super().register(obj)
if success:
# Then update cache
if obj.identifier not in self.cache:
self.cache[obj.identifier] = []
# Check if provider already exists in cache
for cached_obj in self.cache[obj.identifier]:
if cached_obj.provider_id == obj.provider_id:
return success
# If not, update cache
self.cache[obj.identifier].append(obj)
return success

View file

@ -0,0 +1,171 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import os
import pytest
import pytest_asyncio
from llama_stack.distribution.store import * # noqa F403
from llama_stack.apis.inference import ModelDefWithProvider
from llama_stack.apis.memory_banks import VectorMemoryBankDef
from llama_stack.providers.utils.kvstore import kvstore_impl, SqliteKVStoreConfig
from llama_stack.distribution.datatypes import * # noqa F403
@pytest.fixture
def config():
config = SqliteKVStoreConfig(db_path="/tmp/test_registry.db")
if os.path.exists(config.db_path):
os.remove(config.db_path)
return config
@pytest_asyncio.fixture
async def registry(config):
registry = DiskDistributionRegistry(await kvstore_impl(config))
await registry.initialize()
return registry
@pytest_asyncio.fixture
async def cached_registry(config):
registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
await registry.initialize()
return registry
@pytest.fixture
def sample_bank():
return VectorMemoryBankDef(
identifier="test_bank",
embedding_model="all-MiniLM-L6-v2",
chunk_size_in_tokens=512,
overlap_size_in_tokens=64,
provider_id="test-provider",
)
@pytest.fixture
def sample_model():
return ModelDefWithProvider(
identifier="test_model",
llama_model="Llama3.2-3B-Instruct",
provider_id="test-provider",
)
@pytest.mark.asyncio
async def test_registry_initialization(registry):
# Test empty registry
results = await registry.get("nonexistent")
assert len(results) == 0
@pytest.mark.asyncio
async def test_basic_registration(registry, sample_bank, sample_model):
print(f"Registering {sample_bank}")
await registry.register(sample_bank)
print(f"Registering {sample_model}")
await registry.register(sample_model)
print("Getting bank")
results = await registry.get("test_bank")
assert len(results) == 1
result_bank = results[0]
assert result_bank.identifier == sample_bank.identifier
assert result_bank.embedding_model == sample_bank.embedding_model
assert result_bank.chunk_size_in_tokens == sample_bank.chunk_size_in_tokens
assert result_bank.overlap_size_in_tokens == sample_bank.overlap_size_in_tokens
assert result_bank.provider_id == sample_bank.provider_id
results = await registry.get("test_model")
assert len(results) == 1
result_model = results[0]
assert result_model.identifier == sample_model.identifier
assert result_model.llama_model == sample_model.llama_model
assert result_model.provider_id == sample_model.provider_id
@pytest.mark.asyncio
async def test_cached_registry_initialization(config, sample_bank, sample_model):
# First populate the disk registry
disk_registry = DiskDistributionRegistry(await kvstore_impl(config))
await disk_registry.initialize()
await disk_registry.register(sample_bank)
await disk_registry.register(sample_model)
# Test cached version loads from disk
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
await cached_registry.initialize()
results = await cached_registry.get("test_bank")
assert len(results) == 1
result_bank = results[0]
assert result_bank.identifier == sample_bank.identifier
assert result_bank.embedding_model == sample_bank.embedding_model
assert result_bank.chunk_size_in_tokens == sample_bank.chunk_size_in_tokens
assert result_bank.overlap_size_in_tokens == sample_bank.overlap_size_in_tokens
assert result_bank.provider_id == sample_bank.provider_id
@pytest.mark.asyncio
async def test_cached_registry_updates(config):
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
await cached_registry.initialize()
new_bank = VectorMemoryBankDef(
identifier="test_bank_2",
embedding_model="all-MiniLM-L6-v2",
chunk_size_in_tokens=256,
overlap_size_in_tokens=32,
provider_id="baz",
)
await cached_registry.register(new_bank)
# Verify in cache
results = await cached_registry.get("test_bank_2")
assert len(results) == 1
result_bank = results[0]
assert result_bank.identifier == new_bank.identifier
assert result_bank.provider_id == new_bank.provider_id
# Verify persisted to disk
new_registry = DiskDistributionRegistry(await kvstore_impl(config))
await new_registry.initialize()
results = await new_registry.get("test_bank_2")
assert len(results) == 1
result_bank = results[0]
assert result_bank.identifier == new_bank.identifier
assert result_bank.provider_id == new_bank.provider_id
@pytest.mark.asyncio
async def test_duplicate_provider_registration(config):
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
await cached_registry.initialize()
original_bank = VectorMemoryBankDef(
identifier="test_bank_2",
embedding_model="all-MiniLM-L6-v2",
chunk_size_in_tokens=256,
overlap_size_in_tokens=32,
provider_id="baz",
)
await cached_registry.register(original_bank)
duplicate_bank = VectorMemoryBankDef(
identifier="test_bank_2",
embedding_model="different-model",
chunk_size_in_tokens=128,
overlap_size_in_tokens=16,
provider_id="baz", # Same provider_id
)
await cached_registry.register(duplicate_bank)
results = await cached_registry.get("test_bank_2")
assert len(results) == 1 # Still only one result
assert (
results[0].embedding_model == original_bank.embedding_model
) # Original values preserved

View file

@ -1,15 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import VLLMImplConfig
from .vllm import VLLMInferenceAdapter
async def get_adapter_impl(config: VLLMImplConfig, _deps):
assert isinstance(config, VLLMImplConfig), f"Unexpected config type: {type(config)}"
impl = VLLMInferenceAdapter(config)
await impl.initialize()
return impl

View file

@ -1,16 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel, Field
class BedrockSafetyConfig(BaseModel):
"""Configuration information for a guardrail that you want to use in the request."""
aws_profile: str = Field(
default="default",
description="The profile on the machine having valid aws credentials. This will ensure separation of creation to invocation",
)

View file

@ -1,26 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Optional
from llama_models.schema_utils import json_schema_type
from pydantic import BaseModel, Field
class TogetherProviderDataValidator(BaseModel):
together_api_key: str
@json_schema_type
class TogetherSafetyConfig(BaseModel):
url: str = Field(
default="https://api.together.xyz/v1",
description="The URL for the Together AI server",
)
api_key: Optional[str] = Field(
default=None,
description="The Together AI API Key (default for the distribution, if any)",
)

View file

@ -1,101 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from together import Together
from llama_models.llama3.api.datatypes import * # noqa: F403
from llama_stack.apis.safety import * # noqa: F403
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
from .config import TogetherSafetyConfig
TOGETHER_SHIELD_MODEL_MAP = {
"llama_guard": "meta-llama/Meta-Llama-Guard-3-8B",
"Llama-Guard-3-8B": "meta-llama/Meta-Llama-Guard-3-8B",
"Llama-Guard-3-11B-Vision": "meta-llama/Llama-Guard-3-11B-Vision-Turbo",
}
class TogetherSafetyImpl(Safety, NeedsRequestProviderData, ShieldsProtocolPrivate):
def __init__(self, config: TogetherSafetyConfig) -> None:
self.config = config
async def initialize(self) -> None:
pass
async def shutdown(self) -> None:
pass
async def register_shield(self, shield: ShieldDef) -> None:
raise ValueError("Registering dynamic shields is not supported")
async def list_shields(self) -> List[ShieldDef]:
return [
ShieldDef(
identifier=ShieldType.llama_guard.value,
type=ShieldType.llama_guard.value,
params={},
)
]
async def run_shield(
self, shield_type: str, messages: List[Message], params: Dict[str, Any] = None
) -> RunShieldResponse:
shield_def = await self.shield_store.get_shield(shield_type)
if not shield_def:
raise ValueError(f"Unknown shield {shield_type}")
model = shield_def.params.get("model", "llama_guard")
if model not in TOGETHER_SHIELD_MODEL_MAP:
raise ValueError(f"Unsupported safety model: {model}")
together_api_key = None
if self.config.api_key is not None:
together_api_key = self.config.api_key
else:
provider_data = self.get_request_provider_data()
if provider_data is None or not provider_data.together_api_key:
raise ValueError(
'Pass Together API Key in the header X-LlamaStack-ProviderData as { "together_api_key": <your api key>}'
)
together_api_key = provider_data.together_api_key
# messages can have role assistant or user
api_messages = []
for message in messages:
if message.role in (Role.user.value, Role.assistant.value):
api_messages.append({"role": message.role, "content": message.content})
violation = await get_safety_response(
together_api_key, TOGETHER_SHIELD_MODEL_MAP[model], api_messages
)
return RunShieldResponse(violation=violation)
async def get_safety_response(
api_key: str, model_name: str, messages: List[Dict[str, str]]
) -> Optional[SafetyViolation]:
client = Together(api_key=api_key)
response = client.chat.completions.create(messages=messages, model=model_name)
if len(response.choices) == 0:
return None
response_text = response.choices[0].message.content
if response_text == "safe":
return None
parts = response_text.split("\n")
if len(parts) != 2:
return None
if parts[0] == "unsafe":
return SafetyViolation(
violation_level=ViolationLevel.ERROR,
metadata={"violation_type": parts[1]},
)
return None

View file

@ -6,6 +6,7 @@
from enum import Enum
from typing import Any, List, Optional, Protocol
from urllib.parse import urlparse
from llama_models.schema_utils import json_schema_type
from pydantic import BaseModel, Field
@ -145,11 +146,19 @@ Fully-qualified name of the module to import. The module is expected to have:
class RemoteProviderConfig(BaseModel):
host: str = "localhost"
port: int
port: Optional[int] = None
protocol: str = "http"
@property
def url(self) -> str:
return f"http://{self.host}:{self.port}"
if self.port is None:
return f"{self.protocol}://{self.host}"
return f"{self.protocol}://{self.host}:{self.port}"
@classmethod
def from_url(cls, url: str) -> "RemoteProviderConfig":
parsed = urlparse(url)
return cls(host=parsed.hostname, port=parsed.port, protocol=parsed.scheme)
@json_schema_type

View file

@ -1,120 +0,0 @@
# LocalInference
LocalInference provides a local inference implementation powered by [executorch](https://github.com/pytorch/executorch/).
Llama Stack currently supports on-device inference for iOS with Android coming soon. You can run on-device inference on Android today using [executorch](https://github.com/pytorch/executorch/tree/main/examples/demo-apps/android/LlamaDemo), PyTorchs on-device inference library.
## Installation
We're working on making LocalInference easier to set up. For now, you'll need to import it via `.xcframework`:
1. Clone the executorch submodule in this repo and its dependencies: `git submodule update --init --recursive`
1. Install [Cmake](https://cmake.org/) for the executorch build`
1. Drag `LocalInference.xcodeproj` into your project
1. Add `LocalInference` as a framework in your app target
1. Add a package dependency on https://github.com/pytorch/executorch (branch latest)
1. Add all the kernels / backends from executorch (but not exectuorch itself!) as frameworks in your app target:
- backend_coreml
- backend_mps
- backend_xnnpack
- kernels_custom
- kernels_optimized
- kernels_portable
- kernels_quantized
1. In "Build Settings" > "Other Linker Flags" > "Any iOS Simulator SDK", add:
```
-force_load
$(BUILT_PRODUCTS_DIR)/libkernels_optimized-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libkernels_custom-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libkernels_quantized-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libbackend_xnnpack-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libbackend_coreml-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libbackend_mps-simulator-release.a
```
1. In "Build Settings" > "Other Linker Flags" > "Any iOS SDK", add:
```
-force_load
$(BUILT_PRODUCTS_DIR)/libkernels_optimized-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libkernels_custom-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libkernels_quantized-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libbackend_xnnpack-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libbackend_coreml-simulator-release.a
-force_load
$(BUILT_PRODUCTS_DIR)/libbackend_mps-simulator-release.a
```
## Preparing a model
1. Prepare a `.pte` file [following the executorch docs](https://github.com/pytorch/executorch/blob/main/examples/models/llama/README.md#step-2-prepare-model)
2. Bundle the `.pte` and `tokenizer.model` file into your app
We now support models quantized using SpinQuant and QAT-LoRA which offer a significant performance boost (demo app on iPhone 13 Pro):
| Llama 3.2 1B | Tokens / Second (total) | | Time-to-First-Token (sec) | |
| :---- | :---- | :---- | :---- | :---- |
| | Haiku | Paragraph | Haiku | Paragraph |
| BF16 | 2.2 | 2.5 | 2.3 | 1.9 |
| QAT+LoRA | 7.1 | 3.3 | 0.37 | 0.24 |
| SpinQuant | 10.1 | 5.2 | 0.2 | 0.2 |
## Using LocalInference
1. Instantiate LocalInference with a DispatchQueue. Optionally, pass it into your agents service:
```swift
init () {
runnerQueue = DispatchQueue(label: "org.meta.llamastack")
inferenceService = LocalInferenceService(queue: runnerQueue)
agentsService = LocalAgentsService(inference: inferenceService)
}
```
2. Before making any inference calls, load your model from your bundle:
```swift
let mainBundle = Bundle.main
inferenceService.loadModel(
modelPath: mainBundle.url(forResource: "llama32_1b_spinquant", withExtension: "pte"),
tokenizerPath: mainBundle.url(forResource: "tokenizer", withExtension: "model"),
completion: {_ in } // use to handle load failures
)
```
3. Make inference calls (or agents calls) as you normally would with LlamaStack:
```
for await chunk in try await agentsService.initAndCreateTurn(
messages: [
.UserMessage(Components.Schemas.UserMessage(
content: .case1("Call functions as needed to handle any actions in the following text:\n\n" + text),
role: .user))
]
) {
```
## Troubleshooting
If you receive errors like "missing package product" or "invalid checksum", try cleaning the build folder and resetting the Swift package cache:
(Opt+Click) Product > Clean Build Folder Immediately
```
rm -rf \
~/Library/org.swift.swiftpm \
~/Library/Caches/org.swift.swiftpm \
~/Library/Caches/com.apple.dt.Xcode \
~/Library/Developer/Xcode/DerivedData
```

View file

@ -16,7 +16,7 @@ from llama_stack.apis.datasets import * # noqa: F403
from autoevals.llm import Factuality
from autoevals.ragas import AnswerCorrectness
from llama_stack.providers.datatypes import ScoringFunctionsProtocolPrivate
from llama_stack.providers.impls.meta_reference.scoring.scoring_fn.common import (
from llama_stack.providers.inline.meta_reference.scoring.scoring_fn.common import (
aggregate_average,
)

View file

@ -4,10 +4,11 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel
from pydantic import BaseModel, Field
from llama_stack.providers.utils.kvstore import KVStoreConfig
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
class MetaReferenceAgentsImplConfig(BaseModel):
persistence_store: KVStoreConfig
persistence_store: KVStoreConfig = Field(default=SqliteKVStoreConfig())

View file

@ -32,18 +32,18 @@ class ShieldRunnerMixin:
self.output_shields = output_shields
async def run_multiple_shields(
self, messages: List[Message], shield_types: List[str]
self, messages: List[Message], identifiers: List[str]
) -> None:
responses = await asyncio.gather(
*[
self.safety_api.run_shield(
shield_type=shield_type,
identifier=identifier,
messages=messages,
)
for shield_type in shield_types
for identifier in identifiers
]
)
for shield_type, response in zip(shield_types, responses):
for identifier, response in zip(identifiers, responses):
if not response.violation:
continue
@ -52,6 +52,6 @@ class ShieldRunnerMixin:
raise SafetyException(violation)
elif violation.violation_level == ViolationLevel.WARN:
cprint(
f"[Warn]{shield_type} raised a warning",
f"[Warn]{identifier} raised a warning",
color="red",
)

View file

@ -9,7 +9,7 @@ from typing import List
from llama_stack.apis.inference import Message
from llama_stack.apis.safety import * # noqa: F403
from llama_stack.providers.impls.meta_reference.agents.safety import ShieldRunnerMixin
from llama_stack.providers.inline.meta_reference.agents.safety import ShieldRunnerMixin
from .builtin import BaseTool

View file

@ -25,8 +25,8 @@ class MetaReferenceCodeScannerSafetyImpl(Safety):
pass
async def register_shield(self, shield: ShieldDef) -> None:
if shield.type != ShieldType.code_scanner.value:
raise ValueError(f"Unsupported safety shield type: {shield.type}")
if shield.shield_type != ShieldType.code_scanner.value:
raise ValueError(f"Unsupported safety shield type: {shield.shield_type}")
async def run_shield(
self,

View file

@ -14,6 +14,11 @@ from llama_models.llama3.api.datatypes import * # noqa: F403
from llama_stack.apis.inference import * # noqa: F403
from llama_stack.providers.datatypes import ModelDef, ModelsProtocolPrivate
from llama_stack.providers.utils.inference.prompt_adapter import (
convert_image_media_to_url,
request_has_media,
)
from .config import MetaReferenceInferenceConfig
from .generation import Llama
from .model_parallel import LlamaModelParallelGenerator
@ -87,6 +92,7 @@ class MetaReferenceInferenceImpl(Inference, ModelsProtocolPrivate):
logprobs=logprobs,
)
self.check_model(request)
request = await request_with_localized_media(request)
if request.stream:
return self._stream_completion(request)
@ -211,6 +217,7 @@ class MetaReferenceInferenceImpl(Inference, ModelsProtocolPrivate):
logprobs=logprobs,
)
self.check_model(request)
request = await request_with_localized_media(request)
if self.config.create_distributed_process_group:
if SEMAPHORE.locked():
@ -388,3 +395,31 @@ class MetaReferenceInferenceImpl(Inference, ModelsProtocolPrivate):
contents: List[InterleavedTextMedia],
) -> EmbeddingsResponse:
raise NotImplementedError()
async def request_with_localized_media(
request: Union[ChatCompletionRequest, CompletionRequest],
) -> Union[ChatCompletionRequest, CompletionRequest]:
if not request_has_media(request):
return request
async def _convert_single_content(content):
if isinstance(content, ImageMedia):
url = await convert_image_media_to_url(content, download=True)
return ImageMedia(image=URL(uri=url))
else:
return content
async def _convert_content(content):
if isinstance(content, list):
return [await _convert_single_content(c) for c in content]
else:
return await _convert_single_content(content)
if isinstance(request, ChatCompletionRequest):
for m in request.messages:
m.content = await _convert_content(m.content)
else:
request.content = await _convert_content(request.content)
return request

View file

@ -27,7 +27,7 @@ from torchao.quantization.GPTQ import Int8DynActInt4WeightLinear
from llama_stack.apis.inference import QuantizationType
from llama_stack.providers.impls.meta_reference.inference.config import (
from llama_stack.providers.inline.meta_reference.inference.config import (
MetaReferenceQuantizedInferenceConfig,
)

View file

@ -0,0 +1,21 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_models.schema_utils import json_schema_type
from pydantic import BaseModel
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
@json_schema_type
class FaissImplConfig(BaseModel):
kvstore: KVStoreConfig = SqliteKVStoreConfig(
db_path=(RUNTIME_BASE_DIR / "faiss_store.db").as_posix()
) # Uses SQLite config specific to FAISS storage

View file

@ -16,6 +16,7 @@ from llama_models.llama3.api.datatypes import * # noqa: F403
from llama_stack.apis.memory import * # noqa: F403
from llama_stack.providers.datatypes import MemoryBanksProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.memory.vector_store import (
ALL_MINILM_L6_V2_DIMENSION,
@ -28,6 +29,8 @@ from .config import FaissImplConfig
logger = logging.getLogger(__name__)
MEMORY_BANKS_PREFIX = "memory_banks:"
class FaissIndex(EmbeddingIndex):
id_by_index: Dict[int, str]
@ -69,10 +72,25 @@ class FaissMemoryImpl(Memory, MemoryBanksProtocolPrivate):
def __init__(self, config: FaissImplConfig) -> None:
self.config = config
self.cache = {}
self.kvstore = None
async def initialize(self) -> None: ...
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)
# Load existing banks from kvstore
start_key = MEMORY_BANKS_PREFIX
end_key = f"{MEMORY_BANKS_PREFIX}\xff"
stored_banks = await self.kvstore.range(start_key, end_key)
async def shutdown(self) -> None: ...
for bank_data in stored_banks:
bank = VectorMemoryBankDef.model_validate_json(bank_data)
index = BankWithIndex(
bank=bank, index=FaissIndex(ALL_MINILM_L6_V2_DIMENSION)
)
self.cache[bank.identifier] = index
async def shutdown(self) -> None:
# Cleanup if needed
pass
async def register_memory_bank(
self,
@ -82,6 +100,14 @@ class FaissMemoryImpl(Memory, MemoryBanksProtocolPrivate):
memory_bank.type == MemoryBankType.vector.value
), f"Only vector banks are supported {memory_bank.type}"
# Store in kvstore
key = f"{MEMORY_BANKS_PREFIX}{memory_bank.identifier}"
await self.kvstore.set(
key=key,
value=memory_bank.json(),
)
# Store in cache
index = BankWithIndex(
bank=memory_bank, index=FaissIndex(ALL_MINILM_L6_V2_DIMENSION)
)

View file

@ -0,0 +1,73 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import tempfile
import pytest
from llama_stack.apis.memory import MemoryBankType, VectorMemoryBankDef
from llama_stack.providers.inline.meta_reference.memory.config import FaissImplConfig
from llama_stack.providers.inline.meta_reference.memory.faiss import FaissMemoryImpl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
class TestFaissMemoryImpl:
@pytest.fixture
def faiss_impl(self):
# Create a temporary SQLite database file
temp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
config = FaissImplConfig(kvstore=SqliteKVStoreConfig(db_path=temp_db.name))
return FaissMemoryImpl(config)
@pytest.mark.asyncio
async def test_initialize(self, faiss_impl):
# Test empty initialization
await faiss_impl.initialize()
assert len(faiss_impl.cache) == 0
# Test initialization with existing banks
bank = VectorMemoryBankDef(
identifier="test_bank",
type=MemoryBankType.vector.value,
embedding_model="all-MiniLM-L6-v2",
chunk_size_in_tokens=512,
overlap_size_in_tokens=64,
)
# Register a bank and reinitialize to test loading
await faiss_impl.register_memory_bank(bank)
# Create new instance to test initialization with existing data
new_impl = FaissMemoryImpl(faiss_impl.config)
await new_impl.initialize()
assert len(new_impl.cache) == 1
assert "test_bank" in new_impl.cache
@pytest.mark.asyncio
async def test_register_memory_bank(self, faiss_impl):
bank = VectorMemoryBankDef(
identifier="test_bank",
type=MemoryBankType.vector.value,
embedding_model="all-MiniLM-L6-v2",
chunk_size_in_tokens=512,
overlap_size_in_tokens=64,
)
await faiss_impl.initialize()
await faiss_impl.register_memory_bank(bank)
assert "test_bank" in faiss_impl.cache
assert faiss_impl.cache["test_bank"].bank == bank
# Verify persistence
new_impl = FaissMemoryImpl(faiss_impl.config)
await new_impl.initialize()
assert "test_bank" in new_impl.cache
if __name__ == "__main__":
pytest.main([__file__])

View file

@ -4,7 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import SafetyConfig
from .config import LlamaGuardShieldConfig, SafetyConfig # noqa: F401
async def get_provider_impl(config: SafetyConfig, deps):

View file

@ -49,7 +49,7 @@ class MetaReferenceSafetyImpl(Safety, ShieldsProtocolPrivate):
return [
ShieldDef(
identifier=shield_type,
type=shield_type,
shield_type=shield_type,
params={},
)
for shield_type in self.available_shields
@ -57,13 +57,13 @@ class MetaReferenceSafetyImpl(Safety, ShieldsProtocolPrivate):
async def run_shield(
self,
shield_type: str,
identifier: str,
messages: List[Message],
params: Dict[str, Any] = None,
) -> RunShieldResponse:
shield_def = await self.shield_store.get_shield(shield_type)
shield_def = await self.shield_store.get_shield(identifier)
if not shield_def:
raise ValueError(f"Unknown shield {shield_type}")
raise ValueError(f"Unknown shield {identifier}")
shield = self.get_shield_impl(shield_def)
@ -92,14 +92,14 @@ class MetaReferenceSafetyImpl(Safety, ShieldsProtocolPrivate):
return RunShieldResponse(violation=violation)
def get_shield_impl(self, shield: ShieldDef) -> ShieldBase:
if shield.type == ShieldType.llama_guard.value:
if shield.shield_type == ShieldType.llama_guard.value:
cfg = self.config.llama_guard_shield
return LlamaGuardShield(
model=cfg.model,
inference_api=self.inference_api,
excluded_categories=cfg.excluded_categories,
)
elif shield.type == ShieldType.prompt_guard.value:
elif shield.shield_type == ShieldType.prompt_guard.value:
model_dir = model_local_dir(PROMPT_GUARD_MODEL)
subtype = shield.params.get("prompt_guard_type", "injection")
if subtype == "injection":
@ -109,4 +109,4 @@ class MetaReferenceSafetyImpl(Safety, ShieldsProtocolPrivate):
else:
raise ValueError(f"Unknown prompt guard type: {subtype}")
else:
raise ValueError(f"Unknown shield type: {shield.type}")
raise ValueError(f"Unknown shield type: {shield.shield_type}")

Some files were not shown because too many files have changed in this diff Show more