Merge branch 'main' into add-watsonx-inference-adapter

This commit is contained in:
Sajikumar JS 2025-04-25 10:57:45 +05:30
commit 6fe8b292b1
74 changed files with 5033 additions and 1685 deletions

View file

@ -253,7 +253,8 @@ class MetaReferenceInferenceImpl(
def impl():
stop_reason = None
for token_result in self.generator.completion(request):
for token_results in self.generator.completion([request]):
token_result = token_results[0]
if token_result.token == tokenizer.eot_id:
stop_reason = StopReason.end_of_turn
text = ""
@ -515,7 +516,8 @@ class MetaReferenceInferenceImpl(
stop_reason = None
ipython = False
for token_result in self.generator.chat_completion(request):
for token_results in self.generator.chat_completion([request]):
token_result = token_results[0]
if os.environ.get("LLAMA_MODELS_DEBUG", "0") == "1":
cprint(token_result.text, "cyan", end="")
if os.environ.get("LLAMA_MODELS_DEBUG", "0") == "2":

View file

@ -69,7 +69,10 @@ class CancelSentinel(BaseModel):
class TaskRequest(BaseModel):
type: Literal[ProcessingMessageName.task_request] = ProcessingMessageName.task_request
task: Tuple[str, List[CompletionRequestWithRawContent] | List[ChatCompletionRequestWithRawContent]]
task: Tuple[
str,
List[CompletionRequestWithRawContent] | List[ChatCompletionRequestWithRawContent],
]
class TaskResponse(BaseModel):
@ -231,10 +234,10 @@ def worker_process_entrypoint(
while True:
try:
task = req_gen.send(result)
if isinstance(task, str) and task == EndSentinel():
if isinstance(task, EndSentinel):
break
assert isinstance(task, TaskRequest)
assert isinstance(task, TaskRequest), task
result = model(task.task)
except StopIteration:
break
@ -331,7 +334,10 @@ class ModelParallelProcessGroup:
def run_inference(
self,
req: Tuple[str, List[CompletionRequestWithRawContent] | List[ChatCompletionRequestWithRawContent]],
req: Tuple[
str,
List[CompletionRequestWithRawContent] | List[ChatCompletionRequestWithRawContent],
],
) -> Generator:
assert not self.running, "inference already running"

View file

@ -33,6 +33,7 @@ from llama_stack.apis.tools import (
)
from llama_stack.apis.vector_io import QueryChunksResponse, VectorIO
from llama_stack.providers.datatypes import ToolsProtocolPrivate
from llama_stack.providers.utils.inference.prompt_adapter import interleaved_content_as_str
from llama_stack.providers.utils.memory.vector_store import (
content_from_doc,
make_overlapped_chunks,
@ -153,6 +154,11 @@ class MemoryToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, RAGToolRuntime):
)
)
picked.append(TextContentItem(text="END of knowledge_search tool results.\n"))
picked.append(
TextContentItem(
text=f'The above results were retrieved to help answer the user\'s query: "{interleaved_content_as_str(content)}". Use them as supporting information only in answering this query.\n',
)
)
return RAGQueryResult(
content=picked,

View file

@ -6,7 +6,7 @@
from typing import List
from llama_stack.providers.datatypes import Api, InlineProviderSpec, ProviderSpec
from llama_stack.providers.datatypes import AdapterSpec, Api, InlineProviderSpec, ProviderSpec, remote_provider_spec
def available_providers() -> List[ProviderSpec]:
@ -25,4 +25,22 @@ def available_providers() -> List[ProviderSpec]:
Api.agents,
],
),
remote_provider_spec(
api=Api.eval,
adapter=AdapterSpec(
adapter_type="nvidia",
pip_packages=[
"requests",
],
module="llama_stack.providers.remote.eval.nvidia",
config_class="llama_stack.providers.remote.eval.nvidia.NVIDIAEvalConfig",
),
api_dependencies=[
Api.datasetio,
Api.datasets,
Api.scoring,
Api.inference,
Api.agents,
],
),
]

View file

