fix(inference): enable routing of models with provider_data alone (#3928)

This PR enables routing of fully qualified model IDs of the form
`provider_id/model_id` even when the models are not registered with the
Stack.

Here's the situation: assume a remote inference provider which works
only when users provide their own API keys via
`X-LlamaStack-Provider-Data` header. By definition, we cannot list that
provider's models, and hence cannot update our routing registry. But
because we now _require_ a provider ID in model IDs, we can identify
which provider to route to and let that provider decide.

Note that we still try to look the model up in our registry first, since
it may have a pre-registered alias. We just no longer fail outright when
that lookup finds nothing.

Also updated the inference router so that responses carry the _exact_
model ID that the request used.

Added an integration test for routing an unregistered model.

Closes #3929

---------

Co-authored-by: ehhuang <ehhuang@users.noreply.github.com>
This commit is contained in:
Ashwin Bharambe 2025-10-28 11:16:37 -07:00
parent a6c3a9cadf
commit 84ffcb8c2b
6 changed files with 214 additions and 55 deletions

View file

@ -105,7 +105,8 @@ class InferenceRouter(Inference):
prompt_tokens: int, prompt_tokens: int,
completion_tokens: int, completion_tokens: int,
total_tokens: int, total_tokens: int,
model: Model, fully_qualified_model_id: str,
provider_id: str,
) -> list[MetricEvent]: ) -> list[MetricEvent]:
"""Constructs a list of MetricEvent objects containing token usage metrics. """Constructs a list of MetricEvent objects containing token usage metrics.
@ -113,7 +114,8 @@ class InferenceRouter(Inference):
prompt_tokens: Number of tokens in the prompt prompt_tokens: Number of tokens in the prompt
completion_tokens: Number of tokens in the completion completion_tokens: Number of tokens in the completion
total_tokens: Total number of tokens used total_tokens: Total number of tokens used
model: Model object containing model_id and provider_id fully_qualified_model_id:
provider_id: The provider identifier
Returns: Returns:
List of MetricEvent objects with token usage metrics List of MetricEvent objects with token usage metrics
@ -139,8 +141,8 @@ class InferenceRouter(Inference):
timestamp=datetime.now(UTC), timestamp=datetime.now(UTC),
unit="tokens", unit="tokens",
attributes={ attributes={
"model_id": model.model_id, "model_id": fully_qualified_model_id,
"provider_id": model.provider_id, "provider_id": provider_id,
}, },
) )
) )
@ -153,7 +155,9 @@ class InferenceRouter(Inference):
total_tokens: int, total_tokens: int,
model: Model, model: Model,
) -> list[MetricInResponse]: ) -> list[MetricInResponse]:
metrics = self._construct_metrics(prompt_tokens, completion_tokens, total_tokens, model) metrics = self._construct_metrics(
prompt_tokens, completion_tokens, total_tokens, model.model_id, model.provider_id
)
if self.telemetry: if self.telemetry:
for metric in metrics: for metric in metrics:
enqueue_event(metric) enqueue_event(metric)
@ -173,14 +177,25 @@ class InferenceRouter(Inference):
encoded = self.formatter.encode_content(messages) encoded = self.formatter.encode_content(messages)
return len(encoded.tokens) if encoded and encoded.tokens else 0 return len(encoded.tokens) if encoded and encoded.tokens else 0
async def _get_model(self, model_id: str, expected_model_type: str) -> Model: async def _get_model_provider(self, model_id: str, expected_model_type: str) -> tuple[Inference, str]:
"""takes a model id and gets model after ensuring that it is accessible and of the correct type""" model = await self.routing_table.get_object_by_identifier("model", model_id)
model = await self.routing_table.get_model(model_id) if model:
if model is None: if model.model_type != expected_model_type:
raise ModelTypeError(model_id, model.model_type, expected_model_type)
provider = await self.routing_table.get_provider_impl(model.identifier)
return provider, model.provider_resource_id
splits = model_id.split("/", maxsplit=1)
if len(splits) != 2:
raise ModelNotFoundError(model_id) raise ModelNotFoundError(model_id)
if model.model_type != expected_model_type:
raise ModelTypeError(model_id, model.model_type, expected_model_type) provider_id, provider_resource_id = splits
return model if provider_id not in self.routing_table.impls_by_provider_id:
logger.warning(f"Provider {provider_id} not found for model {model_id}")
raise ModelNotFoundError(model_id)
return self.routing_table.impls_by_provider_id[provider_id], provider_resource_id
async def openai_completion( async def openai_completion(
self, self,
@ -189,24 +204,24 @@ class InferenceRouter(Inference):
logger.debug( logger.debug(
f"InferenceRouter.openai_completion: model={params.model}, stream={params.stream}, prompt={params.prompt}", f"InferenceRouter.openai_completion: model={params.model}, stream={params.stream}, prompt={params.prompt}",
) )
model_obj = await self._get_model(params.model, ModelType.llm) request_model_id = params.model
provider, provider_resource_id = await self._get_model_provider(params.model, ModelType.llm)
params.model = provider_resource_id
# Update params with the resolved model identifier
params.model = model_obj.identifier
provider = await self.routing_table.get_provider_impl(model_obj.identifier)
if params.stream: if params.stream:
return await provider.openai_completion(params) return await provider.openai_completion(params)
# TODO: Metrics do NOT work with openai_completion stream=True due to the fact # TODO: Metrics do NOT work with openai_completion stream=True due to the fact
# that we do not return an AsyncIterator, our tests expect a stream of chunks we cannot intercept currently. # that we do not return an AsyncIterator, our tests expect a stream of chunks we cannot intercept currently.
response = await provider.openai_completion(params) response = await provider.openai_completion(params)
response.model = request_model_id
if self.telemetry: if self.telemetry:
metrics = self._construct_metrics( metrics = self._construct_metrics(
prompt_tokens=response.usage.prompt_tokens, prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens, completion_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens, total_tokens=response.usage.total_tokens,
model=model_obj, fully_qualified_model_id=request_model_id,
provider_id=provider.__provider_id__,
) )
for metric in metrics: for metric in metrics:
enqueue_event(metric) enqueue_event(metric)
@ -224,7 +239,9 @@ class InferenceRouter(Inference):
logger.debug( logger.debug(
f"InferenceRouter.openai_chat_completion: model={params.model}, stream={params.stream}, messages={params.messages}", f"InferenceRouter.openai_chat_completion: model={params.model}, stream={params.stream}, messages={params.messages}",
) )
model_obj = await self._get_model(params.model, ModelType.llm) request_model_id = params.model
provider, provider_resource_id = await self._get_model_provider(params.model, ModelType.llm)
params.model = provider_resource_id
# Use the OpenAI client for a bit of extra input validation without # Use the OpenAI client for a bit of extra input validation without
# exposing the OpenAI client itself as part of our API surface # exposing the OpenAI client itself as part of our API surface
@ -242,10 +259,6 @@ class InferenceRouter(Inference):
params.tool_choice = None params.tool_choice = None
params.tools = None params.tools = None
# Update params with the resolved model identifier
params.model = model_obj.identifier
provider = await self.routing_table.get_provider_impl(model_obj.identifier)
if params.stream: if params.stream:
response_stream = await provider.openai_chat_completion(params) response_stream = await provider.openai_chat_completion(params)
@ -253,11 +266,13 @@ class InferenceRouter(Inference):
# We need to add metrics to each chunk and store the final completion # We need to add metrics to each chunk and store the final completion
return self.stream_tokens_and_compute_metrics_openai_chat( return self.stream_tokens_and_compute_metrics_openai_chat(
response=response_stream, response=response_stream,
model=model_obj, fully_qualified_model_id=request_model_id,
provider_id=provider.__provider_id__,
messages=params.messages, messages=params.messages,
) )
response = await self._nonstream_openai_chat_completion(provider, params) response = await self._nonstream_openai_chat_completion(provider, params)
response.model = request_model_id
# Store the response with the ID that will be returned to the client # Store the response with the ID that will be returned to the client
if self.store: if self.store:
@ -268,7 +283,8 @@ class InferenceRouter(Inference):
prompt_tokens=response.usage.prompt_tokens, prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens, completion_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens, total_tokens=response.usage.total_tokens,
model=model_obj, fully_qualified_model_id=request_model_id,
provider_id=provider.__provider_id__,
) )
for metric in metrics: for metric in metrics:
enqueue_event(metric) enqueue_event(metric)
@ -285,13 +301,13 @@ class InferenceRouter(Inference):
logger.debug( logger.debug(
f"InferenceRouter.openai_embeddings: model={params.model}, input_type={type(params.input)}, encoding_format={params.encoding_format}, dimensions={params.dimensions}", f"InferenceRouter.openai_embeddings: model={params.model}, input_type={type(params.input)}, encoding_format={params.encoding_format}, dimensions={params.dimensions}",
) )
model_obj = await self._get_model(params.model, ModelType.embedding) request_model_id = params.model
provider, provider_resource_id = await self._get_model_provider(params.model, ModelType.embedding)
params.model = provider_resource_id
# Update model to use resolved identifier response = await provider.openai_embeddings(params)
params.model = model_obj.identifier response.model = request_model_id
return response
provider = await self.routing_table.get_provider_impl(model_obj.identifier)
return await provider.openai_embeddings(params)
async def list_chat_completions( async def list_chat_completions(
self, self,
@ -347,7 +363,8 @@ class InferenceRouter(Inference):
self, self,
response, response,
prompt_tokens, prompt_tokens,
model, fully_qualified_model_id: str,
provider_id: str,
tool_prompt_format: ToolPromptFormat | None = None, tool_prompt_format: ToolPromptFormat | None = None,
) -> AsyncGenerator[ChatCompletionResponseStreamChunk, None] | AsyncGenerator[CompletionResponseStreamChunk, None]: ) -> AsyncGenerator[ChatCompletionResponseStreamChunk, None] | AsyncGenerator[CompletionResponseStreamChunk, None]:
completion_text = "" completion_text = ""
@ -385,7 +402,8 @@ class InferenceRouter(Inference):
prompt_tokens=prompt_tokens, prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens, completion_tokens=completion_tokens,
total_tokens=total_tokens, total_tokens=total_tokens,
model=model, fully_qualified_model_id=fully_qualified_model_id,
provider_id=provider_id,
) )
for metric in completion_metrics: for metric in completion_metrics:
if metric.metric in [ if metric.metric in [
@ -405,7 +423,8 @@ class InferenceRouter(Inference):
prompt_tokens or 0, prompt_tokens or 0,
completion_tokens or 0, completion_tokens or 0,
total_tokens, total_tokens,
model, fully_qualified_model_id=fully_qualified_model_id,
provider_id=provider_id,
) )
async_metrics = [ async_metrics = [
MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics
@ -417,7 +436,8 @@ class InferenceRouter(Inference):
self, self,
response: ChatCompletionResponse | CompletionResponse, response: ChatCompletionResponse | CompletionResponse,
prompt_tokens, prompt_tokens,
model, fully_qualified_model_id: str,
provider_id: str,
tool_prompt_format: ToolPromptFormat | None = None, tool_prompt_format: ToolPromptFormat | None = None,
): ):
if isinstance(response, ChatCompletionResponse): if isinstance(response, ChatCompletionResponse):
@ -434,7 +454,8 @@ class InferenceRouter(Inference):
prompt_tokens=prompt_tokens, prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens, completion_tokens=completion_tokens,
total_tokens=total_tokens, total_tokens=total_tokens,
model=model, fully_qualified_model_id=fully_qualified_model_id,
provider_id=provider_id,
) )
for metric in completion_metrics: for metric in completion_metrics:
if metric.metric in ["completion_tokens", "total_tokens"]: # Only log completion and total tokens if metric.metric in ["completion_tokens", "total_tokens"]: # Only log completion and total tokens
@ -448,14 +469,16 @@ class InferenceRouter(Inference):
prompt_tokens or 0, prompt_tokens or 0,
completion_tokens or 0, completion_tokens or 0,
total_tokens, total_tokens,
model, fully_qualified_model_id=fully_qualified_model_id,
provider_id=provider_id,
) )
return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in metrics] return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in metrics]
async def stream_tokens_and_compute_metrics_openai_chat( async def stream_tokens_and_compute_metrics_openai_chat(
self, self,
response: AsyncIterator[OpenAIChatCompletionChunk], response: AsyncIterator[OpenAIChatCompletionChunk],
model: Model, fully_qualified_model_id: str,
provider_id: str,
messages: list[OpenAIMessageParam] | None = None, messages: list[OpenAIMessageParam] | None = None,
) -> AsyncIterator[OpenAIChatCompletionChunk]: ) -> AsyncIterator[OpenAIChatCompletionChunk]:
"""Stream OpenAI chat completion chunks, compute metrics, and store the final completion.""" """Stream OpenAI chat completion chunks, compute metrics, and store the final completion."""
@ -475,6 +498,8 @@ class InferenceRouter(Inference):
if created is None and chunk.created: if created is None and chunk.created:
created = chunk.created created = chunk.created
chunk.model = fully_qualified_model_id
# Accumulate choice data for final assembly # Accumulate choice data for final assembly
if chunk.choices: if chunk.choices:
for choice_delta in chunk.choices: for choice_delta in chunk.choices:
@ -531,7 +556,8 @@ class InferenceRouter(Inference):
prompt_tokens=chunk.usage.prompt_tokens, prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens, completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens, total_tokens=chunk.usage.total_tokens,
model=model, model_id=fully_qualified_model_id,
provider_id=provider_id,
) )
for metric in metrics: for metric in metrics:
enqueue_event(metric) enqueue_event(metric)
@ -579,7 +605,7 @@ class InferenceRouter(Inference):
id=id, id=id,
choices=assembled_choices, choices=assembled_choices,
created=created or int(time.time()), created=created or int(time.time()),
model=model.identifier, model=fully_qualified_model_id,
object="chat.completion", object="chat.completion",
) )
logger.debug(f"InferenceRouter.completion_response: {final_response}") logger.debug(f"InferenceRouter.completion_response: {final_response}")

