getting closer to a distro definition, distro install + configure works
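The end-to-end flow this enables: install a distribution, then configure it. A minimal sketch of the invocation, assuming the subcommand group is exposed as `llama distribution` (only the `--name` flag and the `local-source` default are confirmed by the diff below):

llama distribution install --name local-source
llama distribution configure --name local-source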

Ashwin Bharambe 2024-08-01 22:59:11 -07:00
parent dac2b5a1ed
commit 041cafbee3
11 changed files with 471 additions and 130 deletions


@@ -5,18 +5,24 @@
# the root directory of this source tree.
import argparse
import os
import importlib
import inspect
import shlex
from pathlib import Path
from typing import Annotated, get_args, get_origin, Literal, Union
import pkg_resources
import yaml
from pydantic import BaseModel
from termcolor import cprint
from llama_toolchain.cli.subcommand import Subcommand
from llama_toolchain.distribution.registry import all_registered_distributions
from llama_toolchain.utils import LLAMA_STACK_CONFIG_DIR
from llama_toolchain.distribution.datatypes import Distribution, PassthroughApiAdapter
from llama_toolchain.distribution.registry import available_distributions
from llama_toolchain.utils import DISTRIBS_BASE_DIR
from .utils import run_command
CONFIGS_BASE_DIR = os.path.join(LLAMA_STACK_CONFIG_DIR, "configs")
DISTRIBS = available_distributions()
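# Sketch, not part of this diff: the shape this file assumes for the imported
# datatypes, inferred from how they are used below. Only `name`, `adapters`,
# `dict()`, and `config_class` are actually exercised; everything else is an
# assumption.
#
# class PassthroughApiAdapter(BaseModel):
#     ...  # serialized verbatim into the config via adapter.dict()
#
# class Distribution(BaseModel):
#     name: str       # matched against the --name argument
#     adapters: dict  # ApiSurface -> adapter; non-passthrough adapters carry
#                     # a dotted-path `config_class` string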
class DistributionConfigure(Subcommand):
@@ -34,59 +40,198 @@ class DistributionConfigure(Subcommand):
self.parser.set_defaults(func=self._run_distribution_configure_cmd)
def _add_arguments(self):
distribs = all_registered_distributions()
self.parser.add_argument(
"--name",
type=str,
help="Mame of the distribution to configure",
default="local-source",
choices=[d.name for d in distribs],
choices=[d.name for d in available_distributions()],
)
def read_user_inputs(self):
checkpoint_dir = input(
"Enter the checkpoint directory for the model (e.g., ~/.llama/checkpoints/Meta-Llama-3-8B/): "
)
model_parallel_size = input(
"Enter model parallel size (e.g., 1 for 8B / 8 for 70B and 405B): "
)
assert model_parallel_size.isdigit() and int(model_parallel_size) in {
1,
8,
}, "model parallel size must be 1 or 8"
return checkpoint_dir, model_parallel_size
def write_output_yaml(self, checkpoint_dir, model_parallel_size, yaml_output_path):
default_conf_path = pkg_resources.resource_filename(
"llama_toolchain", "data/default_distribution_config.yaml"
)
with open(default_conf_path, "r") as f:
yaml_content = f.read()
yaml_content = yaml_content.format(
checkpoint_dir=checkpoint_dir,
model_parallel_size=model_parallel_size,
)
with open(yaml_output_path, "w") as yaml_file:
yaml_file.write(yaml_content.strip())
print(f"YAML configuration has been written to {yaml_output_path}")
def _run_distribution_configure_cmd(self, args: argparse.Namespace) -> None:
checkpoint_dir, model_parallel_size = self.read_user_inputs()
checkpoint_dir = os.path.expanduser(checkpoint_dir)
dist = None
for d in DISTRIBS:
if d.name == args.name:
dist = d
break
assert (
Path(checkpoint_dir).exists() and Path(checkpoint_dir).is_dir()
), f"{checkpoint_dir} does not exist or it not a directory"
if dist is None:
self.parser.error(f"Could not find distribution {args.name}")
return
os.makedirs(CONFIGS_BASE_DIR, exist_ok=True)
yaml_output_path = Path(CONFIGS_BASE_DIR) / "distribution.yaml"
env_file = DISTRIBS_BASE_DIR / dist.name / "conda.env"
# read this file to get the conda env name
assert env_file.exists(), f"Could not find conda env file {env_file}"
with open(env_file, "r") as f:
conda_env = f.read().strip()
self.write_output_yaml(
checkpoint_dir,
model_parallel_size,
yaml_output_path,
)
configure_llama_distribution(dist, conda_env)
def configure_llama_distribution(dist: Distribution, conda_env: str):
python_exe = run_command(shlex.split("which python"))
# simple check
if conda_env not in python_exe:
raise ValueError(
f"Please re-run configure by activating the `{conda_env}` conda environment"
)
adapter_configs = {}
for api_surface, adapter in dist.adapters.items():
if isinstance(adapter, PassthroughApiAdapter):
adapter_configs[api_surface.value] = adapter.dict()
else:
cprint(
f"Configuring API surface: {api_surface.value}", "white", attrs=["bold"]
)
config_type = instantiate_class_type(adapter.config_class)
# TODO: when we are re-configuring, we should read existing values
config = prompt_for_config(config_type)
adapter_configs[api_surface.value] = config.dict()
dist_config = {
"adapters": adapter_configs,
"conda_env": conda_env,
}
yaml_output_path = Path(DISTRIBS_BASE_DIR) / dist.name / "config.yaml"
with open(yaml_output_path, "w") as fp:
fp.write(yaml.dump(dist_config, sort_keys=False))
print(f"YAML configuration has been written to {yaml_output_path}")
def instantiate_class_type(fully_qualified_name):
module_name, class_name = fully_qualified_name.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)
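# Example: any importable dotted path resolves to the class object itself
# (pathlib.Path is used purely for illustration):
#
#   cls = instantiate_class_type("pathlib.Path")
#   assert cls is Path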
def get_literal_values(field):
"""Extract literal values from a field if it's a Literal type."""
if get_origin(field.annotation) is Literal:
return get_args(field.annotation)
return None
def is_optional(field_type):
"""Check if a field type is Optional."""
return get_origin(field_type) is Union and type(None) in get_args(field_type)
def get_non_none_type(field_type):
"""Get the non-None type from an Optional type."""
return next(arg for arg in get_args(field_type) if arg is not type(None))
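# Example for the two Optional helpers above: Optional[int] is Union[int, None],
# so is_optional() detects it and get_non_none_type() unwraps it.
#
#   from typing import Optional
#   assert is_optional(Optional[int])
#   assert get_non_none_type(Optional[int]) is int
#   assert not is_optional(int)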
def prompt_for_config(config_type: type[BaseModel]) -> BaseModel:
"""
Recursively prompt the user for configuration values based on a Pydantic BaseModel.
Args:
config_type: A Pydantic BaseModel class representing the configuration structure.
Returns:
An instance of the config_type with user-provided values.
"""
config_data = {}
for field_name, field in config_type.__fields__.items():
field_type = field.annotation
default_value = (
field.default if not isinstance(field.default, type(Ellipsis)) else None
)
is_required = field.required
# Skip fields with Literal type
if get_origin(field_type) is Literal:
continue
# Check if the field is a discriminated union
if get_origin(field_type) is Annotated:
inner_type = get_args(field_type)[0]
if get_origin(inner_type) is Union:
discriminator = field.field_info.discriminator
if discriminator:
union_types = get_args(inner_type)
# Find the discriminator field in each union type
type_map = {}
for t in union_types:
disc_field = t.__fields__[discriminator]
literal_values = get_literal_values(disc_field)
if literal_values:
for value in literal_values:
type_map[value] = t
while True:
discriminator_value = input(
f"Enter the {discriminator} (options: {', '.join(type_map.keys())}): "
)
if discriminator_value in type_map:
chosen_type = type_map[discriminator_value]
print(f"\nConfiguring {chosen_type.__name__}:")
sub_config = prompt_for_config(chosen_type)
config_data[field_name] = sub_config
# Set the discriminator field in the sub-config
setattr(sub_config, discriminator, discriminator_value)
break
else:
print(f"Invalid {discriminator}. Please try again.")
continue
if inspect.isclass(field_type) and issubclass(field_type, BaseModel):
print(f"\nEntering sub-configuration for {field_name}:")
config_data[field_name] = prompt_for_config(field_type)
else:
prompt = f"Enter value for {field_name}"
if default_value is not None:
prompt += f" (default: {default_value})"
if is_optional(field_type):
prompt += " (optional)"
elif is_required:
prompt += " (required)"
prompt += ": "
while True:
user_input = input(prompt)
if user_input == "":
if default_value is not None:
config_data[field_name] = default_value
break
elif is_optional(field_type):
config_data[field_name] = None
break
elif not is_required:
config_data[field_name] = None
break
else:
print("This field is required. Please provide a value.")
continue
try:
# Handle Optional types
if is_optional(field_type):
if user_input.lower() == "none":
config_data[field_name] = None
break
field_type = get_non_none_type(field_type)
# Convert the input to the correct type
if inspect.isclass(field_type) and issubclass(
field_type, BaseModel
):
# For nested BaseModels, we assume a dictionary-like string input
import ast
config_data[field_name] = field_type(
**ast.literal_eval(user_input)
)
else:
config_data[field_name] = field_type(user_input)
break
except ValueError:
print(
f"Invalid input. Expected type: {getattr(field_type, '__name__', str(field_type))}"
)
return config_type(**config_data)
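A minimal usage sketch for prompt_for_config; the model and its fields are hypothetical, chosen to exercise the three input paths above (default kept on empty input, required value coerced, Optional left unset):

from typing import Optional
from pydantic import BaseModel

class InferenceConfig(BaseModel):
    host: str = "localhost"        # empty input keeps the default
    port: int                      # required; input is coerced via int(...)
    api_key: Optional[str] = None  # empty input or "none" leaves this unset

config = prompt_for_config(InferenceConfig)
print(config.dict())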