mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-08 19:10:56 +00:00
181 lines
6.3 KiB
Python
181 lines
6.3 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
|
|
import logging
|
|
from typing import AsyncGenerator
|
|
|
|
from huggingface_hub import AsyncInferenceClient, HfApi
|
|
from llama_models.llama3.api.chat_format import ChatFormat
|
|
from llama_models.llama3.api.tokenizer import Tokenizer
|
|
from llama_models.sku_list import resolve_model
|
|
|
|
from llama_stack.apis.inference import * # noqa: F403
|
|
from llama_stack.providers.utils.inference.augment_messages import (
|
|
chat_completion_request_to_model_input_info,
|
|
)
|
|
from llama_stack.providers.utils.inference.openai_compat import (
|
|
get_sampling_options,
|
|
OpenAICompatCompletionChoice,
|
|
OpenAICompatCompletionResponse,
|
|
process_chat_completion_response,
|
|
process_chat_completion_stream_response,
|
|
)
|
|
|
|
from .config import InferenceAPIImplConfig, InferenceEndpointImplConfig, TGIImplConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _HfAdapter(Inference):
    """Shared chat-completion logic for the Hugging Face backed adapters.

    This base class does not define ``initialize``; the methods below read
    ``client``, ``max_tokens`` and ``model_id``, so a subclass must assign
    all three before any other method is used.
    """

    # Async Hugging Face client used to issue ``text_generation`` calls.
    client: AsyncInferenceClient
    # Total token budget; the prompt length is subtracted from it to cap
    # ``max_new_tokens`` in ``_get_params``.
    max_tokens: int
    # Identifier compared against the resolved model's Hugging Face repo
    # in ``register_model``.
    model_id: str

    def __init__(self) -> None:
        """Create the chat formatter used to build prompts and parse output."""
        self.formatter = ChatFormat(Tokenizer.get_instance())

    async def register_model(self, model: ModelDef) -> None:
        """Validate that ``model`` matches the model this adapter serves.

        Raises:
            ValueError: if the identifier cannot be resolved, if the resolved
                model has no Hugging Face repo, or if that repo differs from
                ``self.model_id``.
        """
        resolved_model = resolve_model(model.identifier)
        if resolved_model is None:
            raise ValueError(f"Unknown model: {model.identifier}")

        if not resolved_model.huggingface_repo:
            raise ValueError(
                f"Model {model.identifier} does not have a HuggingFace repo"
            )

        if self.model_id != resolved_model.huggingface_repo:
            raise ValueError(f"Model mismatch: {model.identifier} != {self.model_id}")

    async def shutdown(self) -> None:
        """No-op: this class performs no cleanup on shutdown."""
        pass

    def completion(
        self,
        model: str,
        content: InterleavedTextMedia,
        sampling_params: Optional[SamplingParams] = SamplingParams(),
        stream: Optional[bool] = False,
        logprobs: Optional[LogProbConfig] = None,
    ) -> AsyncGenerator:
        """Raw completion is not supported by this adapter.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()

    def chat_completion(
        self,
        model: str,
        messages: List[Message],
        sampling_params: Optional[SamplingParams] = SamplingParams(),
        tools: Optional[List[ToolDefinition]] = None,
        tool_choice: Optional[ToolChoice] = ToolChoice.auto,
        tool_prompt_format: Optional[ToolPromptFormat] = ToolPromptFormat.json,
        stream: Optional[bool] = False,
        logprobs: Optional[LogProbConfig] = None,
    ) -> AsyncGenerator:
        """Build a ``ChatCompletionRequest`` and dispatch it.

        Returns:
            An async generator of chunks when ``stream`` is truthy, otherwise
            a coroutine that resolves to the complete response. This method
            is synchronous, so the caller awaits or iterates the result.
        """
        request = ChatCompletionRequest(
            model=model,
            messages=messages,
            sampling_params=sampling_params,
            tools=tools or [],
            tool_choice=tool_choice,
            tool_prompt_format=tool_prompt_format,
            stream=stream,
            logprobs=logprobs,
        )

        if stream:
            return self._stream_chat_completion(request)
        return self._nonstream_chat_completion(request)

    async def _nonstream_chat_completion(
        self, request: ChatCompletionRequest
    ) -> ChatCompletionResponse:
        """Run one ``text_generation`` call and convert the full result."""
        params = self._get_params(request)
        r = await self.client.text_generation(**params)

        # ``details=True`` is always requested in ``_get_params``, so the
        # result carries per-token details; the text is rebuilt from them.
        choice = OpenAICompatCompletionChoice(
            finish_reason=r.details.finish_reason,
            text="".join(t.text for t in r.details.tokens),
        )
        response = OpenAICompatCompletionResponse(
            choices=[choice],
        )
        return process_chat_completion_response(request, response, self.formatter)

    async def _stream_chat_completion(
        self, request: ChatCompletionRequest
    ) -> AsyncGenerator:
        """Stream ``text_generation`` output as processed chat chunks."""
        params = self._get_params(request)

        async def _generate_and_convert_to_openai_compat():
            s = await self.client.text_generation(**params)
            async for chunk in s:
                # Forward the finish reason whenever the chunk has details,
                # so the streaming path reports it just like the
                # non-streaming path does. Chunks without details pass None.
                details = chunk.details
                finish_reason = details.finish_reason if details else None

                choice = OpenAICompatCompletionChoice(
                    text=chunk.token.text,
                    finish_reason=finish_reason,
                )
                yield OpenAICompatCompletionResponse(
                    choices=[choice],
                )

        stream = _generate_and_convert_to_openai_compat()
        async for chunk in process_chat_completion_stream_response(
            request, stream, self.formatter
        ):
            yield chunk

    def _get_params(self, request: ChatCompletionRequest) -> dict:
        """Translate ``request`` into keyword arguments for ``text_generation``.

        Raises:
            ValueError: if the prompt leaves no room to generate at least one
                token within ``self.max_tokens``.
        """
        prompt, input_tokens = chat_completion_request_to_model_input_info(
            request, self.formatter
        )

        # Tokens left once the prompt is accounted for. One of them is held
        # back, so the usable generation budget is ``remaining - 1``.
        remaining = self.max_tokens - input_tokens
        if remaining - 1 < 1:
            raise ValueError(
                f"Prompt uses {input_tokens} tokens, which leaves no room for "
                f"generation within the limit of {self.max_tokens} total tokens"
            )

        max_new_tokens = min(
            request.sampling_params.max_tokens or remaining,
            remaining - 1,
        )
        options = get_sampling_options(request)
        return dict(
            prompt=prompt,
            stream=request.stream,
            details=True,
            max_new_tokens=max_new_tokens,
            stop_sequences=["<|eom_id|>", "<|eot_id|>"],
            **options,
        )
|
|
|
|
|
|
class TGIAdapter(_HfAdapter):
    """Adapter whose client targets the URL given in a ``TGIImplConfig``."""

    async def initialize(self, config: TGIImplConfig) -> None:
        """Create the client for ``config.url`` and record the endpoint limits.

        Sets ``client``, then queries the endpoint once and stores its
        ``max_total_tokens`` and ``model_id`` on the adapter.
        """
        hf_client = AsyncInferenceClient(model=config.url, token=config.api_token)
        self.client = hf_client

        info = await hf_client.get_endpoint_info()
        self.max_tokens = info["max_total_tokens"]
        self.model_id = info["model_id"]
|
|
|
|
|
|
class InferenceAPIAdapter(_HfAdapter):
    """Adapter whose client targets the repo named in an ``InferenceAPIImplConfig``."""

    async def initialize(self, config: InferenceAPIImplConfig) -> None:
        """Create the client for ``config.huggingface_repo`` and read its limits.

        Sets ``client``, then queries the endpoint once and stores its
        ``max_total_tokens`` and ``model_id`` on the adapter.
        """
        repo_client = AsyncInferenceClient(
            model=config.huggingface_repo,
            token=config.api_token,
        )
        self.client = repo_client

        details = await repo_client.get_endpoint_info()
        self.max_tokens = details["max_total_tokens"]
        self.model_id = details["model_id"]
|
|
|
|
|
|
class InferenceEndpointAdapter(_HfAdapter):
    """Adapter backed by a named Hugging Face Inference Endpoint."""

    async def initialize(self, config: InferenceEndpointImplConfig) -> None:
        """Look up the endpoint, wait for it to be ready, and bind to it.

        Sets ``client``, ``model_id`` and ``max_tokens`` from the endpoint.
        The ``HfApi`` lookup and ``endpoint.wait`` are synchronous calls, so
        they run in worker threads to keep the event loop responsive.

        Raises:
            KeyError: if the endpoint's raw description has no
                ``model.image.custom.env.MAX_TOTAL_TOKENS`` entry.
        """
        # Local import keeps this change self-contained within the block.
        import asyncio

        # Get the inference endpoint details
        api = HfApi(token=config.api_token)
        endpoint = await asyncio.to_thread(
            api.get_inference_endpoint, config.endpoint_name
        )

        # Wait for the endpoint to be ready (if not already)
        await asyncio.to_thread(endpoint.wait, timeout=60)

        # Initialize the adapter
        self.client = endpoint.async_client
        self.model_id = endpoint.repository
        # The token limit is read from the endpoint's custom-image environment
        # and converted with int().
        self.max_tokens = int(
            endpoint.raw["model"]["image"]["custom"]["env"]["MAX_TOTAL_TOKENS"]
        )
|