View file

@ -46,8 +46,7 @@ class SentenceTransformerEmbeddingMixin:
raise ValueError("Empty list not supported") raise ValueError("Empty list not supported")
# Get the model and generate embeddings # Get the model and generate embeddings
model_obj = await self.model_store.get_model(params.model) embedding_model = await self._load_sentence_transformer_model(params.model)
embedding_model = await self._load_sentence_transformer_model(model_obj.provider_resource_id)
embeddings = await asyncio.to_thread(embedding_model.encode, input_list, show_progress_bar=False) embeddings = await asyncio.to_thread(embedding_model.encode, input_list, show_progress_bar=False)
# Convert embeddings to the requested format # Convert embeddings to the requested format

View file

@ -201,8 +201,11 @@ class OpenAIMixin(NeedsRequestProviderData, ABC, BaseModel):
:param model: The registered model name/identifier :param model: The registered model name/identifier
:return: The provider-specific model ID (e.g., "gpt-4") :return: The provider-specific model ID (e.g., "gpt-4")
""" """
# Look up the registered model to get the provider-specific model ID
# self.model_store is injected by the distribution system at runtime # self.model_store is injected by the distribution system at runtime
if not await self.model_store.has_model(model): # type: ignore[attr-defined]
return model
# Look up the registered model to get the provider-specific model ID
model_obj: Model = await self.model_store.get_model(model) # type: ignore[attr-defined] model_obj: Model = await self.model_store.get_model(model) # type: ignore[attr-defined]
# provider_resource_id is str | None, but we expect it to be str for OpenAI calls # provider_resource_id is str | None, but we expect it to be str for OpenAI calls
if model_obj.provider_resource_id is None: if model_obj.provider_resource_id is None:

