debugged impl errors for building container and running data prep

Signed-off-by: James Kunstle <jkunstle@redhat.com>
James Kunstle 2025-03-14 00:42:20 -07:00
parent 06465441f2
commit 68000499f7
6 changed files with 66 additions and 19 deletions

hf-ilab-build.yaml (new file)

@@ -0,0 +1,25 @@
version: "2"
distribution_spec:
  description: Use (an external) Ollama server for running LLM inference
  providers:
    inference:
    - remote::ollama
    vector_io:
    - inline::faiss
    safety:
    - inline::llama-guard
    telemetry:
    - inline::meta-reference
    agents:
    - inline::meta-reference
    eval:
    - inline::meta-reference
    datasetio:
    - inline::localfs
    scoring:
    - inline::llm-as-judge
    tool_runtime:
    - remote::brave-search
    post_training:
    - inline::huggingface-ilab
image_type: venv
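
This build spec is the input consumed by the `llama stack build` flow. As a quick sanity check, a minimal sketch for inspecting it from Python (the PyYAML dependency and the working-directory path are assumptions, not part of this commit):

# Hedged sketch: load the build spec above and confirm the new post_training
# provider is registered. Assumes PyYAML is installed and the file is in CWD.
import yaml

with open("hf-ilab-build.yaml") as f:
    spec = yaml.safe_load(f)

providers = spec["distribution_spec"]["providers"]
assert "inline::huggingface-ilab" in providers["post_training"]
print(spec["image_type"])  # -> "venv"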


@@ -63,8 +63,10 @@ class HFilabPostTrainingImpl:
         if self.current_job is None:
             return True
 
-        finalized_job_states = [JobStatus.completed.value, JobStatus.failed.value]
-        if self.current_job.status in finalized_job_states:
+        finalized_job_states = [JobStatus.completed, JobStatus.failed]
+
+        # check most recent status of job.
+        if self.current_job.status[-1] in finalized_job_states:
             return True
 
         return False
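
The `status[-1]` indexing above assumes the job object keeps an append-only history of statuses rather than a single value. A self-contained illustration of that convention (the JobStatus and TuningJob classes here are stand-ins, not the real llama-stack types):

# Stand-in types for illustration only; the real JobStatus lives in llama_stack.
from enum import Enum


class JobStatus(Enum):
    scheduled = "scheduled"
    in_progress = "in_progress"
    completed = "completed"
    failed = "failed"


class TuningJob:
    def __init__(self) -> None:
        # statuses are appended over the job's lifetime; the last entry is current
        self.status: list[JobStatus] = []


job = TuningJob()
job.status.append(JobStatus.scheduled)
job.status.append(JobStatus.completed)

finalized_job_states = [JobStatus.completed, JobStatus.failed]
print(job.status[-1] in finalized_job_states)  # True -> safe to schedule a new job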
@@ -87,7 +89,8 @@ class HFilabPostTrainingImpl:
         checkpoint_dir: Optional[str],
         algorithm_config: Optional[AlgorithmConfig],
     ) -> JSONResponse:
-        if not self.can_schedule_new_job():
+        if not await self.can_schedule_new_job():
+            # TODO: this status code isn't making its way up to the user. User just getting 500 from SDK.
             raise fastapi.HTTPException(
                 status_code=503,  # service unavailable, try again later.
                 detail="A tuning job is currently running; this could take a while.",


@@ -7,6 +7,7 @@ from typing import Callable
 
 import datasets
 import transformers
+from termcolor import cprint
 from transformers.configuration_utils import PretrainedConfig
 from transformers.tokenization_utils import PreTrainedTokenizer
 from transformers.tokenization_utils_fast import PreTrainedTokenizerFast
@@ -72,6 +73,10 @@ class FullPrecisionFineTuning:
     def logs_dir(self):
         return self.storage_dir / "logs"
 
+    @property
+    def hfcache_dir(self):
+        return self.storage_dir / "hf_cache"
+
     @staticmethod
     def check_model_arch_validated(model_config: PretrainedConfig) -> bool:
         """Check whether input model architecture from config is among the pre-validated architectures.
@ -98,7 +103,9 @@ class FullPrecisionFineTuning:
PretrainedConfig: model config associated with model. PretrainedConfig: model config associated with model.
""" """
try: try:
model_config: PretrainedConfig = transformers.AutoConfig.from_pretrained(self.model_name_or_path) model_config: PretrainedConfig = transformers.AutoConfig.from_pretrained(
self.model_name_or_path, cache_dir=self.hfcache_dir
)
except OSError: except OSError:
print( print(
f"Attempted to load model config for ({self.model_name_or_path}) but failed. Model config will be loaded by `AutoConfig.from_pretrained()`" f"Attempted to load model config for ({self.model_name_or_path}) but failed. Model config will be loaded by `AutoConfig.from_pretrained()`"
@@ -115,7 +122,7 @@ class FullPrecisionFineTuning:
         """
         try:
             tokenizer: SomePretrainedTokenizer = transformers.AutoTokenizer.from_pretrained(
-                self.model_name_or_path, use_fast=True
+                self.model_name_or_path, use_fast=True, cache_dir=self.hfcache_dir
             )
         except OSError:
             print(
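
Both cache_dir additions above point Hugging Face downloads at the job's own storage directory instead of the default ~/.cache/huggingface. A minimal standalone sketch of the same pattern (the model name and storage path are examples, not values from this commit):

# Hedged sketch of the cache_dir pattern used above; names and paths are examples.
from pathlib import Path

import transformers

storage_dir = Path("/tmp/hf-ilab-job")  # stand-in for the job's storage_dir
hf_cache = storage_dir / "hf_cache"

config = transformers.AutoConfig.from_pretrained(
    "HuggingFaceTB/SmolLM2-135M-Instruct", cache_dir=hf_cache
)
tokenizer = transformers.AutoTokenizer.from_pretrained(
    "HuggingFaceTB/SmolLM2-135M-Instruct", use_fast=True, cache_dir=hf_cache
)
print(config.architectures, tokenizer.chat_template is not None)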
@@ -150,6 +157,9 @@ class FullPrecisionFineTuning:
             dataset_id=self.training_config.data_config.dataset_id, rows_in_page=-1
         )
         self.loaded_dataset = dataset.rows
+        cprint(
+            f"Dataset loaded! len: ({len(self.loaded_dataset)}), example row: ({self.loaded_dataset[0]})", color="cyan"
+        )
 
     def preflight(self, set_status_callback: Callable[[JobStatus], None]):
         """Set of checks that should run before any heavier-weight preprocessing runs to validate starting state.
@@ -175,19 +185,22 @@ class FullPrecisionFineTuning:
             RuntimeError: If tokenizer doesn't have chat template available.
             OSError: Can be raised via this function if config or tokenizer not available via model's name.
         """
         model_config = self.__try_load_config()
+        cprint("Loaded model config", color="cyan")
 
         if not self.check_model_arch_validated(model_config=model_config):
             # could raise Error if we need a strong check against this.
             print(
                 f"Input model ({self.model_name_or_path}) architecture ({model_config.architectures}) is not among validated architectures."
             )
+        cprint("Validated model config", color="cyan")
 
         model_tokenizer = self.__try_load_tokenizer()
+        cprint("Loaded model tokenizer", color="cyan")
 
         if not self.check_tokenizer_has_chat_template(model_tokenizer):
             raise RuntimeError(
                 f"Input model ({self.model_name_or_path})'s tokenizer ({model_tokenizer.__name__}) has no chat template from associated `tokenizer_config.json`"
             )
+        cprint("Validated model tokenizer", color="cyan")
 
         try:
             _ = model_tokenizer.apply_chat_template(self.loaded_dataset[0]["messages"])
@@ -198,6 +211,8 @@ class FullPrecisionFineTuning:
             )
             raise
 
+        cprint("Model tokenizer applied template to row.", color="cyan")
+
         # Success! Preflight checks haven't caught any immediate problems.
         set_status_callback(JobStatus.scheduled)
@@ -221,7 +236,6 @@ class FullPrecisionFineTuning:
         Returns:
             dict[str, list[int]]: Of shape {input_ids, labels, attention_mask, loss_mask}
         """
-
         input_ids = tokenizer.apply_chat_template(conversation=sample, tokenize=True)
         input_ids = typing.cast(
             list[int], input_ids
@@ -243,10 +257,17 @@ class FullPrecisionFineTuning:
         """
         dataset = datasets.Dataset.from_list(self.loaded_dataset)
+        cprint(f"Dataset loaded. Example row: ({dataset[0]})", color="cyan")
 
         model_tok = self.__try_load_tokenizer()
+        cprint("Tokenizer loaded.", color="cyan")
 
         # NOTE: not implementing as batched for the moment; need to know how batching impacts memory usage on machine.
-        dataset = dataset.map(lambda x: self.__tokenize_and_generate_labels_and_mask(tokenizer=model_tok, sample=x))
+        dataset = dataset.map(
+            lambda x: self.__tokenize_and_generate_labels_and_mask(
+                tokenizer=model_tok, sample=x["messages"]
+            )  # TODO: get this key from input dataset schema
+        )
+        dataset = dataset.remove_columns(column_names=["messages"])
 
         return dataset
 
     def setup(self):
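
A self-contained sketch of the map-then-drop-columns preprocessing performed in the hunk above (the tiny dataset and model name are illustrative stand-ins, not the provider's real inputs):

# Hedged sketch of the per-row chat-template tokenization pattern above.
import datasets
import transformers

tok = transformers.AutoTokenizer.from_pretrained(
    "HuggingFaceTB/SmolLM2-135M-Instruct", use_fast=True
)

rows = [
    {
        "messages": [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello!"},
        ]
    }
]
ds = datasets.Dataset.from_list(rows)


def tokenize_row(example):
    input_ids = tok.apply_chat_template(conversation=example["messages"], tokenize=True)
    return {"input_ids": input_ids, "attention_mask": [1] * len(input_ids)}


ds = ds.map(tokenize_row)  # not batched, mirroring the NOTE above
ds = ds.remove_columns(column_names=["messages"])
print(ds.column_names)  # ['input_ids', 'attention_mask']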
@@ -267,8 +288,8 @@ class FullPrecisionFineTuning:
             set_subproc_ref_callback (Callable[[subprocess.Process], None]): Sets subprocess reference in 'Impl' class' ref to this job
         """
 
-        training_subproc = await asyncio.create_subprocess_exec(
-            "echo 'yay Im running in a subprocess: $$'; sleep 30; echo 'exiting process $$'"
+        training_subproc = await asyncio.create_subprocess_shell(
+            'echo "yay Im running in a subprocess: $$"; sleep 5; echo "exiting subprocess $$"'
         )
         set_subproc_ref_callback(training_subproc)
         await training_subproc.wait()
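
The switch from create_subprocess_exec to create_subprocess_shell matters because the command is a single shell string (using `;`, `$$`, and `sleep`), which the exec form would treat as one literal program name. A small standalone comparison of the two asyncio APIs:

# Hedged illustration of the exec-vs-shell difference behind the hunk above.
import asyncio


async def main() -> None:
    # Shell form: one string parsed by /bin/sh, so `;`, `$$`, and `sleep` work.
    proc = await asyncio.create_subprocess_shell(
        'echo "running in $$"; sleep 1; echo "done"'
    )
    await proc.wait()

    # Exec form: program and arguments are passed separately; no shell parsing happens.
    proc = await asyncio.create_subprocess_exec("echo", "no shell expansion of $$ here")
    await proc.wait()


asyncio.run(main())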


@@ -83,7 +83,7 @@ def available_providers() -> List[ProviderSpec]:
             api=Api.inference,
             adapter=AdapterSpec(
                 adapter_type="ollama",
-                pip_packages=["ollama", "aiohttp"],
+                pip_packages=["ollama", "aiohttp", "openai"],
                 config_class="llama_stack.providers.remote.inference.ollama.OllamaImplConfig",
                 module="llama_stack.providers.remote.inference.ollama",
             ),


@@ -14,7 +14,7 @@ def available_providers() -> List[ProviderSpec]:
         InlineProviderSpec(
             api=Api.post_training,
             provider_type="inline::torchtune",
-            pip_packages=["torch", "torchtune==0.5.0", "torchao==0.8.0", "numpy"],
+            pip_packages=["torch", "torchtune==0.5.0", "torchao==0.8.0", "numpy", "openai"],
             module="llama_stack.providers.inline.post_training.torchtune",
             config_class="llama_stack.providers.inline.post_training.torchtune.TorchtunePostTrainingConfig",
             api_dependencies=[
@@ -25,7 +25,7 @@ def available_providers() -> List[ProviderSpec]:
         InlineProviderSpec(
             api=Api.post_training,
             provider_type="inline::huggingface-ilab",
-            pip_packages=["torch", "transformers", "datasets", "numpy"],
+            pip_packages=["torch", "transformers", "datasets", "numpy", "openai"],
             module="llama_stack.providers.inline.post_training.huggingface_ilab",
             config_class="llama_stack.providers.inline.post_training.huggingface_ilab.HFilabPostTrainingConfig",
             api_dependencies=[


@@ -4,7 +4,6 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
 
-import asyncio
 import base64
 import io
 from urllib.parse import unquote
@@ -17,12 +16,11 @@ from llama_stack.providers.utils.memory.vector_store import parse_data_url
 async def get_dataframe_from_uri(uri: str):
     df = None
     if uri.endswith(".csv"):
-        # Moving to its own thread to avoid io from blocking the eventloop
-        # This isn't ideal as it moves more then just the IO to a new thread
-        # but it is as close as we can easly get
-        df = await asyncio.to_thread(pandas.read_csv, uri)
+        df = pandas.read_csv(uri)
     elif uri.endswith(".xlsx"):
-        df = await asyncio.to_thread(pandas.read_excel, uri)
+        df = pandas.read_excel(uri)
+    elif uri.endswith(".jsonl"):
+        df = pandas.read_json(uri, lines=True)
     elif uri.startswith("data:"):
         parts = parse_data_url(uri)
         data = parts["data"]