@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -0,0 +1,134 @@
# NVIDIA NeMo Evaluator Eval Provider
## Overview
For the first integration, Benchmarks are mapped to Evaluation Configs on in the NeMo Evaluator. The full evaluation config object is provided as part of the meta-data. The `dataset_id` and `scoring_functions` are not used.
Below are a few examples of how to register a benchmark, which in turn will create an evaluation config in NeMo Evaluator and how to trigger an evaluation.
### Example for register an academic benchmark
```
POST /eval/benchmarks
```
```json
{
"benchmark_id": "mmlu",
"dataset_id": "",
"scoring_functions": [],
"metadata": {
"type": "mmlu"
}
}
```
### Example for register a custom evaluation
```
POST /eval/benchmarks
```
```json
{
"benchmark_id": "my-custom-benchmark",
"dataset_id": "",
"scoring_functions": [],
"metadata": {
"type": "custom",
"params": {
"parallelism": 8
},
"tasks": {
"qa": {
"type": "completion",
"params": {
"template": {
"prompt": "{{prompt}}",
"max_tokens": 200
}
},
"dataset": {
"files_url": "hf://datasets/default/sample-basic-test/testing/testing.jsonl"
},
"metrics": {
"bleu": {
"type": "bleu",
"params": {
"references": [
"{{ideal_response}}"
]
}
}
}
}
}
}
}
```
### Example for triggering a benchmark/custom evaluation
```
POST /eval/benchmarks/{benchmark_id}/jobs
```
```json
{
"benchmark_id": "my-custom-benchmark",
"benchmark_config": {
"eval_candidate": {
"type": "model",
"model": "meta-llama/Llama3.1-8B-Instruct",
"sampling_params": {
"max_tokens": 100,
"temperature": 0.7
}
},
"scoring_params": {}
}
}
```
Response example:
```json
{
"job_id": "eval-1234",
"status": "in_progress"
}
```
### Example for getting the status of a job
```
GET /eval/benchmarks/{benchmark_id}/jobs/{job_id}
```
Response example:
```json
{
"job_id": "eval-1234",
"status": "in_progress"
}
```
### Example for cancelling a job
```
POST /eval/benchmarks/{benchmark_id}/jobs/{job_id}/cancel
```
### Example for getting the results
```
GET /eval/benchmarks/{benchmark_id}/results
```
```json
{
"generations": [],
"scores": {
"{benchmark_id}": {
"score_rows": [],
"aggregated_results": {
"tasks": {},
"groups": {}
}
}
}
}
```

View file

@ -0,0 +1,31 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Dict
from llama_stack.distribution.datatypes import Api
from .config import NVIDIAEvalConfig
async def get_adapter_impl(
config: NVIDIAEvalConfig,
deps: Dict[Api, Any],
):
from .eval import NVIDIAEvalImpl
impl = NVIDIAEvalImpl(
config,
deps[Api.datasetio],
deps[Api.datasets],
deps[Api.scoring],
deps[Api.inference],
deps[Api.agents],
)
await impl.initialize()
return impl
__all__ = ["get_adapter_impl", "NVIDIAEvalImpl"]

View file

@ -0,0 +1,29 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import os
from typing import Any, Dict
from pydantic import BaseModel, Field
class NVIDIAEvalConfig(BaseModel):
"""
Configuration for the NVIDIA NeMo Evaluator microservice endpoint.
Attributes:
evaluator_url (str): A base url for accessing the NVIDIA evaluation endpoint, e.g. http://localhost:8000.
"""
evaluator_url: str = Field(
default_factory=lambda: os.getenv("NVIDIA_EVALUATOR_URL", "http://0.0.0.0:7331"),
description="The url for accessing the evaluator service",
)
@classmethod
def sample_run_config(cls, **kwargs) -> Dict[str, Any]:
return {
"evaluator_url": "${env.NVIDIA_EVALUATOR_URL:http://localhost:7331}",
}

View file

