From c885015e6f544a1fe692684cdf2a10e1c1ea96dd Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 14 Mar 2025 14:14:43 +0000 Subject: [PATCH] add parameter related changes --- llama_stack/apis/common/job_types.py | 1 + .../remote/post_training/nvidia/__init__.py | 6 +-- .../remote/post_training/nvidia/config.py | 38 +++---------- .../post_training/nvidia/post_training.py | 53 ++++++++++++------- 4 files changed, 42 insertions(+), 56 deletions(-) diff --git a/llama_stack/apis/common/job_types.py b/llama_stack/apis/common/job_types.py index bc070017b..24ea2e35d 100644 --- a/llama_stack/apis/common/job_types.py +++ b/llama_stack/apis/common/job_types.py @@ -21,3 +21,4 @@ class JobStatus(Enum): in_progress = "in_progress" failed = "failed" scheduled = "scheduled" + cancelled = "cancelled" diff --git a/llama_stack/providers/remote/post_training/nvidia/__init__.py b/llama_stack/providers/remote/post_training/nvidia/__init__.py index 9210090e7..17c203057 100644 --- a/llama_stack/providers/remote/post_training/nvidia/__init__.py +++ b/llama_stack/providers/remote/post_training/nvidia/__init__.py @@ -4,16 +4,12 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -from typing import Dict - -from llama_stack.distribution.datatypes import Api, ProviderSpec - from .config import NvidiaPostTrainingConfig async def get_adapter_impl( config: NvidiaPostTrainingConfig, - deps: Dict[Api, ProviderSpec], + _deps, ): from .post_training import NvidiaPostTrainingAdapter diff --git a/llama_stack/providers/remote/post_training/nvidia/config.py b/llama_stack/providers/remote/post_training/nvidia/config.py index 750a317e7..810d47e5b 100644 --- a/llama_stack/providers/remote/post_training/nvidia/config.py +++ b/llama_stack/providers/remote/post_training/nvidia/config.py @@ -5,7 +5,6 @@ # the root directory of this source tree. import os -import warnings from typing import Any, Dict, Optional from pydantic import BaseModel, Field @@ -30,18 +29,18 @@ class NvidiaPostTrainingConfig(BaseModel): ) access_policies: Optional[dict] = Field( - default_factory=lambda: os.getenv("NVIDIA_ACCESS_POLICIES", {}), + default_factory=lambda: os.getenv("NVIDIA_ACCESS_POLICIES", {"arbitrary": "json"}), description="The NVIDIA access policies.", ) project_id: Optional[str] = Field( - default_factory=lambda: os.getenv("NVIDIA_PROJECT_ID", "test-project"), + default_factory=lambda: os.getenv("NVIDIA_PROJECT_ID", "test-example-model@v1"), description="The NVIDIA project ID.", ) # ToDO: validate this, add default value - customizer_url: str = Field( - default_factory=lambda: os.getenv("NVIDIA_CUSTOMIZER_URL", "http://nemo.test"), + customizer_url: Optional[str] = Field( + default_factory=lambda: os.getenv("NVIDIA_CUSTOMIZER_URL"), description="Base URL for the NeMo Customizer API", ) @@ -55,43 +54,18 @@ class NvidiaPostTrainingConfig(BaseModel): description="Maximum number of retries for the NVIDIA Post Training API", ) - # ToDo: validate this, add default value + # ToDo: validate this output_model_dir: str = Field( default_factory=lambda: os.getenv("NVIDIA_OUTPUT_MODEL_DIR", "test-example-model@v1"), description="Directory to save the output model", ) - # warning for default values - def __post_init__(self): - default_values = [] - if os.getenv("NVIDIA_OUTPUT_MODEL_DIR") is None: - default_values.append("output_model_dir='test-example-model@v1'") - if os.getenv("NVIDIA_PROJECT_ID") is None: - default_values.append("project_id='test-project'") - if os.getenv("NVIDIA_USER_ID") is None: - default_values.append("user_id='llama-stack-user'") - if os.getenv("NVIDIA_DATASET_NAMESPACE") is None: - default_values.append("dataset_namespace='default'") - if os.getenv("NVIDIA_ACCESS_POLICIES") is None: - default_values.append("access_policies='{}'") - if os.getenv("NVIDIA_CUSTOMIZER_URL") is None: - default_values.append("customizer_url='http://nemo.test'") - - if default_values: - warnings.warn( - f"Using default values: {', '.join(default_values)}. \ - Please set the environment variables to avoid this default behavior.", - stacklevel=2, - ) - @classmethod def sample_run_config(cls, **kwargs) -> Dict[str, Any]: return { "api_key": "${env.NVIDIA_API_KEY:}", "user_id": "${env.NVIDIA_USER_ID:llama-stack-user}", "dataset_namespace": "${env.NVIDIA_DATASET_NAMESPACE:default}", - "access_policies": "${env.NVIDIA_ACCESS_POLICIES:}", "project_id": "${env.NVIDIA_PROJECT_ID:test-project}", - "customizer_url": "${env.NVIDIA_CUSTOMIZER_URL:}", - "output_model_dir": "${env.NVIDIA_OUTPUT_MODEL_DIR:test-example-model@v1}", + "customizer_url": "${env.NVIDIA_CUSTOMIZER_URL:http://nemo.test}", } diff --git a/llama_stack/providers/remote/post_training/nvidia/post_training.py b/llama_stack/providers/remote/post_training/nvidia/post_training.py index 523211cac..fe28de5f1 100644 --- a/llama_stack/providers/remote/post_training/nvidia/post_training.py +++ b/llama_stack/providers/remote/post_training/nvidia/post_training.py @@ -62,8 +62,14 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): self.headers["Authorization"] = f"Bearer {config.api_key}" self.timeout = aiohttp.ClientTimeout(total=config.timeout) - # TODO(mf): filter by available models + # TODO: filter by available models based on /config endpoint ModelRegistryHelper.__init__(self, model_entries=_MODEL_ENTRIES) + self.session = aiohttp.ClientSession(headers=self.headers, timeout=self.timeout) + self.customizer_url = config.customizer_url + + if not self.customizer_url: + warnings.warn("Customizer URL is not set, using default value: http://nemo.test") + self.customizer_url = "http://nemo.test" async def _make_request( self, @@ -75,8 +81,8 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): **kwargs, ) -> Dict[str, Any]: """Helper method to make HTTP requests to the Customizer API.""" - url = f"{self.config.customizer_url}{path}" - request_headers = self.headers.copy() # Create a copy to avoid modifying the original + url = f"{self.customizer_url}{path}" + request_headers = self.headers.copy() if headers: request_headers.update(headers) @@ -86,12 +92,11 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): request_headers["Content-Type"] = "application/json" for _ in range(self.config.max_retries): - async with aiohttp.ClientSession(headers=request_headers, timeout=self.timeout) as session: - async with session.request(method, url, params=params, json=json, **kwargs) as response: - if response.status >= 400: - error_data = await response.json() - raise Exception(f"API request failed: {error_data}") - return await response.json() + async with self.session.request(method, url, params=params, json=json, **kwargs) as response: + if response.status >= 400: + error_data = await response.json() + raise Exception(f"API request failed: {error_data}") + return await response.json() @webmethod(route="/post-training/jobs", method="GET") async def get_training_jobs( @@ -178,8 +183,8 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): logger_config: Dict[str, Any], model: str, checkpoint_dir: Optional[str], - algorithm_config: Optional[AlgorithmConfig], - ) -> PostTrainingJob: + algorithm_config: Optional[AlgorithmConfig] = None, + ) -> NvidiaPostTrainingJob: """ Fine-tunes a model on a dataset. Currently only supports Lora finetuning for standlone docker container. @@ -223,12 +228,13 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): - batch_size - OptimizerConfig: - lr + - weight_decay - LoRA config: - adapter_dim - adapter_dropout Note: - - checkpoint_dir, hyperparam_search_config, logger_config are not supported atm, will be ignored - - output_model_dir is set via environment variable NVIDIA_OUTPUT_MODEL_DIR + - checkpoint_dir, hyperparam_search_config, logger_config are not supported atm, will be ignored and users are informed via warnings. + - Some parameters from TrainingConfig, DataConfig, OptimizerConfig are not supported atm, will be ignored and users are informed via warnings. User is informed about unsupported parameters via warnings. """ @@ -247,7 +253,7 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): """Helper function to warn about unsupported parameters in a config dictionary.""" unsupported_params = [k for k in config_dict.keys() if k not in supported_keys] if unsupported_params: - warnings.warn(f"Parameters: {unsupported_params} in {config_name} not supported and will be ignored.") + warnings.warn(f"Parameters: {unsupported_params} in `{config_name}` not supported and will be ignored.") # Check for unsupported parameters warn_unsupported_params(training_config, ["n_epochs", "data_config", "optimizer_config"], "TrainingConfig") @@ -269,9 +275,9 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): "hyperparameters": { "training_type": "sft", "finetuning_type": "lora", - "epochs": training_config["n_epochs"], - "batch_size": training_config["data_config"]["batch_size"], - "learning_rate": training_config["optimizer_config"]["lr"], + "epochs": training_config.get("n_epochs", 1), + "batch_size": training_config["data_config"].get("batch_size", 8), + "learning_rate": training_config["optimizer_config"].get("lr", 0.0001), }, "project": self.config.project_id, "ownership": {"created_by": self.config.user_id, "access_policies": self.config.access_policies}, @@ -283,7 +289,10 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): if isinstance(algorithm_config, dict) and algorithm_config.get("type") == "LoRA": # Extract LoRA-specific parameters lora_config = {k: v for k, v in algorithm_config.items() if k != "type"} - job_config["hyperparameters"]["lora"] = lora_config + job_config["hyperparameters"]["lora"] = { + "adapter_dim": lora_config.get("adapter_dim", 8), + "adapter_dropout": lora_config.get("adapter_dropout", 1), + } warn_unsupported_params(lora_config, ["adapter_dim", "adapter_dropout"], "LoRA config") else: raise NotImplementedError(f"Unsupported algorithm config: {algorithm_config}") @@ -297,7 +306,13 @@ class NvidiaPostTrainingAdapter(ModelRegistryHelper): ) job_uuid = response["id"] - return PostTrainingJob(job_uuid=job_uuid) + status = STATUS_MAPPING.get(response["status"].lower(), "unknown") + created_at = datetime.fromisoformat(response["created_at"]) + updated_at = datetime.fromisoformat(response["updated_at"]) + + return NvidiaPostTrainingJob( + job_uuid=job_uuid, status=JobStatus(status), created_at=created_at, updated_at=updated_at, **response + ) async def preference_optimize( self,