diff --git a/llama_stack/providers/remote/inference/nvidia/_config.py b/llama_stack/providers/remote/inference/nvidia/_config.py
deleted file mode 100644
index 7934a0f05..000000000
--- a/llama_stack/providers/remote/inference/nvidia/_config.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-import os
-from typing import Any, Dict, Optional
-
-from llama_models.schema_utils import json_schema_type
-from pydantic import BaseModel, Field
-
-
-@json_schema_type
-class NVIDIAConfig(BaseModel):
-    """
-    Configuration for the NVIDIA NIM inference endpoint.
-
-    Attributes:
-        url (str): A base url for accessing the NVIDIA NIM, e.g. http://localhost:8000
-        api_key (str): The access key for the hosted NIM endpoints
-
-    There are two ways to access NVIDIA NIMs -
-     0. Hosted: Preview APIs hosted at https://integrate.api.nvidia.com
-     1. Self-hosted: You can run NVIDIA NIMs on your own infrastructure
-
-    By default the configuration is set to use the hosted APIs. This requires
-    an API key which can be obtained from https://ngc.nvidia.com/.
-
-    By default the configuration will attempt to read the NVIDIA_API_KEY environment
-    variable to set the api_key. Please do not put your API key in code.
-
-    If you are using a self-hosted NVIDIA NIM, you can set the url to the
-    URL of your running NVIDIA NIM and do not need to set the api_key.
-    """
-
-    url: str = Field(
-        default="https://integrate.api.nvidia.com",
-        description="A base url for accessing the NVIDIA NIM",
-    )
-    api_key: Optional[str] = Field(
-        default_factory=lambda: os.getenv("NVIDIA_API_KEY"),
-        description="The NVIDIA API key, only needed of using the hosted service",
-    )
-    timeout: int = Field(
-        default=60,
-        description="Timeout for the HTTP requests",
-    )
-
-    @property
-    def is_hosted(self) -> bool:
-        return "integrate.api.nvidia.com" in self.url
-
-    @classmethod
-    def sample_run_config(cls, **kwargs) -> Dict[str, Any]:
-        return {
-            "url": "https://integrate.api.nvidia.com",
-            "api_key": "${env.NVIDIA_API_KEY}",
-        }
diff --git a/llama_stack/providers/remote/inference/nvidia/_nvidia.py b/llama_stack/providers/remote/inference/nvidia/_nvidia.py
deleted file mode 100644
index 92c4e1cfb..000000000
--- a/llama_stack/providers/remote/inference/nvidia/_nvidia.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-import warnings
-from typing import AsyncIterator, List, Optional, Union
-
-from llama_models.datatypes import SamplingParams
-from llama_models.llama3.api.datatypes import (
-    InterleavedTextMedia,
-    Message,
-    ToolChoice,
-    ToolDefinition,
-    ToolPromptFormat,
-)
-from llama_models.sku_list import CoreModelId
-from openai import APIConnectionError, AsyncOpenAI
-
-from llama_stack.apis.inference import (
-    ChatCompletionRequest,
-    ChatCompletionResponse,
-    ChatCompletionResponseStreamChunk,
-    CompletionResponse,
-    CompletionResponseStreamChunk,
-    EmbeddingsResponse,
-    Inference,
-    LogProbConfig,
-    ResponseFormat,
-)
-from llama_stack.providers.utils.inference.model_registry import (
-    build_model_alias_with_just_provider_model_id,
-    ModelRegistryHelper,
-)
-
-from ._config import NVIDIAConfig
-from ._openai_utils import (
-    convert_chat_completion_request,
-    convert_openai_chat_completion_choice,
-    convert_openai_chat_completion_stream,
-)
-from ._utils import check_health
-
-_MODEL_ALIASES = [
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama3-8b-instruct",
-        CoreModelId.llama3_8b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama3-70b-instruct",
-        CoreModelId.llama3_70b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.1-8b-instruct",
-        CoreModelId.llama3_1_8b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.1-70b-instruct",
-        CoreModelId.llama3_1_70b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.1-405b-instruct",
-        CoreModelId.llama3_1_405b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.2-1b-instruct",
-        CoreModelId.llama3_2_1b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.2-3b-instruct",
-        CoreModelId.llama3_2_3b_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.2-11b-vision-instruct",
-        CoreModelId.llama3_2_11b_vision_instruct.value,
-    ),
-    build_model_alias_with_just_provider_model_id(
-        "meta/llama-3.2-90b-vision-instruct",
-        CoreModelId.llama3_2_90b_vision_instruct.value,
-    ),
-    # TODO(mf): how do we handle Nemotron models?
-    # "Llama3.1-Nemotron-51B-Instruct" -> "meta/llama-3.1-nemotron-51b-instruct",
-]
-
-
-class NVIDIAInferenceAdapter(Inference, ModelRegistryHelper):
-    def __init__(self, config: NVIDIAConfig) -> None:
-        # TODO(mf): filter by available models
-        ModelRegistryHelper.__init__(self, model_aliases=_MODEL_ALIASES)
-
-        print(f"Initializing NVIDIAInferenceAdapter({config.url})...")
-
-        if config.is_hosted:
-            if not config.api_key:
-                raise RuntimeError(
-                    "API key is required for hosted NVIDIA NIM. "
-                    "Either provide an API key or use a self-hosted NIM."
-                )
-        # elif self._config.api_key:
-        #
-        #  we don't raise this warning because a user may have deployed their
-        #  self-hosted NIM with an API key requirement.
-        #
-        # warnings.warn(
-        #     "API key is not required for self-hosted NVIDIA NIM. "
-        #     "Consider removing the api_key from the configuration."
-        # )
-
-        self._config = config
-        # make sure the client lives longer than any async calls
-        self._client = AsyncOpenAI(
-            base_url=f"{self._config.url}/v1",
-            api_key=self._config.api_key or "NO KEY",
-            timeout=self._config.timeout,
-        )
-
-    def completion(
-        self,
-        model_id: str,
-        content: InterleavedTextMedia,
-        sampling_params: Optional[SamplingParams] = SamplingParams(),
-        response_format: Optional[ResponseFormat] = None,
-        stream: Optional[bool] = False,
-        logprobs: Optional[LogProbConfig] = None,
-    ) -> Union[CompletionResponse, AsyncIterator[CompletionResponseStreamChunk]]:
-        raise NotImplementedError()
-
-    async def embeddings(
-        self,
-        model_id: str,
-        contents: List[InterleavedTextMedia],
-    ) -> EmbeddingsResponse:
-        raise NotImplementedError()
-
-    async def chat_completion(
-        self,
-        model_id: str,
-        messages: List[Message],
-        sampling_params: Optional[SamplingParams] = SamplingParams(),
-        response_format: Optional[ResponseFormat] = None,
-        tools: Optional[List[ToolDefinition]] = None,
-        tool_choice: Optional[ToolChoice] = ToolChoice.auto,
-        tool_prompt_format: Optional[
-            ToolPromptFormat
-        ] = None,  # API default is ToolPromptFormat.json, we default to None to detect user input
-        stream: Optional[bool] = False,
-        logprobs: Optional[LogProbConfig] = None,
-    ) -> Union[
-        ChatCompletionResponse, AsyncIterator[ChatCompletionResponseStreamChunk]
-    ]:
-        if tool_prompt_format:
-            warnings.warn("tool_prompt_format is not supported by NVIDIA NIM, ignoring")
-
-        await check_health(self._config)  # this raises errors
-
-        request = convert_chat_completion_request(
-            request=ChatCompletionRequest(
-                model=self.get_provider_model_id(model_id),
-                messages=messages,
-                sampling_params=sampling_params,
-                tools=tools,
-                tool_choice=tool_choice,
-                tool_prompt_format=tool_prompt_format,
-                stream=stream,
-                logprobs=logprobs,
-            ),
-            n=1,
-        )
-
-        try:
-            response = await self._client.chat.completions.create(**request)
-        except APIConnectionError as e:
-            raise ConnectionError(
-                f"Failed to connect to NVIDIA NIM at {self._config.url}: {e}"
-            ) from e
-
-        if stream:
-            return convert_openai_chat_completion_stream(response)
-        else:
-            # we pass n=1 to get only one completion
-            return convert_openai_chat_completion_choice(response.choices[0])
diff --git a/llama_stack/providers/remote/inference/nvidia/_openai_utils.py b/llama_stack/providers/remote/inference/nvidia/_openai_utils.py
deleted file mode 100644
index 998b4c275..000000000
--- a/llama_stack/providers/remote/inference/nvidia/_openai_utils.py
+++ /dev/null
@@ -1,430 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-import json
-import warnings
-from typing import Any, AsyncGenerator, Dict, Generator, List, Optional
-
-from llama_models.llama3.api.datatypes import (
-    CompletionMessage,
-    StopReason,
-    TokenLogProbs,
-    ToolCall,
-)
-from openai import AsyncStream
-from openai.types.chat import ChatCompletionChunk as OpenAIChatCompletionChunk
-from openai.types.chat.chat_completion import (
-    Choice as OpenAIChoice,
-    ChoiceLogprobs as OpenAIChoiceLogprobs,  # same as chat_completion_chunk ChoiceLogprobs
-)
-from openai.types.chat.chat_completion_message_tool_call import (
-    ChatCompletionMessageToolCall as OpenAIChatCompletionMessageToolCall,
-)
-
-from llama_stack.apis.inference import (
-    ChatCompletionRequest,
-    ChatCompletionResponse,
-    ChatCompletionResponseEvent,
-    ChatCompletionResponseEventType,
-    ChatCompletionResponseStreamChunk,
-    Message,
-    ToolCallDelta,
-    ToolCallParseStatus,
-)
-
-
-def _convert_message(message: Message) -> Dict:
-    """
-    Convert a Message to an OpenAI API-compatible dictionary.
-    """
-    out_dict = message.dict()
-    # Llama Stack uses role="ipython" for tool call messages, OpenAI uses "tool"
-    if out_dict["role"] == "ipython":
-        out_dict.update(role="tool")
-
-    if "stop_reason" in out_dict:
-        out_dict.update(stop_reason=out_dict["stop_reason"].value)
-
-    # TODO(mf): tool_calls
-
-    return out_dict
-
-
-def convert_chat_completion_request(
-    request: ChatCompletionRequest,
-    n: int = 1,
-) -> dict:
-    """
-    Convert a ChatCompletionRequest to an OpenAI API-compatible dictionary.
-    """
-    # model -> model
-    # messages -> messages
-    # sampling_params  TODO(mattf): review strategy
-    #  strategy=greedy -> nvext.top_k = -1, temperature = temperature
-    #  strategy=top_p -> nvext.top_k = -1, top_p = top_p
-    #  strategy=top_k -> nvext.top_k = top_k
-    #  temperature -> temperature
-    #  top_p -> top_p
-    #  top_k -> nvext.top_k
-    #  max_tokens -> max_tokens
-    #  repetition_penalty -> nvext.repetition_penalty
-    # tools -> tools
-    # tool_choice ("auto", "required") -> tool_choice
-    # tool_prompt_format -> TBD
-    # stream -> stream
-    # logprobs -> logprobs
-
-    nvext = {}
-    payload: Dict[str, Any] = dict(
-        model=request.model,
-        messages=[_convert_message(message) for message in request.messages],
-        stream=request.stream,
-        n=n,
-        extra_body=dict(nvext=nvext),
-        extra_headers={
-            b"User-Agent": b"llama-stack: nvidia-inference-adapter",
-        },
-    )
-
-    if request.tools:
-        payload.update(tools=request.tools)
-        if request.tool_choice:
-            payload.update(
-                tool_choice=request.tool_choice.value
-            )  # we cannot include tool_choice w/o tools, server will complain
-
-    if request.logprobs:
-        payload.update(logprobs=True)
-        payload.update(top_logprobs=request.logprobs.top_k)
-
-    if request.sampling_params:
-        nvext.update(repetition_penalty=request.sampling_params.repetition_penalty)
-
-        if request.sampling_params.max_tokens:
-            payload.update(max_tokens=request.sampling_params.max_tokens)
-
-        if request.sampling_params.strategy == "top_p":
-            nvext.update(top_k=-1)
-            payload.update(top_p=request.sampling_params.top_p)
-        elif request.sampling_params.strategy == "top_k":
-            if (
-                request.sampling_params.top_k != -1
-                and request.sampling_params.top_k < 1
-            ):
-                warnings.warn("top_k must be -1 or >= 1")
-            nvext.update(top_k=request.sampling_params.top_k)
-        elif request.sampling_params.strategy == "greedy":
-            nvext.update(top_k=-1)
-            payload.update(temperature=request.sampling_params.temperature)
-
-    return payload
-
-
-def _convert_openai_finish_reason(finish_reason: str) -> StopReason:
-    """
-    Convert an OpenAI chat completion finish_reason to a StopReason.
-
-    finish_reason: Literal["stop", "length", "tool_calls", ...]
-        - stop: model hit a natural stop point or a provided stop sequence
-        - length: maximum number of tokens specified in the request was reached
-        - tool_calls: model called a tool
-
-    ->
-
-    class StopReason(Enum):
-        end_of_turn = "end_of_turn"
-        end_of_message = "end_of_message"
-        out_of_tokens = "out_of_tokens"
-    """
-
-    # TODO(mf): are end_of_turn and end_of_message semantics correct?
-    return {
-        "stop": StopReason.end_of_turn,
-        "length": StopReason.out_of_tokens,
-        "tool_calls": StopReason.end_of_message,
-    }.get(finish_reason, StopReason.end_of_turn)
-
-
-def _convert_openai_tool_calls(
-    tool_calls: List[OpenAIChatCompletionMessageToolCall],
-) -> List[ToolCall]:
-    """
-    Convert an OpenAI ChatCompletionMessageToolCall list into a list of ToolCall.
-
-    OpenAI ChatCompletionMessageToolCall:
-        id: str
-        function: Function
-        type: Literal["function"]
-
-    OpenAI Function:
-        arguments: str
-        name: str
-
-    ->
-
-    ToolCall:
-        call_id: str
-        tool_name: str
-        arguments: Dict[str, ...]
-    """
-    if not tool_calls:
-        return []  # CompletionMessage tool_calls is not optional
-
-    return [
-        ToolCall(
-            call_id=call.id,
-            tool_name=call.function.name,
-            arguments=json.loads(call.function.arguments),
-        )
-        for call in tool_calls
-    ]
-
-
-def _convert_openai_logprobs(
-    logprobs: OpenAIChoiceLogprobs,
-) -> Optional[List[TokenLogProbs]]:
-    """
-    Convert an OpenAI ChoiceLogprobs into a list of TokenLogProbs.
-
-    OpenAI ChoiceLogprobs:
-        content: Optional[List[ChatCompletionTokenLogprob]]
-
-    OpenAI ChatCompletionTokenLogprob:
-        token: str
-        logprob: float
-        top_logprobs: List[TopLogprob]
-
-    OpenAI TopLogprob:
-        token: str
-        logprob: float
-
-    ->
-
-    TokenLogProbs:
-        logprobs_by_token: Dict[str, float]
-         - token, logprob
-
-    """
-    if not logprobs:
-        return None
-
-    return [
-        TokenLogProbs(
-            logprobs_by_token={
-                logprobs.token: logprobs.logprob for logprobs in content.top_logprobs
-            }
-        )
-        for content in logprobs.content
-    ]
-
-
-def convert_openai_chat_completion_choice(
-    choice: OpenAIChoice,
-) -> ChatCompletionResponse:
-    """
-    Convert an OpenAI Choice into a ChatCompletionResponse.
-
-    OpenAI Choice:
-        message: ChatCompletionMessage
-        finish_reason: str
-        logprobs: Optional[ChoiceLogprobs]
-
-    OpenAI ChatCompletionMessage:
-        role: Literal["assistant"]
-        content: Optional[str]
-        tool_calls: Optional[List[ChatCompletionMessageToolCall]]
-
-    ->
-
-    ChatCompletionResponse:
-        completion_message: CompletionMessage
-        logprobs: Optional[List[TokenLogProbs]]
-
-    CompletionMessage:
-        role: Literal["assistant"]
-        content: str | ImageMedia | List[str | ImageMedia]
-        stop_reason: StopReason
-        tool_calls: List[ToolCall]
-
-    class StopReason(Enum):
-        end_of_turn = "end_of_turn"
-        end_of_message = "end_of_message"
-        out_of_tokens = "out_of_tokens"
-    """
-    assert (
-        hasattr(choice, "message") and choice.message
-    ), "error in server response: message not found"
-    assert (
-        hasattr(choice, "finish_reason") and choice.finish_reason
-    ), "error in server response: finish_reason not found"
-
-    return ChatCompletionResponse(
-        completion_message=CompletionMessage(
-            content=choice.message.content
-            or "",  # CompletionMessage content is not optional
-            stop_reason=_convert_openai_finish_reason(choice.finish_reason),
-            tool_calls=_convert_openai_tool_calls(choice.message.tool_calls),
-        ),
-        logprobs=_convert_openai_logprobs(choice.logprobs),
-    )
-
-
-async def convert_openai_chat_completion_stream(
-    stream: AsyncStream[OpenAIChatCompletionChunk],
-) -> AsyncGenerator[ChatCompletionResponseStreamChunk, None]:
-    """
-    Convert a stream of OpenAI chat completion chunks into a stream
-    of ChatCompletionResponseStreamChunk.
-
-    OpenAI ChatCompletionChunk:
-        choices: List[Choice]
-
-    OpenAI Choice:  # different from the non-streamed Choice
-        delta: ChoiceDelta
-        finish_reason: Optional[Literal["stop", "length", "tool_calls", "content_filter", "function_call"]]
-        logprobs: Optional[ChoiceLogprobs]
-
-    OpenAI ChoiceDelta:
-        content: Optional[str]
-        role: Optional[Literal["system", "user", "assistant", "tool"]]
-        tool_calls: Optional[List[ChoiceDeltaToolCall]]
-
-    OpenAI ChoiceDeltaToolCall:
-        index: int
-        id: Optional[str]
-        function: Optional[ChoiceDeltaToolCallFunction]
-        type: Optional[Literal["function"]]
-
-    OpenAI ChoiceDeltaToolCallFunction:
-        name: Optional[str]
-        arguments: Optional[str]
-
-    ->
-
-    ChatCompletionResponseStreamChunk:
-        event: ChatCompletionResponseEvent
-
-    ChatCompletionResponseEvent:
-        event_type: ChatCompletionResponseEventType
-        delta: Union[str, ToolCallDelta]
-        logprobs: Optional[List[TokenLogProbs]]
-        stop_reason: Optional[StopReason]
-
-    ChatCompletionResponseEventType:
-        start = "start"
-        progress = "progress"
-        complete = "complete"
-
-    ToolCallDelta:
-        content: Union[str, ToolCall]
-        parse_status: ToolCallParseStatus
-
-    ToolCall:
-        call_id: str
-        tool_name: str
-        arguments: str
-
-    ToolCallParseStatus:
-        started = "started"
-        in_progress = "in_progress"
-        failure = "failure"
-        success = "success"
-
-    TokenLogProbs:
-        logprobs_by_token: Dict[str, float]
-         - token, logprob
-
-    StopReason:
-        end_of_turn = "end_of_turn"
-        end_of_message = "end_of_message"
-        out_of_tokens = "out_of_tokens"
-    """
-
-    # generate a stream of ChatCompletionResponseEventType: start -> progress -> progress -> ...
-    def _event_type_generator() -> (
-        Generator[ChatCompletionResponseEventType, None, None]
-    ):
-        yield ChatCompletionResponseEventType.start
-        while True:
-            yield ChatCompletionResponseEventType.progress
-
-    event_type = _event_type_generator()
-
-    # we implement NIM specific semantics, the main difference from OpenAI
-    # is that tool_calls are always produced as a complete call. there is no
-    # intermediate / partial tool call streamed. because of this, we can
-    # simplify the logic and not concern outselves with parse_status of
-    # started/in_progress/failed. we can always assume success.
-    #
-    # a stream of ChatCompletionResponseStreamChunk consists of
-    #  0. a start event
-    #  1. zero or more progress events
-    #   - each progress event has a delta
-    #   - each progress event may have a stop_reason
-    #   - each progress event may have logprobs
-    #   - each progress event may have tool_calls
-    #     if a progress event has tool_calls,
-    #      it is fully formed and
-    #      can be emitted with a parse_status of success
-    #  2. a complete event
-
-    stop_reason = None
-
-    async for chunk in stream:
-        choice = chunk.choices[0]  # assuming only one choice per chunk
-
-        # we assume there's only one finish_reason in the stream
-        stop_reason = _convert_openai_finish_reason(choice.finish_reason) or stop_reason
-
-        # if there's a tool call, emit an event for each tool in the list
-        # if tool call and content, emit both separately
-
-        if choice.delta.tool_calls:
-            # the call may have content and a tool call. ChatCompletionResponseEvent
-            # does not support both, so we emit the content first
-            if choice.delta.content:
-                yield ChatCompletionResponseStreamChunk(
-                    event=ChatCompletionResponseEvent(
-                        event_type=next(event_type),
-                        delta=choice.delta.content,
-                        logprobs=_convert_openai_logprobs(choice.logprobs),
-                    )
-                )
-
-            # it is possible to have parallel tool calls in stream, but
-            # ChatCompletionResponseEvent only supports one per stream
-            if len(choice.delta.tool_calls) > 1:
-                warnings.warn(
-                    "multiple tool calls found in a single delta, using the first, ignoring the rest"
-                )
-
-            # NIM only produces fully formed tool calls, so we can assume success
-            yield ChatCompletionResponseStreamChunk(
-                event=ChatCompletionResponseEvent(
-                    event_type=next(event_type),
-                    delta=ToolCallDelta(
-                        content=_convert_openai_tool_calls(choice.delta.tool_calls)[0],
-                        parse_status=ToolCallParseStatus.success,
-                    ),
-                    logprobs=_convert_openai_logprobs(choice.logprobs),
-                )
-            )
-        else:
-            yield ChatCompletionResponseStreamChunk(
-                event=ChatCompletionResponseEvent(
-                    event_type=next(event_type),
-                    delta=choice.delta.content or "",  # content is not optional
-                    logprobs=_convert_openai_logprobs(choice.logprobs),
-                )
-            )
-
-    yield ChatCompletionResponseStreamChunk(
-        event=ChatCompletionResponseEvent(
-            event_type=ChatCompletionResponseEventType.complete,
-            delta="",
-            stop_reason=stop_reason,
-        )
-    )
diff --git a/llama_stack/providers/remote/inference/nvidia/_utils.py b/llama_stack/providers/remote/inference/nvidia/_utils.py
deleted file mode 100644
index c66cf75f4..000000000
--- a/llama_stack/providers/remote/inference/nvidia/_utils.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-from typing import Tuple
-
-import httpx
-
-from ._config import NVIDIAConfig
-
-
-async def _get_health(url: str) -> Tuple[bool, bool]:
-    """
-    Query {url}/v1/health/{live,ready} to check if the server is running and ready
-
-    Args:
-        url (str): URL of the server
-
-    Returns:
-        Tuple[bool, bool]: (is_live, is_ready)
-    """
-    async with httpx.AsyncClient() as client:
-        live = await client.get(f"{url}/v1/health/live")
-        ready = await client.get(f"{url}/v1/health/ready")
-        return live.status_code == 200, ready.status_code == 200
-
-
-async def check_health(config: NVIDIAConfig) -> None:
-    """
-    Check if the server is running and ready
-
-    Args:
-        url (str): URL of the server
-
-    Raises:
-        RuntimeError: If the server is not running or ready
-    """
-    if not config.is_hosted:
-        print("Checking NVIDIA NIM health...")
-        try:
-            is_live, is_ready = await _get_health(config.url)
-            if not is_live:
-                raise ConnectionError("NVIDIA NIM is not running")
-            if not is_ready:
-                raise ConnectionError("NVIDIA NIM is not ready")
-            # TODO(mf): should we wait for the server to be ready?
-        except httpx.ConnectError as e:
-            raise ConnectionError(f"Failed to connect to NVIDIA NIM: {e}") from e
diff --git a/llama_stack/templates/nvidia/doc_template.md b/llama_stack/templates/nvidia/doc_template.md
index a9db77055..949018f8d 100644
--- a/llama_stack/templates/nvidia/doc_template.md
+++ b/llama_stack/templates/nvidia/doc_template.md
@@ -47,14 +47,14 @@ docker run \
   llamastack/distribution-{{ name }} \
   --yaml-config /root/my-run.yaml \
   --port $LLAMA_STACK_PORT \