@ -0,0 +1,154 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Dict, List
import requests
from llama_stack.apis.agents import Agents
from llama_stack.apis.benchmarks import Benchmark
from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.datasets import Datasets
from llama_stack.apis.inference import Inference
from llama_stack.apis.scoring import Scoring, ScoringResult
from llama_stack.providers.datatypes import BenchmarksProtocolPrivate
from llama_stack.providers.remote.inference.nvidia.models import MODEL_ENTRIES
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
from .....apis.common.job_types import Job, JobStatus
from .....apis.eval.eval import BenchmarkConfig, Eval, EvaluateResponse
from .config import NVIDIAEvalConfig
DEFAULT_NAMESPACE = "nvidia"
class NVIDIAEvalImpl(
Eval,
BenchmarksProtocolPrivate,
ModelRegistryHelper,
):
def __init__(
self,
config: NVIDIAEvalConfig,
datasetio_api: DatasetIO,
datasets_api: Datasets,
scoring_api: Scoring,
inference_api: Inference,
agents_api: Agents,
) -> None:
self.config = config
self.datasetio_api = datasetio_api
self.datasets_api = datasets_api
self.scoring_api = scoring_api
self.inference_api = inference_api
self.agents_api = agents_api
ModelRegistryHelper.__init__(self, model_entries=MODEL_ENTRIES)
async def initialize(self) -> None: ...
async def shutdown(self) -> None: ...
async def _evaluator_get(self, path):
"""Helper for making GET requests to the evaluator service."""
response = requests.get(url=f"{self.config.evaluator_url}{path}")
response.raise_for_status()
return response.json()
async def _evaluator_post(self, path, data):
"""Helper for making POST requests to the evaluator service."""
response = requests.post(url=f"{self.config.evaluator_url}{path}", json=data)
response.raise_for_status()
return response.json()
async def register_benchmark(self, task_def: Benchmark) -> None:
"""Register a benchmark as an evaluation configuration."""
await self._evaluator_post(
"/v1/evaluation/configs",
{
"namespace": DEFAULT_NAMESPACE,
"name": task_def.benchmark_id,
# metadata is copied to request body as-is
**task_def.metadata,
},
)
async def run_eval(
self,
benchmark_id: str,
benchmark_config: BenchmarkConfig,
) -> Job:
"""Run an evaluation job for a benchmark."""
model = (
benchmark_config.eval_candidate.model
if benchmark_config.eval_candidate.type == "model"
else benchmark_config.eval_candidate.config.model
)
nvidia_model = self.get_provider_model_id(model) or model
result = await self._evaluator_post(
"/v1/evaluation/jobs",
{
"config": f"{DEFAULT_NAMESPACE}/{benchmark_id}",
"target": {"type": "model", "model": nvidia_model},
},
)
return Job(job_id=result["id"], status=JobStatus.in_progress)
async def evaluate_rows(
self,
benchmark_id: str,
input_rows: List[Dict[str, Any]],
scoring_functions: List[str],
benchmark_config: BenchmarkConfig,
) -> EvaluateResponse:
raise NotImplementedError()
async def job_status(self, benchmark_id: str, job_id: str) -> Job:
"""Get the status of an evaluation job.
EvaluatorStatus: "created", "pending", "running", "cancelled", "cancelling", "failed", "completed".
JobStatus: "scheduled", "in_progress", "completed", "cancelled", "failed"
"""
result = await self._evaluator_get(f"/v1/evaluation/jobs/{job_id}")
result_status = result["status"]
job_status = JobStatus.failed
if result_status in ["created", "pending"]:
job_status = JobStatus.scheduled
elif result_status in ["running"]:
job_status = JobStatus.in_progress
elif result_status in ["completed"]:
job_status = JobStatus.completed
elif result_status in ["cancelled"]:
job_status = JobStatus.cancelled
return Job(job_id=job_id, status=job_status)
async def job_cancel(self, benchmark_id: str, job_id: str) -> None:
"""Cancel the evaluation job."""
await self._evaluator_post(f"/v1/evaluation/jobs/{job_id}/cancel", {})
async def job_result(self, benchmark_id: str, job_id: str) -> EvaluateResponse:
"""Returns the results of the evaluation job."""
job = await self.job_status(benchmark_id, job_id)
status = job.status
if not status or status != JobStatus.completed:
raise ValueError(f"Job {job_id} not completed. Status: {status.value}")
result = await self._evaluator_get(f"/v1/evaluation/jobs/{job_id}/results")
return EvaluateResponse(
# TODO: these are stored in detailed results on NeMo Evaluator side; can be added
generations=[],
scores={
benchmark_id: ScoringResult(
score_rows=[],
aggregated_results=result,
)
},
)

View file

