add parameter related changes

Ubuntu 2025-03-14 14:14:43 +00:00
parent 65beb929a2
commit c885015e6f
4 changed files with 42 additions and 56 deletions

View file

@@ -21,3 +21,4 @@ class JobStatus(Enum):
     in_progress = "in_progress"
     failed = "failed"
     scheduled = "scheduled"
+    cancelled = "cancelled"
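For context, the enum after this hunk reads as follows (a sketch covering only the members visible here; any members defined above line 21 are not shown in the diff):

from enum import Enum

class JobStatus(Enum):
    # Members visible in this hunk; the file may define more above.
    in_progress = "in_progress"
    failed = "failed"
    scheduled = "scheduled"
    cancelled = "cancelled"  # added by this commit

# Value lookup now succeeds for the new member:
assert JobStatus("cancelled") is JobStatus.cancelled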

View file

@@ -4,16 +4,12 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
-from typing import Dict
-
-from llama_stack.distribution.datatypes import Api, ProviderSpec
-
 from .config import NvidiaPostTrainingConfig


 async def get_adapter_impl(
     config: NvidiaPostTrainingConfig,
-    deps: Dict[Api, ProviderSpec],
+    _deps,
 ):
     from .post_training import NvidiaPostTrainingAdapter
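The function body below the shown context is elided by the diff; a minimal sketch of the simplified entry point, assuming the usual provider pattern of constructing and returning the adapter (the constructor call is an assumption, not shown here):

from .config import NvidiaPostTrainingConfig

async def get_adapter_impl(
    config: NvidiaPostTrainingConfig,
    _deps,  # dependencies are accepted but unused, so the Dict[Api, ProviderSpec] hint was dropped
):
    from .post_training import NvidiaPostTrainingAdapter

    return NvidiaPostTrainingAdapter(config)  # assumed body; elided in the diff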

View file

@@ -5,7 +5,6 @@
 # the root directory of this source tree.

 import os
-import warnings
 from typing import Any, Dict, Optional

 from pydantic import BaseModel, Field
@@ -30,18 +29,18 @@ class NvidiaPostTrainingConfig(BaseModel):
     )
     access_policies: Optional[dict] = Field(
-        default_factory=lambda: os.getenv("NVIDIA_ACCESS_POLICIES", {}),
+        default_factory=lambda: os.getenv("NVIDIA_ACCESS_POLICIES", {"arbitrary": "json"}),
         description="The NVIDIA access policies.",
     )
     project_id: Optional[str] = Field(
-        default_factory=lambda: os.getenv("NVIDIA_PROJECT_ID", "test-project"),
+        default_factory=lambda: os.getenv("NVIDIA_PROJECT_ID", "test-example-model@v1"),
         description="The NVIDIA project ID.",
     )
     # ToDO: validate this, add default value
-    customizer_url: str = Field(
-        default_factory=lambda: os.getenv("NVIDIA_CUSTOMIZER_URL", "http://nemo.test"),
+    customizer_url: Optional[str] = Field(
+        default_factory=lambda: os.getenv("NVIDIA_CUSTOMIZER_URL"),
         description="Base URL for the NeMo Customizer API",
     )
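The practical effect of the customizer_url change is that an unset NVIDIA_CUSTOMIZER_URL now yields None instead of a baked-in test URL, which is why the annotation widens to Optional[str]. A minimal standalone sketch (the model name here is hypothetical):

import os
from typing import Optional
from pydantic import BaseModel, Field

class UrlConfig(BaseModel):  # hypothetical minimal model mirroring the changed field
    customizer_url: Optional[str] = Field(
        default_factory=lambda: os.getenv("NVIDIA_CUSTOMIZER_URL"),
    )

os.environ.pop("NVIDIA_CUSTOMIZER_URL", None)
assert UrlConfig().customizer_url is None  # callers must now handle the unset case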
@@ -55,43 +54,18 @@ class NvidiaPostTrainingConfig(BaseModel):
         description="Maximum number of retries for the NVIDIA Post Training API",
     )
-    # ToDo: validate this, add default value
+    # ToDo: validate this
     output_model_dir: str = Field(
         default_factory=lambda: os.getenv("NVIDIA_OUTPUT_MODEL_DIR", "test-example-model@v1"),
         description="Directory to save the output model",
     )
-
-    # warning for default values
-    def __post_init__(self):
-        default_values = []
-        if os.getenv("NVIDIA_OUTPUT_MODEL_DIR") is None:
-            default_values.append("output_model_dir='test-example-model@v1'")
-        if os.getenv("NVIDIA_PROJECT_ID") is None:
-            default_values.append("project_id='test-project'")
-        if os.getenv("NVIDIA_USER_ID") is None:
-            default_values.append("user_id='llama-stack-user'")
-        if os.getenv("NVIDIA_DATASET_NAMESPACE") is None:
-            default_values.append("dataset_namespace='default'")
-        if os.getenv("NVIDIA_ACCESS_POLICIES") is None:
-            default_values.append("access_policies='{}'")
-        if os.getenv("NVIDIA_CUSTOMIZER_URL") is None:
-            default_values.append("customizer_url='http://nemo.test'")
-        if default_values:
-            warnings.warn(
-                f"Using default values: {', '.join(default_values)}. \
-                Please set the environment variables to avoid this default behavior.",
-                stacklevel=2,
-            )

     @classmethod
     def sample_run_config(cls, **kwargs) -> Dict[str, Any]:
         return {
             "api_key": "${env.NVIDIA_API_KEY:}",
             "user_id": "${env.NVIDIA_USER_ID:llama-stack-user}",
             "dataset_namespace": "${env.NVIDIA_DATASET_NAMESPACE:default}",
-            "access_policies": "${env.NVIDIA_ACCESS_POLICIES:}",
             "project_id": "${env.NVIDIA_PROJECT_ID:test-project}",
-            "customizer_url": "${env.NVIDIA_CUSTOMIZER_URL:}",
-            "output_model_dir": "${env.NVIDIA_OUTPUT_MODEL_DIR:test-example-model@v1}",
+            "customizer_url": "${env.NVIDIA_CUSTOMIZER_URL:http://nemo.test}",
         }
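The `${env.VAR:default}` strings in sample_run_config are placeholders substituted when a run config is rendered. A hypothetical resolver illustrating the intended semantics (llama-stack ships its own implementation; this is only a sketch of the colon-delimited default syntax):

import os
import re

def resolve_placeholder(value: str) -> str:  # hypothetical helper, for illustration only
    match = re.fullmatch(r"\$\{env\.([A-Za-z0-9_]+):(.*)\}", value)
    if match is None:
        return value
    name, default = match.groups()
    return os.getenv(name, default)

# With NVIDIA_CUSTOMIZER_URL unset, the new sample value falls back to the test URL:
os.environ.pop("NVIDIA_CUSTOMIZER_URL", None)
assert resolve_placeholder("${env.NVIDIA_CUSTOMIZER_URL:http://nemo.test}") == "http://nemo.test"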

View file

@@ -62,8 +62,14 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
         self.headers["Authorization"] = f"Bearer {config.api_key}"
         self.timeout = aiohttp.ClientTimeout(total=config.timeout)
-        # TODO(mf): filter by available models
+        # TODO: filter by available models based on /config endpoint
         ModelRegistryHelper.__init__(self, model_entries=_MODEL_ENTRIES)
+        self.session = aiohttp.ClientSession(headers=self.headers, timeout=self.timeout)
+        self.customizer_url = config.customizer_url
+        if not self.customizer_url:
+            warnings.warn("Customizer URL is not set, using default value: http://nemo.test")
+            self.customizer_url = "http://nemo.test"

     async def _make_request(
         self,
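The constructor now owns a single aiohttp.ClientSession instead of opening one per request, so pooled connections are reused across calls. A minimal sketch of the pattern and its main obligation (names here are hypothetical; the diff does not show a matching close for the adapter):

import aiohttp

class SessionOwner:  # hypothetical illustration of the one-session-per-adapter pattern
    def __init__(self, headers: dict, total_timeout: float):
        # Mirrors the diff: the session is created eagerly in __init__ with
        # default headers and timeout, then reused by every request.
        self.timeout = aiohttp.ClientTimeout(total=total_timeout)
        self.session = aiohttp.ClientSession(headers=headers, timeout=self.timeout)

    async def get_json(self, url: str) -> dict:
        # Reuses pooled connections instead of re-doing TCP/TLS setup per call.
        async with self.session.request("GET", url) as response:
            return await response.json()

    async def close(self) -> None:
        # A long-lived session must be closed explicitly at shutdown, otherwise
        # aiohttp logs "Unclosed client session"; the diff does not add this step.
        await self.session.close()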
@@ -75,8 +81,8 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
         **kwargs,
     ) -> Dict[str, Any]:
         """Helper method to make HTTP requests to the Customizer API."""
-        url = f"{self.config.customizer_url}{path}"
-        request_headers = self.headers.copy()  # Create a copy to avoid modifying the original
+        url = f"{self.customizer_url}{path}"
+        request_headers = self.headers.copy()

         if headers:
             request_headers.update(headers)
@@ -86,8 +92,7 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
             request_headers["Content-Type"] = "application/json"

         for _ in range(self.config.max_retries):
-            async with aiohttp.ClientSession(headers=request_headers, timeout=self.timeout) as session:
-                async with session.request(method, url, params=params, json=json, **kwargs) as response:
-                    if response.status >= 400:
-                        error_data = await response.json()
-                        raise Exception(f"API request failed: {error_data}")
+            async with self.session.request(method, url, params=params, json=json, **kwargs) as response:
+                if response.status >= 400:
+                    error_data = await response.json()
+                    raise Exception(f"API request failed: {error_data}")
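One thing to note about the new form: the shared session was created with the adapter's default headers, so the per-request headers assembled above are no longer attached unless they are passed explicitly. aiohttp's session.request does accept a headers= argument per call, which is one way to keep that behavior; a sketch under that assumption:

# Sketch: per-request headers with a shared session. aiohttp merges these with
# the session's default headers for this single request.
async def request_json(session, method, url, request_headers, **kwargs):
    async with session.request(method, url, headers=request_headers, **kwargs) as response:
        if response.status >= 400:
            raise Exception(f"API request failed: {await response.json()}")
        return await response.json()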
@@ -178,8 +183,8 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
         logger_config: Dict[str, Any],
         model: str,
         checkpoint_dir: Optional[str],
-        algorithm_config: Optional[AlgorithmConfig],
-    ) -> PostTrainingJob:
+        algorithm_config: Optional[AlgorithmConfig] = None,
+    ) -> NvidiaPostTrainingJob:
         """
         Fine-tunes a model on a dataset.
         Currently only supports Lora finetuning for standlone docker container.
@@ -223,12 +228,13 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
                 - batch_size
             - OptimizerConfig:
                 - lr
+                - weight_decay
             - LoRA config:
                 - adapter_dim
                 - adapter_dropout
         Note:
-            - checkpoint_dir, hyperparam_search_config, logger_config are not supported atm, will be ignored
-            - output_model_dir is set via environment variable NVIDIA_OUTPUT_MODEL_DIR
+            - checkpoint_dir, hyperparam_search_config, logger_config are not supported atm, will be ignored and users are informed via warnings.
+            - Some parameters from TrainingConfig, DataConfig, OptimizerConfig are not supported atm, will be ignored and users are informed via warnings.
         User is informed about unsupported parameters via warnings.
         """
@@ -247,7 +253,7 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
             """Helper function to warn about unsupported parameters in a config dictionary."""
             unsupported_params = [k for k in config_dict.keys() if k not in supported_keys]
             if unsupported_params:
-                warnings.warn(f"Parameters: {unsupported_params} in {config_name} not supported and will be ignored.")
+                warnings.warn(f"Parameters: {unsupported_params} in `{config_name}` not supported and will be ignored.")

         # Check for unsupported parameters
         warn_unsupported_params(training_config, ["n_epochs", "data_config", "optimizer_config"], "TrainingConfig")
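Pieced together from the lines shown in this hunk, the helper is self-contained enough to reconstruct standalone (the exact signature is assumed from its visible body and call sites):

import warnings
from typing import Any, Dict, List

def warn_unsupported_params(config_dict: Dict[str, Any], supported_keys: List[str], config_name: str) -> None:
    """Helper function to warn about unsupported parameters in a config dictionary."""
    unsupported_params = [k for k in config_dict.keys() if k not in supported_keys]
    if unsupported_params:
        warnings.warn(f"Parameters: {unsupported_params} in `{config_name}` not supported and will be ignored.")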
@@ -269,9 +275,9 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
             "hyperparameters": {
                 "training_type": "sft",
                 "finetuning_type": "lora",
-                "epochs": training_config["n_epochs"],
-                "batch_size": training_config["data_config"]["batch_size"],
-                "learning_rate": training_config["optimizer_config"]["lr"],
+                "epochs": training_config.get("n_epochs", 1),
+                "batch_size": training_config["data_config"].get("batch_size", 8),
+                "learning_rate": training_config["optimizer_config"].get("lr", 0.0001),
             },
             "project": self.config.project_id,
             "ownership": {"created_by": self.config.user_id, "access_policies": self.config.access_policies},
@@ -283,7 +289,10 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
         if isinstance(algorithm_config, dict) and algorithm_config.get("type") == "LoRA":
             # Extract LoRA-specific parameters
             lora_config = {k: v for k, v in algorithm_config.items() if k != "type"}
-            job_config["hyperparameters"]["lora"] = lora_config
+            job_config["hyperparameters"]["lora"] = {
+                "adapter_dim": lora_config.get("adapter_dim", 8),
+                "adapter_dropout": lora_config.get("adapter_dropout", 1),
+            }
             warn_unsupported_params(lora_config, ["adapter_dim", "adapter_dropout"], "LoRA config")
         else:
             raise NotImplementedError(f"Unsupported algorithm config: {algorithm_config}")
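A worked example of the new whitelisting behavior, with an assumed input carrying one extra key:

algorithm_config = {"type": "LoRA", "adapter_dim": 16, "alpha": 32}  # "alpha" is unsupported

lora_config = {k: v for k, v in algorithm_config.items() if k != "type"}
lora_hyperparameters = {
    "adapter_dim": lora_config.get("adapter_dim", 8),          # -> 16 (taken from input)
    "adapter_dropout": lora_config.get("adapter_dropout", 1),  # -> 1 (default applied)
}
# warn_unsupported_params(lora_config, ["adapter_dim", "adapter_dropout"], "LoRA config")
# would flag "alpha" here; only the two whitelisted keys reach the job config.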
@@ -297,7 +306,13 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper):
         )
         job_uuid = response["id"]
-        return PostTrainingJob(job_uuid=job_uuid)
+        status = STATUS_MAPPING.get(response["status"].lower(), "unknown")
+        created_at = datetime.fromisoformat(response["created_at"])
+        updated_at = datetime.fromisoformat(response["updated_at"])
+        return NvidiaPostTrainingJob(
+            job_uuid=job_uuid, status=JobStatus(status), created_at=created_at, updated_at=updated_at, **response
+        )

     async def preference_optimize(
         self,
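Neither STATUS_MAPPING nor NvidiaPostTrainingJob is defined in this diff; a sketch of the new return path under assumed definitions (the mapping table and response fields here are illustrative):

from datetime import datetime

STATUS_MAPPING = {  # assumed table; the real one lives elsewhere in post_training.py
    "running": "in_progress",
    "completed": "completed",
    "failed": "failed",
    "cancelled": "cancelled",
    "created": "scheduled",
}

response = {  # illustrative Customizer response consumed by the new code
    "id": "cust-abc123",
    "status": "created",
    "created_at": "2025-03-14T14:14:43",
    "updated_at": "2025-03-14T14:14:43",
}

status = STATUS_MAPPING.get(response["status"].lower(), "unknown")  # -> "scheduled"
created_at = datetime.fromisoformat(response["created_at"])
updated_at = datetime.fromisoformat(response["updated_at"])
# JobStatus(status) then maps onto the enum extended with "cancelled" in this commit.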