Make TGI adapter compatible with HF Inference API (#97)

Lucain authored 2024-09-25 23:08:31 +02:00, committed by GitHub
parent 851c30597a
commit 615ed4bfbc
7 changed files with 122 additions and 96 deletions

@@ -5,54 +5,33 @@
 # the root directory of this source tree.

-from typing import Any, AsyncGenerator, Dict
+import logging
+from typing import AsyncGenerator

-import requests
-from huggingface_hub import HfApi, InferenceClient
+from huggingface_hub import AsyncInferenceClient, HfApi

 from llama_models.llama3.api.chat_format import ChatFormat
 from llama_models.llama3.api.datatypes import StopReason
 from llama_models.llama3.api.tokenizer import Tokenizer

 from llama_stack.apis.inference import *  # noqa: F403
 from llama_stack.providers.utils.inference.augment_messages import (
     augment_messages_for_tools,
 )

-from .config import TGIImplConfig
+from .config import InferenceAPIImplConfig, InferenceEndpointImplConfig, TGIImplConfig
+
+logger = logging.getLogger(__name__)

-class TGIAdapter(Inference):
-    def __init__(self, config: TGIImplConfig) -> None:
-        self.config = config
+class _HfAdapter(Inference):
+    client: AsyncInferenceClient
+    max_tokens: int
+    model_id: str
+
+    def __init__(self) -> None:
         self.tokenizer = Tokenizer.get_instance()
         self.formatter = ChatFormat(self.tokenizer)

-    @property
-    def client(self) -> InferenceClient:
-        return InferenceClient(model=self.config.url, token=self.config.api_token)
-
-    def _get_endpoint_info(self) -> Dict[str, Any]:
-        return {
-            **self.client.get_endpoint_info(),
-            "inference_url": self.config.url,
-        }
-
-    async def initialize(self) -> None:
-        try:
-            info = self._get_endpoint_info()
-            if "model_id" not in info:
-                raise RuntimeError("Missing model_id in model info")
-            if "max_total_tokens" not in info:
-                raise RuntimeError("Missing max_total_tokens in model info")
-            self.max_tokens = info["max_total_tokens"]
-            self.inference_url = info["inference_url"]
-        except Exception as e:
-            import traceback
-            traceback.print_exc()
-            raise RuntimeError(f"Error initializing TGIAdapter: {e}") from e
-
     async def shutdown(self) -> None:
         pass
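
Reviewer note: the hunk above replaces the per-call synchronous InferenceClient (built lazily via a property) with one long-lived AsyncInferenceClient shared by all adapters. A minimal standalone sketch of that client pattern, assuming a TGI server is reachable (the localhost URL is a placeholder, not a value from this diff):

import asyncio

from huggingface_hub import AsyncInferenceClient

async def main() -> None:
    # One long-lived async client, as in the new _HfAdapter subclasses
    client = AsyncInferenceClient(model="http://localhost:8080")  # placeholder URL
    info = await client.get_endpoint_info()  # metadata reported by the TGI server
    print(info["model_id"], info["max_total_tokens"])

asyncio.run(main())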
@@ -111,7 +90,7 @@ class TGIAdapter(Inference):
         options = self.get_chat_options(request)
         if not request.stream:
-            response = self.client.text_generation(
+            response = await self.client.text_generation(
                 prompt=prompt,
                 stream=False,
                 details=True,
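
Reviewer note: with AsyncInferenceClient, text_generation returns a coroutine, so the non-streaming path must be awaited, as the hunk above does. A sketch of the call shape, with placeholder prompt and URL:

import asyncio

from huggingface_hub import AsyncInferenceClient

async def generate() -> None:
    client = AsyncInferenceClient(model="http://localhost:8080")  # placeholder URL
    # details=True returns finish_reason and token-level info alongside the text
    response = await client.text_generation(
        prompt="Hello", stream=False, details=True, max_new_tokens=16
    )
    print(response.generated_text, response.details.finish_reason)

asyncio.run(generate())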
@@ -147,7 +126,7 @@ class TGIAdapter(Inference):
         stop_reason = None
         tokens = []

-        for response in self.client.text_generation(
+        async for response in await self.client.text_generation(
             prompt=prompt,
             stream=True,
             details=True,
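
Reviewer note: in the streaming path the awaited call resolves to an async iterator, hence the slightly unusual `async for ... in await ...` above. A sketch against the same placeholder server:

import asyncio

from huggingface_hub import AsyncInferenceClient

async def stream_tokens() -> None:
    client = AsyncInferenceClient(model="http://localhost:8080")  # placeholder URL
    # stream=True: each chunk carries one token; the final chunk also carries details
    async for chunk in await client.text_generation(
        prompt="Hello", stream=True, details=True, max_new_tokens=16
    ):
        print(chunk.token.text, end="", flush=True)

asyncio.run(stream_tokens())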
@@ -239,46 +218,36 @@ class TGIAdapter(Inference):
         )

-class InferenceEndpointAdapter(TGIAdapter):
-    def __init__(self, config: TGIImplConfig) -> None:
-        super().__init__(config)
-        self.config.url = self._construct_endpoint_url()
-
-    def _construct_endpoint_url(self) -> str:
-        hf_endpoint_name = self.config.hf_endpoint_name
-        assert hf_endpoint_name.count("/") <= 1, (
-            "Endpoint name must be in the format of 'namespace/endpoint_name' "
-            "or 'endpoint_name'"
-        )
-        if "/" not in hf_endpoint_name:
-            hf_namespace: str = self.get_namespace()
-            endpoint_path = f"{hf_namespace}/{hf_endpoint_name}"
-        else:
-            endpoint_path = hf_endpoint_name
-        return f"https://api.endpoints.huggingface.cloud/v2/endpoint/{endpoint_path}"
-
-    def get_namespace(self) -> str:
-        return HfApi().whoami()["name"]
-
-    @property
-    def client(self) -> InferenceClient:
-        return InferenceClient(model=self.inference_url, token=self.config.api_token)
-
-    def _get_endpoint_info(self) -> Dict[str, Any]:
-        headers = {
-            "accept": "application/json",
-            "authorization": f"Bearer {self.config.api_token}",
-        }
-        response = requests.get(self.config.url, headers=headers)
-        response.raise_for_status()
-        endpoint_info = response.json()
-        return {
-            "inference_url": endpoint_info["status"]["url"],
-            "model_id": endpoint_info["model"]["repository"],
-            "max_total_tokens": int(
-                endpoint_info["model"]["image"]["custom"]["env"]["MAX_TOTAL_TOKENS"]
-            ),
-        }
-
-    async def initialize(self) -> None:
-        await super().initialize()
+class TGIAdapter(_HfAdapter):
+    async def initialize(self, config: TGIImplConfig) -> None:
+        self.client = AsyncInferenceClient(model=config.url, token=config.api_token)
+        endpoint_info = await self.client.get_endpoint_info()
+        self.max_tokens = endpoint_info["max_total_tokens"]
+        self.model_id = endpoint_info["model_id"]
+
+
+class InferenceAPIAdapter(_HfAdapter):
+    async def initialize(self, config: InferenceAPIImplConfig) -> None:
+        self.client = AsyncInferenceClient(
+            model=config.model_id, token=config.api_token
+        )
+        endpoint_info = await self.client.get_endpoint_info()
+        self.max_tokens = endpoint_info["max_total_tokens"]
+        self.model_id = endpoint_info["model_id"]
+
+
+class InferenceEndpointAdapter(_HfAdapter):
+    async def initialize(self, config: InferenceEndpointImplConfig) -> None:
+        # Get the inference endpoint details
+        api = HfApi(token=config.api_token)
+        endpoint = api.get_inference_endpoint(config.endpoint_name)
+
+        # Wait for the endpoint to be ready (if not already)
+        endpoint.wait(timeout=60)
+
+        # Initialize the adapter
+        self.client = endpoint.async_client
+        self.model_id = endpoint.repository
+        self.max_tokens = int(
+            endpoint.raw["model"]["image"]["custom"]["env"]["MAX_TOTAL_TOKENS"]
+        )
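
Reviewer note: after this change the three adapters differ only in how initialize builds the shared client and discovers model_id and max_tokens. A usage sketch inside the provider package, assuming the config classes accept the fields shown in this diff as keyword arguments (the model id and endpoint name below are placeholders, not values from this commit):

import asyncio

from .config import InferenceAPIImplConfig, InferenceEndpointImplConfig, TGIImplConfig

async def example() -> None:
    # Self-hosted TGI server, addressed by URL
    tgi = TGIAdapter()
    await tgi.initialize(TGIImplConfig(url="http://localhost:8080", api_token=None))

    # Serverless HF Inference API, addressed by model id (placeholder)
    serverless = InferenceAPIAdapter()
    await serverless.initialize(
        InferenceAPIImplConfig(model_id="some-org/some-model", api_token=None)
    )

    # Dedicated Inference Endpoint, resolved and awaited via HfApi (placeholder name)
    dedicated = InferenceEndpointAdapter()
    await dedicated.initialize(
        InferenceEndpointImplConfig(endpoint_name="my-endpoint", api_token=None)
    )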