@ -362,6 +362,39 @@ class FireworksInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProv
user: Optional[str] = None,
) -> Union[OpenAIChatCompletion, AsyncIterator[OpenAIChatCompletionChunk]]:
model_obj = await self.model_store.get_model(model)
# Divert Llama Models through Llama Stack inference APIs because
# Fireworks chat completions OpenAI-compatible API does not support
# tool calls properly.
llama_model = self.get_llama_model(model_obj.provider_resource_id)
if llama_model:
return await OpenAIChatCompletionToLlamaStackMixin.openai_chat_completion(
self,
model=model,
messages=messages,
frequency_penalty=frequency_penalty,
function_call=function_call,
functions=functions,
logit_bias=logit_bias,
logprobs=logprobs,
max_completion_tokens=max_completion_tokens,
max_tokens=max_tokens,
n=n,
parallel_tool_calls=parallel_tool_calls,
presence_penalty=presence_penalty,
response_format=response_format,
seed=seed,
stop=stop,
stream=stream,
stream_options=stream_options,
temperature=temperature,
tool_choice=tool_choice,
tools=tools,
top_logprobs=top_logprobs,
top_p=top_p,
user=user,
)
params = await prepare_openai_completion_params(
messages=messages,
frequency_penalty=frequency_penalty,
@ -387,11 +420,4 @@ class FireworksInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProv
user=user,
)
# Divert Llama Models through Llama Stack inference APIs because
# Fireworks chat completions OpenAI-compatible API does not support
# tool calls properly.
llama_model = self.get_llama_model(model_obj.provider_resource_id)
if llama_model:
return await OpenAIChatCompletionToLlamaStackMixin.openai_chat_completion(self, model=model, **params)
return await self._get_openai_client().chat.completions.create(model=model_obj.provider_resource_id, **params)

View file

@ -47,10 +47,15 @@ class NVIDIAConfig(BaseModel):
default=60,
description="Timeout for the HTTP requests",
)
append_api_version: bool = Field(
default_factory=lambda: os.getenv("NVIDIA_APPEND_API_VERSION", "True").lower() != "false",
description="When set to false, the API version will not be appended to the base_url. By default, it is true.",
)
@classmethod
def sample_run_config(cls, **kwargs) -> Dict[str, Any]:
return {
"url": "${env.NVIDIA_BASE_URL:https://integrate.api.nvidia.com}",
"api_key": "${env.NVIDIA_API_KEY:}",
"append_api_version": "${env.NVIDIA_APPEND_API_VERSION:True}",
}

View file