View file

@ -161,8 +161,7 @@ def test_openai_embeddings_single_string(compat_client, client_with_models, embe
assert response.object == "list" assert response.object == "list"
# Handle provider-scoped model identifiers (e.g., sentence-transformers/nomic-ai/nomic-embed-text-v1.5) assert response.model == embedding_model_id
assert response.model == embedding_model_id or response.model.endswith(f"/{embedding_model_id}")
assert len(response.data) == 1 assert len(response.data) == 1
assert response.data[0].object == "embedding" assert response.data[0].object == "embedding"
assert response.data[0].index == 0 assert response.data[0].index == 0
@ -186,8 +185,7 @@ def test_openai_embeddings_multiple_strings(compat_client, client_with_models, e
assert response.object == "list" assert response.object == "list"
# Handle provider-scoped model identifiers (e.g., sentence-transformers/nomic-ai/nomic-embed-text-v1.5) assert response.model == embedding_model_id
assert response.model == embedding_model_id or response.model.endswith(f"/{embedding_model_id}")
assert len(response.data) == len(input_texts) assert len(response.data) == len(input_texts)
for i, embedding_data in enumerate(response.data): for i, embedding_data in enumerate(response.data):
@ -365,8 +363,7 @@ def test_openai_embeddings_base64_batch_processing(compat_client, client_with_mo
# Validate response structure # Validate response structure
assert response.object == "list" assert response.object == "list"
# Handle provider-scoped model identifiers (e.g., sentence-transformers/nomic-ai/nomic-embed-text-v1.5) assert response.model == embedding_model_id
assert response.model == embedding_model_id or response.model.endswith(f"/{embedding_model_id}")
assert len(response.data) == len(input_texts) assert len(response.data) == len(input_texts)
# Validate each embedding in the batch # Validate each embedding in the batch

View file

@ -0,0 +1,133 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Test that models can be routed using provider_id/model_id format
when the provider is configured but the specific model is not registered.
This test validates the fix in src/llama_stack/core/routers/inference.py
that enables routing based on provider_data alone.
"""
from unittest.mock import AsyncMock, patch
import pytest
from llama_stack import LlamaStackAsLibraryClient
from llama_stack.apis.datatypes import Api
from llama_stack.apis.inference.inference import (
OpenAIAssistantMessageParam,
OpenAIChatCompletion,
OpenAIChatCompletionUsage,
OpenAIChoice,
)
from llama_stack.core.telemetry.telemetry import MetricEvent
class OpenAIChatCompletionWithMetrics(OpenAIChatCompletion):
    """`OpenAIChatCompletion` extended with an optional `metrics` field.

    Used in this module to build the mocked provider response.
    NOTE(review): presumably the extra field lets the inference router attach
    metrics to the returned object without a validation error — confirm
    against the router implementation.
    """

    # Optional list of metric events; defaults to None when no metrics are set.
    metrics: list[MetricEvent] | None = None
def test_unregistered_model_routing_with_provider_data(client_with_models):
    """
    Test that a model can be routed using provider_id/model_id format
    even when the model is not explicitly registered, as long as the provider
    is available.

    This validates the fix where the router:
    1. Tries to lookup model in routing table
    2. If not found, splits model_id by "/" to extract provider_id and provider_resource_id
    3. Routes directly to the provider with the provider_resource_id

    Without the fix, this would raise ModelNotFoundError immediately.
    With the fix, the routing succeeds and the request reaches the provider.

    The test also checks that the response echoes the exact model ID the
    client sent, rather than the provider-local ID returned by the provider.

    The provider's `openai_chat_completion` is patched with an `AsyncMock`, so
    no real API call is made. The test is skipped when the client is not a
    library client or when the anthropic provider is not configured.
    """
    if not isinstance(client_with_models, LlamaStackAsLibraryClient):
        pytest.skip("Test requires library client for provider-level patching")

    client = client_with_models

    # Use a model format that follows the provider_id/model_id convention.
    # Anthropic is used as an example since it is a remote provider that
    # benefits from this pattern.
    provider_id = "anthropic"
    provider_model_id = "claude-3-5-sonnet-20241022"
    test_model_id = f"{provider_id}/{provider_model_id}"

    # First, verify the model is NOT registered
    registered_models = {m.identifier for m in client.models.list()}
    assert test_model_id not in registered_models, f"Model {test_model_id} should not be pre-registered for this test"

    # Check if the anthropic provider is available in ci-tests
    configured_provider_ids = {p.provider_id for p in client.providers.list()}
    if provider_id not in configured_provider_ids:
        pytest.skip("Anthropic provider not configured in ci-tests - cannot test unregistered model routing")

    # Get the actual provider implementation from the library client's stack
    inference_router = client.async_client.impls.get(Api.inference)
    if not inference_router:
        raise RuntimeError("No inference router found")

    # The provider reports its own (unprefixed) model ID; the router is
    # expected to overwrite it with the ID the client requested.
    mock_response = OpenAIChatCompletionWithMetrics(
        id="chatcmpl-test-123",
        created=1234567890,
        model=provider_model_id,
        choices=[
            OpenAIChoice(
                index=0,
                finish_reason="stop",
                message=OpenAIAssistantMessageParam(
                    content="Mocked response to test routing",
                ),
            )
        ],
        usage=OpenAIChatCompletionUsage(
            prompt_tokens=5,
            completion_tokens=10,
            total_tokens=15,
        ),
    )

    # The inference router's routing_table.impls_by_provider_id should have anthropic
    routing_table = inference_router.routing_table
    anthropic_provider = routing_table.impls_by_provider_id.get(provider_id)
    if not anthropic_provider:
        raise RuntimeError("Anthropic provider not found in routing table even though it's in providers list")

    # Patch the provider's openai_chat_completion method to avoid real API calls
    with patch.object(
        anthropic_provider,
        "openai_chat_completion",
        new_callable=AsyncMock,
        return_value=mock_response,
    ) as mock_method:
        # Make the request with the unregistered model
        response = client.chat.completions.create(
            model=test_model_id,
            messages=[
                {
                    "role": "user",
                    "content": "Test message for unregistered model routing",
                }
            ],
            stream=False,
        )

        # Verify the provider's method was called
        assert mock_method.called, "Provider's openai_chat_completion should have been called"

        # Verify the response came through
        assert response.choices[0].message.content == "Mocked response to test routing"

        # Verify the response reports the exact model ID that was requested
        assert response.model == test_model_id, (
            f"Response should echo the requested model {test_model_id}, got {response.model}"
        )

        # Verify that the router passed the correct model to the provider
        # (without the "anthropic/" prefix)
        params = mock_method.call_args.args[0]  # First positional argument is the params object
        assert params.model == provider_model_id, (
            f"Provider should receive model without provider prefix, got {params.model}"
        )

View file

@ -64,10 +64,11 @@ def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client,
# Verify spans # Verify spans
spans = mock_otlp_collector.get_spans() spans = mock_otlp_collector.get_spans()
assert len(spans) == 5 # Expected spans: 1 root span + 3 autotraced method calls from routing/inference
assert len(spans) == 4, f"Expected 4 spans, got {len(spans)}"
# we only need this captured one time # Collect all model_ids found in spans
logged_model_id = None logged_model_ids = []
for span in spans: for span in spans:
attrs = span.attributes attrs = span.attributes
@ -87,10 +88,10 @@ def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client,
args = json.loads(attrs["__args__"]) args = json.loads(attrs["__args__"])
if "model_id" in args: if "model_id" in args:
logged_model_id = args["model_id"] logged_model_ids.append(args["model_id"])
assert logged_model_id is not None # At least one span should capture the fully qualified model ID
assert logged_model_id == text_model_id assert text_model_id in logged_model_ids, f"Expected to find {text_model_id} in spans, but got {logged_model_ids}"
# TODO: re-enable this once metrics get fixed # TODO: re-enable this once metrics get fixed
""" """