-  --env FIREWORKS_API_KEY=$FIREWORKS_API_KEY
+  --env NVIDIA_API_KEY=$NVIDIA_API_KEY
 ```
 
 ### Via Conda
 
 ```bash
-llama stack build --template fireworks --image-type conda
+llama stack build --template nvidia --image-type conda
 llama stack run ./run.yaml \
   --port 5001 \
-  --env FIREWORKS_API_KEY=$FIREWORKS_API_KEY
+  --env NVIDIA_API_KEY=$NVIDIA_API_KEY
 ```
diff --git a/llama_stack/templates/nvidia/nvidia.py b/llama_stack/templates/nvidia/nvidia.py
index 0f1551180..22aa1f4b0 100644
--- a/llama_stack/templates/nvidia/nvidia.py
+++ b/llama_stack/templates/nvidia/nvidia.py
@@ -6,11 +6,9 @@
 
 from pathlib import Path
 
-from llama_models.sku_list import all_registered_models
-
-from llama_stack.distribution.datatypes import ModelInput, Provider, ShieldInput
+from llama_stack.distribution.datatypes import ModelInput, Provider
 from llama_stack.providers.remote.inference.nvidia import NVIDIAConfig
-from llama_stack.providers.remote.inference.nvidia._nvidia import _MODEL_ALIASES
+from llama_stack.providers.remote.inference.nvidia.nvidia import _MODEL_ALIASES
 from llama_stack.templates.template import DistributionTemplate, RunConfigSettings