@ -33,7 +33,6 @@ from llama_stack.apis.inference import (
TextTruncation,
ToolChoice,
ToolConfig,
ToolDefinition,
)
from llama_stack.apis.inference.inference import (
OpenAIChatCompletion,
@ -42,7 +41,11 @@ from llama_stack.apis.inference.inference import (
OpenAIMessageParam,
OpenAIResponseFormatParam,
)
from llama_stack.models.llama.datatypes import ToolPromptFormat
from llama_stack.apis.models import Model, ModelType
from llama_stack.models.llama.datatypes import ToolDefinition, ToolPromptFormat
from llama_stack.providers.utils.inference import (
ALL_HUGGINGFACE_REPOS_TO_MODEL_DESCRIPTOR,
)
from llama_stack.providers.utils.inference.model_registry import (
ModelRegistryHelper,
)
@ -120,12 +123,20 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
"meta/llama-3.2-90b-vision-instruct": "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct",
}
base_url = f"{self._config.url}/v1"
base_url = f"{self._config.url}/v1" if self._config.append_api_version else self._config.url
if _is_nvidia_hosted(self._config) and provider_model_id in special_model_urls:
base_url = special_model_urls[provider_model_id]
return _get_client_for_base_url(base_url)
async def _get_provider_model_id(self, model_id: str) -> str:
if not self.model_store:
raise RuntimeError("Model store is not set")
model = await self.model_store.get_model(model_id)
if model is None:
raise ValueError(f"Model {model_id} is unknown")
return model.provider_model_id
async def completion(
self,
model_id: str,
@ -144,7 +155,7 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
# removing this health check as NeMo customizer endpoint health check is returning 404
# await check_health(self._config) # this raises errors
provider_model_id = self.get_provider_model_id(model_id)
provider_model_id = await self._get_provider_model_id(model_id)
request = convert_completion_request(
request=CompletionRequest(
model=provider_model_id,
@ -188,7 +199,7 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
#
flat_contents = [content.text if isinstance(content, TextContentItem) else content for content in contents]
input = [content.text if isinstance(content, TextContentItem) else content for content in flat_contents]
model = self.get_provider_model_id(model_id)
provider_model_id = await self._get_provider_model_id(model_id)
extra_body = {}
@ -211,8 +222,8 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
extra_body["input_type"] = task_type_options[task_type]
try:
response = await self._get_client(model).embeddings.create(
model=model,
response = await self._get_client(provider_model_id).embeddings.create(
model=provider_model_id,
input=input,
extra_body=extra_body,
)
@ -246,10 +257,10 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
# await check_health(self._config) # this raises errors
provider_model_id = self.get_provider_model_id(model_id)
provider_model_id = await self._get_provider_model_id(model_id)
request = await convert_chat_completion_request(
request=ChatCompletionRequest(
model=self.get_provider_model_id(model_id),
model=provider_model_id,
messages=messages,
sampling_params=sampling_params,
response_format=response_format,
@ -294,7 +305,7 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
guided_choice: Optional[List[str]] = None,
prompt_logprobs: Optional[int] = None,
) -> OpenAICompletion:
provider_model_id = self.get_provider_model_id(model)
provider_model_id = await self._get_provider_model_id(model)
params = await prepare_openai_completion_params(
model=provider_model_id,
@ -347,7 +358,7 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
top_p: Optional[float] = None,
user: Optional[str] = None,
) -> Union[OpenAIChatCompletion, AsyncIterator[OpenAIChatCompletionChunk]]:
provider_model_id = self.get_provider_model_id(model)
provider_model_id = await self._get_provider_model_id(model)
params = await prepare_openai_completion_params(
model=provider_model_id,
@ -379,3 +390,44 @@ class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
return await self._get_client(provider_model_id).chat.completions.create(**params)
except APIConnectionError as e:
raise ConnectionError(f"Failed to connect to NVIDIA NIM at {self._config.url}: {e}") from e
async def register_model(self, model: Model) -> Model:
"""
Allow non-llama model registration.
Non-llama model registration: API Catalogue models, post-training models, etc.
client = LlamaStackAsLibraryClient("nvidia")
client.models.register(
model_id="mistralai/mixtral-8x7b-instruct-v0.1",
model_type=ModelType.llm,
provider_id="nvidia",
provider_model_id="mistralai/mixtral-8x7b-instruct-v0.1"
)
NOTE: Only supports models endpoints compatible with AsyncOpenAI base_url format.
"""
if model.model_type == ModelType.embedding:
# embedding models are always registered by their provider model id and does not need to be mapped to a llama model
provider_resource_id = model.provider_resource_id
else:
provider_resource_id = self.get_provider_model_id(model.provider_resource_id)
if provider_resource_id:
model.provider_resource_id = provider_resource_id
else:
llama_model = model.metadata.get("llama_model")
existing_llama_model = self.get_llama_model(model.provider_resource_id)
if existing_llama_model:
if existing_llama_model != llama_model:
raise ValueError(
f"Provider model id '{model.provider_resource_id}' is already registered to a different llama model: '{existing_llama_model}'"
)
else:
# not llama model
if llama_model in ALL_HUGGINGFACE_REPOS_TO_MODEL_DESCRIPTOR:
self.provider_id_to_llama_model_map[model.provider_resource_id] = (
ALL_HUGGINGFACE_REPOS_TO_MODEL_DESCRIPTOR[llama_model]
)
else:
self.alias_to_provider_id_map[model.provider_model_id] = model.provider_model_id
return model

View file

@ -76,8 +76,11 @@ class TogetherInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProvi
async def shutdown(self) -> None:
if self._client:
await self._client.close()
# Together client has no close method, so just set to None
self._client = None
if self._openai_client:
await self._openai_client.close()
self._openai_client = None
async def completion(
self,
@ -359,7 +362,7 @@ class TogetherInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProvi
top_p=top_p,
user=user,
)
if params.get("stream", True):
if params.get("stream", False):
return self._stream_openai_chat_completion(params)
return await self._get_openai_client().chat.completions.create(**params) # type: ignore

View file

@ -231,12 +231,7 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
self.client = None
async def initialize(self) -> None:
log.info(f"Initializing VLLM client with base_url={self.config.url}")
self.client = AsyncOpenAI(
base_url=self.config.url,
api_key=self.config.api_token,
http_client=None if self.config.tls_verify else httpx.AsyncClient(verify=False),
)
pass
async def shutdown(self) -> None:
pass
@ -249,6 +244,20 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
raise ValueError("Model store not set")
return await self.model_store.get_model(model_id)
def _lazy_initialize_client(self):
if self.client is not None:
return
log.info(f"Initializing vLLM client with base_url={self.config.url}")
self.client = self._create_client()
def _create_client(self):
return AsyncOpenAI(
base_url=self.config.url,
api_key=self.config.api_token,
http_client=None if self.config.tls_verify else httpx.AsyncClient(verify=False),
)
async def completion(
self,
model_id: str,
@ -258,6 +267,7 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
stream: Optional[bool] = False,
logprobs: Optional[LogProbConfig] = None,
) -> CompletionResponse | AsyncGenerator[CompletionResponseStreamChunk, None]:
self._lazy_initialize_client()
if sampling_params is None:
sampling_params = SamplingParams()
model = await self._get_model(model_id)
@ -287,6 +297,7 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
logprobs: Optional[LogProbConfig] = None,
tool_config: Optional[ToolConfig] = None,
) -> ChatCompletionResponse | AsyncGenerator[ChatCompletionResponseStreamChunk, None]:
self._lazy_initialize_client()
if sampling_params is None:
sampling_params = SamplingParams()
model = await self._get_model(model_id)
@ -357,9 +368,12 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
yield chunk
async def register_model(self, model: Model) -> Model:
assert self.client is not None
# register_model is called during Llama Stack initialization, hence we cannot init self.client if not initialized yet.
# self.client should only be created after the initialization is complete to avoid asyncio cross-context errors.
# Changing this may lead to unpredictable behavior.
client = self._create_client() if self.client is None else self.client
model = await self.register_helper.register_model(model)
res = await self.client.models.list()
res = await client.models.list()
available_models = [m.id async for m in res]
if model.provider_resource_id not in available_models:
raise ValueError(
@ -410,6 +424,7 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
output_dimension: Optional[int] = None,
task_type: Optional[EmbeddingTaskType] = None,
) -> EmbeddingsResponse:
self._lazy_initialize_client()
assert self.client is not None
model = await self._get_model(model_id)
@ -449,6 +464,7 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
guided_choice: Optional[List[str]] = None,
prompt_logprobs: Optional[int] = None,
) -> OpenAICompletion:
self._lazy_initialize_client()
model_obj = await self._get_model(model)
extra_body: Dict[str, Any] = {}
@ -505,6 +521,7 @@ class VLLMInferenceAdapter(Inference, ModelsProtocolPrivate):
top_p: Optional[float] = None,
user: Optional[str] = None,
) -> Union[OpenAIChatCompletion, AsyncIterator[OpenAIChatCompletionChunk]]:
self._lazy_initialize_client()
model_obj = await self._get_model(model)
params = await prepare_openai_completion_params(
model=model_obj.provider_resource_id,

View file

@ -36,7 +36,6 @@ import os
os.environ["NVIDIA_API_KEY"] = "your-api-key"
os.environ["NVIDIA_CUSTOMIZER_URL"] = "http://nemo.test"
os.environ["NVIDIA_USER_ID"] = "llama-stack-user"
os.environ["NVIDIA_DATASET_NAMESPACE"] = "default"
os.environ["NVIDIA_PROJECT_ID"] = "test-project"
os.environ["NVIDIA_OUTPUT_MODEL_DIR"] = "test-example-model@v1"
@ -125,6 +124,21 @@ client.post_training.job.cancel(job_uuid="your-job-id")
### Inference with the fine-tuned model
#### 1. Register the model
```python
from llama_stack.apis.models import Model, ModelType
client.models.register(
model_id="test-example-model@v1",
provider_id="nvidia",
provider_model_id="test-example-model@v1",
model_type=ModelType.llm,
)
```
#### 2. Inference with the fine-tuned model
```python
response = client.inference.completion(
content="Complete the sentence using one word: Roses are red, violets are ",

View file

@ -8,7 +8,17 @@ import logging
import time
import uuid
import warnings
from typing import Any, AsyncGenerator, AsyncIterator, Awaitable, Dict, Iterable, List, Optional, Union
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Awaitable,
Dict,
Iterable,
List,
Optional,
Union,
)
from openai import AsyncStream
from openai.types.chat import (
@ -78,6 +88,7 @@ from llama_stack.apis.common.content_types import (
TextDelta,
ToolCallDelta,
ToolCallParseStatus,
_URLOrData,
)
from llama_stack.apis.inference import (
ChatCompletionRequest,
@ -93,6 +104,7 @@ from llama_stack.apis.inference import (
SamplingParams,
SystemMessage,
TokenLogProbs,
ToolChoice,
ToolResponseMessage,
TopKSamplingStrategy,
TopPSamplingStrategy,
@ -103,7 +115,6 @@ from llama_stack.apis.inference.inference import (
OpenAIChatCompletion,
OpenAICompletion,
OpenAICompletionChoice,
OpenAIMessageParam,
OpenAIResponseFormatParam,
ToolConfig,
)
@ -513,11 +524,26 @@ async def convert_message_to_openai_dict(message: Message, download: bool = Fals
else:
content = [await _convert_content(message.content)]
return {
result = {
"role": message.role,
"content": content,
}
if hasattr(message, "tool_calls") and message.tool_calls:
result["tool_calls"] = []
for tc in message.tool_calls:
result["tool_calls"].append(
{
"id": tc.call_id,
"type": "function",
"function": {
"name": tc.tool_name,
"arguments": tc.arguments_json if hasattr(tc, "arguments_json") else json.dumps(tc.arguments),
},
}
)
return result
class UnparseableToolCall(BaseModel):
"""
@ -612,13 +638,10 @@ async def convert_message_to_openai_dict_new(
)
for tool in message.tool_calls
]
params = {}
if tool_calls:
params = {"tool_calls": tool_calls}
out = OpenAIChatCompletionAssistantMessage(
role="assistant",
content=await _convert_message_content(message.content),
**params,
tool_calls=tool_calls or None,
)
elif isinstance(message, ToolResponseMessage):
out = OpenAIChatCompletionToolMessage(
@ -695,7 +718,10 @@ def to_openai_param_type(param_type: str) -> dict:
if param_type.startswith("list[") and param_type.endswith("]"):
inner_type = param_type[5:-1]
if inner_type in basic_types:
return {"type": "array", "items": {"type": basic_types.get(inner_type, inner_type)}}
return {
"type": "array",
"items": {"type": basic_types.get(inner_type, inner_type)},
}
return {"type": param_type}
@ -815,6 +841,10 @@ def _convert_openai_finish_reason(finish_reason: str) -> StopReason:
def _convert_openai_request_tool_config(tool_choice: Optional[Union[str, Dict[str, Any]]] = None) -> ToolConfig:
tool_config = ToolConfig()
if tool_choice:
try:
tool_choice = ToolChoice(tool_choice)
except ValueError:
pass
tool_config.tool_choice = tool_choice
return tool_config
@ -849,7 +879,9 @@ def _convert_openai_request_tools(tools: Optional[List[Dict[str, Any]]] = None)
return lls_tools
def _convert_openai_request_response_format(response_format: OpenAIResponseFormatParam = None):
def _convert_openai_request_response_format(
response_format: OpenAIResponseFormatParam = None,
):
if not response_format:
return None
# response_format can be a dict or a pydantic model
@ -957,38 +989,50 @@ def _convert_openai_sampling_params(
return sampling_params
def _convert_openai_request_messages(messages: List[OpenAIMessageParam]):
# Llama Stack messages and OpenAI messages are similar, but not identical.
lls_messages = []
def openai_messages_to_messages(
messages: List[OpenAIChatCompletionMessage],
) -> List[Message]:
"""
Convert a list of OpenAIChatCompletionMessage into a list of Message.
"""
converted_messages = []
for message in messages:
lls_message = dict(message)
if message.role == "system":
converted_message = SystemMessage(content=message.content)
elif message.role == "user":
converted_message = UserMessage(content=openai_content_to_content(message.content))
elif message.role == "assistant":
converted_message = CompletionMessage(
content=message.content,
tool_calls=_convert_openai_tool_calls(message.tool_calls),
stop_reason=StopReason.end_of_turn,
)
elif message.role == "tool":
converted_message = ToolResponseMessage(
role="tool",
call_id=message.tool_call_id,
content=openai_content_to_content(message.content),
)
else:
raise ValueError(f"Unknown role {message.role}")
converted_messages.append(converted_message)
return converted_messages
# Llama Stack expects `call_id` but OpenAI uses `tool_call_id`
tool_call_id = lls_message.pop("tool_call_id", None)
if tool_call_id:
lls_message["call_id"] = tool_call_id
content = lls_message.get("content", None)
if isinstance(content, list):
lls_content = []
for item in content:
# items can either by pydantic models or dicts here...
item = dict(item)
if item.get("type", "") == "image_url":
lls_item = ImageContentItem(
type="image",
image=URL(uri=item.get("image_url", {}).get("url", "")),
)
elif item.get("type", "") == "text":
lls_item = TextContentItem(
type="text",
text=item.get("text", ""),
)
lls_content.append(lls_item)
lls_message["content"] = lls_content
lls_messages.append(lls_message)
return lls_messages
def openai_content_to_content(content: Union[str, Iterable[OpenAIChatCompletionContentPartParam]]):
if isinstance(content, str):
return content
elif isinstance(content, list):
return [openai_content_to_content(c) for c in content]
elif hasattr(content, "type"):
if content.type == "text":
return TextContentItem(type="text", text=content.text)
elif content.type == "image_url":
return ImageContentItem(type="image", image=_URLOrData(url=URL(uri=content.image_url.url)))
else:
raise ValueError(f"Unknown content type: {content.type}")
else:
raise ValueError(f"Unknown content type: {content}")
def convert_openai_chat_completion_choice(
@ -1313,7 +1357,7 @@ class OpenAIChatCompletionToLlamaStackMixin:
top_p: Optional[float] = None,
user: Optional[str] = None,
) -> Union[OpenAIChatCompletion, AsyncIterator[OpenAIChatCompletionChunk]]:
messages = _convert_openai_request_messages(messages)
messages = openai_messages_to_messages(messages)
response_format = _convert_openai_request_response_format(response_format)
sampling_params = _convert_openai_sampling_params(
max_tokens=max_tokens,
@ -1321,7 +1365,10 @@ class OpenAIChatCompletionToLlamaStackMixin:
top_p=top_p,
)
tool_config = _convert_openai_request_tool_config(tool_choice)
tools = _convert_openai_request_tools(tools)
if tool_config.tool_choice == ToolChoice.none:
tools = []
outstanding_responses = []
# "n" is the number of completions to generate per prompt
@ -1346,7 +1393,9 @@ class OpenAIChatCompletionToLlamaStackMixin:
)
async def _process_stream_response(
self, model: str, outstanding_responses: List[Awaitable[AsyncIterator[ChatCompletionResponseStreamChunk]]]
self,
model: str,
outstanding_responses: List[Awaitable[AsyncIterator[ChatCompletionResponseStreamChunk]]],
):
id = f"chatcmpl-{uuid.uuid4()}"
for outstanding_response in outstanding_responses:
@ -1369,11 +1418,31 @@ class OpenAIChatCompletionToLlamaStackMixin:
elif isinstance(event.delta, ToolCallDelta):
if event.delta.parse_status == ToolCallParseStatus.succeeded:
tool_call = event.delta.tool_call
# First chunk includes full structure
openai_tool_call = OpenAIChoiceDeltaToolCall(
index=0,
id=tool_call.call_id,
function=OpenAIChoiceDeltaToolCallFunction(
name=tool_call.tool_name, arguments=tool_call.arguments_json
name=tool_call.tool_name,
arguments="",
),
)
delta = OpenAIChoiceDelta(tool_calls=[openai_tool_call])
yield OpenAIChatCompletionChunk(
id=id,
choices=[
OpenAIChatCompletionChunkChoice(index=i, finish_reason=finish_reason, delta=delta)
],
created=int(time.time()),
model=model,
object="chat.completion.chunk",
)
# arguments
openai_tool_call = OpenAIChoiceDeltaToolCall(
index=0,
function=OpenAIChoiceDeltaToolCallFunction(
arguments=tool_call.arguments_json,
),
)
delta = OpenAIChoiceDelta(tool_calls=[openai_tool_call])