feat: implement get chat completions APIs (#2200)

# What does this PR do?
* Provide a SQLite implementation of the APIs introduced in
https://github.com/meta-llama/llama-stack/pull/2145.
* Introduce a `SqlStore` API (`llama_stack/providers/utils/sqlstore/api.py`) and a first SQLite implementation (a rough sketch of the interface shape follows below).
* Pagination support will be added in a future PR.
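
For orientation, here is a rough sketch of the shape such a `SqlStore` interface typically takes. The names and signatures below are illustrative assumptions, not the actual contents of `api.py`:

```python
# Illustrative sketch only -- see llama_stack/providers/utils/sqlstore/api.py
# for the real interface; names and signatures here are assumptions.
from enum import Enum
from typing import Any, Protocol


class ColumnType(Enum):
    INTEGER = "INTEGER"
    STRING = "STRING"
    JSON = "JSON"


class SqlStore(Protocol):
    """Async, backend-agnostic table store (SQLite first, others later)."""

    async def create_table(self, table: str, schema: dict[str, ColumnType]) -> None: ...

    async def insert(self, table: str, data: dict[str, Any]) -> None: ...

    async def fetch_one(self, table: str, where: dict[str, Any]) -> dict[str, Any] | None: ...

    async def fetch_all(self, table: str, where: dict[str, Any] | None = None) -> list[dict[str, Any]]: ...
```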

## Test Plan
Unit tests on the SQL store:

![Unit test results](https://github.com/user-attachments/assets/9b8b7ec8-632b-4667-8127-5583426b2e29)


Integration test. First start a server from the Ollama template:
```
INFERENCE_MODEL="llama3.2:3b-instruct-fp16" llama stack build --template ollama --image-type conda --run
```
then run the OpenAI-compat inference tests against it:
```
LLAMA_STACK_CONFIG=http://localhost:5001 INFERENCE_MODEL="llama3.2:3b-instruct-fp16" python -m pytest -v tests/integration/inference/test_openai_completion.py --text-model "llama3.2:3b-instruct-fp16" -k 'inference_store and openai'
```
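
For a quick manual check of the new APIs (not part of the automated test plan): assuming the stack server mounts its OpenAI-compatible routes under a base URL like the one below, the stored completions can be listed and fetched with the standard `openai` client, whose `chat.completions.list()` / `retrieve()` helpers map onto the same GET endpoints this PR implements:

```python
# Sketch only: the base URL is an assumption about how the OpenAI-compatible
# routes are mounted; adjust it to your deployment.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:5001/v1/openai/v1", api_key="none")

# Create a completion so the inference store has something to record.
created = client.chat.completions.create(
    model="llama3.2:3b-instruct-fp16",
    messages=[{"role": "user", "content": "Hello!"}],
)

# New in this PR: list stored chat completions and fetch one by id.
for completion in client.chat.completions.list():
    print(completion.id)
print(client.chat.completions.retrieve(created.id))
```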
Committed by ehhuang on 2025-05-21 22:21:52 -07:00 via GitHub.
Commit 549812f51e (parent 633bb9c5b3).
71 changed files with 1111 additions and 10 deletions


```diff
@@ -43,8 +43,20 @@ def get_provider_dependencies(
     # Extract providers based on config type
     if isinstance(config, DistributionTemplate):
         providers = config.providers
+
+        # TODO: This is a hack to get the dependencies for internal APIs into build
+        # We should have a better way to do this by formalizing the concept of "internal" APIs
+        # and providers, with a way to specify dependencies for them.
+        run_configs = config.run_configs
+        additional_pip_packages: list[str] = []
+        if run_configs:
+            for run_config in run_configs.values():
+                run_config_ = run_config.run_config(name="", providers={}, container_image=None)
+                if run_config_.inference_store:
+                    additional_pip_packages.extend(run_config_.inference_store.pip_packages)
     elif isinstance(config, BuildConfig):
         providers = config.distribution_spec.providers
+        additional_pip_packages = config.additional_pip_packages

     deps = []
     registry = get_provider_registry(config)
     for api_str, provider_or_providers in providers.items():
@@ -72,6 +84,9 @@ def get_provider_dependencies(
         else:
             normal_deps.append(package)

+    if additional_pip_packages:
+        normal_deps.extend(additional_pip_packages)
+
     return list(set(normal_deps)), list(set(special_deps))
```


```diff
@@ -26,6 +26,7 @@ from llama_stack.apis.vector_dbs import VectorDB, VectorDBInput
 from llama_stack.apis.vector_io import VectorIO
 from llama_stack.providers.datatypes import Api, ProviderSpec
 from llama_stack.providers.utils.kvstore.config import KVStoreConfig, SqliteKVStoreConfig
+from llama_stack.providers.utils.sqlstore.sqlstore import SqlStoreConfig

 LLAMA_STACK_BUILD_CONFIG_VERSION = "2"
 LLAMA_STACK_RUN_CONFIG_VERSION = "2"
@@ -314,6 +315,13 @@ Configuration for the persistence store used by the distribution registry. If not specified,
 a default SQLite store will be used.""",
     )

+    inference_store: SqlStoreConfig | None = Field(
+        default=None,
+        description="""
+Configuration for the persistence store used by the inference API. If not specified,
+a default SQLite store will be used.""",
+    )
+
     # registry of "resources" in the distribution
     models: list[ModelInput] = Field(default_factory=list)
     shields: list[ShieldInput] = Field(default_factory=list)
@@ -362,6 +370,10 @@ class BuildConfig(BaseModel):
         description="Path to directory containing external provider implementations. The providers packages will be resolved from this directory. "
         "pip_packages MUST contain the provider package name.",
     )
+    additional_pip_packages: list[str] = Field(
+        default_factory=list,
+        description="Additional pip packages to install in the distribution. These packages will be installed in the distribution environment.",
+    )

     @field_validator("external_providers_dir")
     @classmethod
```
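
To see how these two new fields fit together, here is a hedged sketch of a run config that opts into the inference store. `SqliteSqlStoreConfig` and its `db_path` field are assumed by analogy with the existing `SqliteKVStoreConfig`, so verify the names against the sqlstore module:

```python
# Sketch, with assumed names: SqliteSqlStoreConfig / db_path mirror the
# existing SqliteKVStoreConfig convention and may differ in the real module.
from llama_stack.distribution.datatypes import StackRunConfig
from llama_stack.providers.utils.sqlstore.sqlstore import SqliteSqlStoreConfig

run_config = StackRunConfig(
    image_name="ollama",
    providers={},  # provider map elided for brevity
    inference_store=SqliteSqlStoreConfig(
        db_path="~/.llama/distributions/ollama/inference_store.db",
    ),
)
```

If `inference_store` is left unset, the router raises `NotImplementedError` for the list/get endpoints rather than silently returning nothing (see the `InferenceRouter` changes below).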


```diff
@@ -140,7 +140,7 @@ async def resolve_impls(
     sorted_providers = sort_providers_by_deps(providers_with_specs, run_config)

-    return await instantiate_providers(sorted_providers, router_apis, dist_registry)
+    return await instantiate_providers(sorted_providers, router_apis, dist_registry, run_config)


 def specs_for_autorouted_apis(apis_to_serve: list[str] | set[str]) -> dict[str, dict[str, ProviderWithSpec]]:
@@ -243,7 +243,10 @@ def sort_providers_by_deps(
 async def instantiate_providers(
-    sorted_providers: list[tuple[str, ProviderWithSpec]], router_apis: set[Api], dist_registry: DistributionRegistry
+    sorted_providers: list[tuple[str, ProviderWithSpec]],
+    router_apis: set[Api],
+    dist_registry: DistributionRegistry,
+    run_config: StackRunConfig,
 ) -> dict:
     """Instantiates providers asynchronously while managing dependencies."""
     impls: dict[Api, Any] = {}
@@ -258,7 +261,7 @@ async def instantiate_providers(
         if isinstance(provider.spec, RoutingTableProviderSpec):
             inner_impls = inner_impls_by_provider_id[f"inner-{provider.spec.router_api.value}"]

-        impl = await instantiate_provider(provider, deps, inner_impls, dist_registry)
+        impl = await instantiate_provider(provider, deps, inner_impls, dist_registry, run_config)

         if api_str.startswith("inner-"):
             inner_impls_by_provider_id[api_str][provider.provider_id] = impl
@@ -308,6 +311,7 @@ async def instantiate_provider(
     deps: dict[Api, Any],
     inner_impls: dict[str, Any],
     dist_registry: DistributionRegistry,
+    run_config: StackRunConfig,
 ):
     provider_spec = provider.spec
     if not hasattr(provider_spec, "module"):
@@ -327,7 +331,7 @@ async def instantiate_provider(
         method = "get_auto_router_impl"
         config = None
-        args = [provider_spec.api, deps[provider_spec.routing_table_api], deps]
+        args = [provider_spec.api, deps[provider_spec.routing_table_api], deps, run_config]
     elif isinstance(provider_spec, RoutingTableProviderSpec):
         method = "get_routing_table_impl"
```


```diff
@@ -7,8 +7,10 @@
 from typing import Any

 from llama_stack.distribution.datatypes import RoutedProtocol
+from llama_stack.distribution.stack import StackRunConfig
 from llama_stack.distribution.store import DistributionRegistry
 from llama_stack.providers.datatypes import Api, RoutingTable
+from llama_stack.providers.utils.inference.inference_store import InferenceStore

 from .routing_tables import (
     BenchmarksRoutingTable,
@@ -45,7 +47,9 @@ async def get_routing_table_impl(
     return impl


-async def get_auto_router_impl(api: Api, routing_table: RoutingTable, deps: dict[str, Any]) -> Any:
+async def get_auto_router_impl(
+    api: Api, routing_table: RoutingTable, deps: dict[str, Any], run_config: StackRunConfig
+) -> Any:
     from .routers import (
         DatasetIORouter,
         EvalRouter,
@@ -76,6 +80,12 @@ async def get_auto_router_impl(api: Api, routing_table: RoutingTable, deps: dict
         if dep_api in deps:
             api_to_dep_impl[dep_name] = deps[dep_api]

+    # TODO: move pass configs to routers instead
+    if api == Api.inference and run_config.inference_store:
+        inference_store = InferenceStore(run_config.inference_store)
+        await inference_store.initialize()
+        api_to_dep_impl["store"] = inference_store
+
     impl = api_to_routers[api.value](routing_table, **api_to_dep_impl)
     await impl.initialize()
     return impl
```
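
Note the design choice here: the store is constructed outside the router and injected as a plain dependency, which also makes the router easy to exercise in isolation. A hypothetical test stub (not from this PR) only needs to match the store's call surface:

```python
# Hypothetical stub: anything with the same three async methods can stand in
# for InferenceStore, since the router receives it via the "store" kwarg.
class FakeInferenceStore:
    def __init__(self) -> None:
        self.saved: list[tuple] = []

    async def store_chat_completion(self, completion, messages) -> None:
        self.saved.append((completion, messages))

    async def list_chat_completions(self, after, limit, model, order):
        return self.saved

    async def get_chat_completion(self, completion_id):
        for completion, _messages in self.saved:
            if completion.id == completion_id:
                return completion
        raise ValueError(f"{completion_id} not found")
```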


```diff
@@ -32,8 +32,11 @@ from llama_stack.apis.inference import (
     EmbeddingsResponse,
     EmbeddingTaskType,
     Inference,
+    ListOpenAIChatCompletionResponse,
     LogProbConfig,
     Message,
+    OpenAICompletionWithInputMessages,
+    Order,
     ResponseFormat,
     SamplingParams,
     StopReason,
@@ -73,6 +76,8 @@ from llama_stack.log import get_logger
 from llama_stack.models.llama.llama3.chat_format import ChatFormat
 from llama_stack.models.llama.llama3.tokenizer import Tokenizer
 from llama_stack.providers.datatypes import HealthResponse, HealthStatus, RoutingTable
+from llama_stack.providers.utils.inference.inference_store import InferenceStore
+from llama_stack.providers.utils.inference.stream_utils import stream_and_store_openai_completion
 from llama_stack.providers.utils.telemetry.tracing import get_current_span

 logger = get_logger(name=__name__, category="core")
@@ -141,10 +146,12 @@ class InferenceRouter(Inference):
         self,
         routing_table: RoutingTable,
         telemetry: Telemetry | None = None,
+        store: InferenceStore | None = None,
     ) -> None:
         logger.debug("Initializing InferenceRouter")
         self.routing_table = routing_table
         self.telemetry = telemetry
+        self.store = store
         if self.telemetry:
             self.tokenizer = Tokenizer.get_instance()
             self.formatter = ChatFormat(self.tokenizer)
@@ -607,9 +614,31 @@
         provider = self.routing_table.get_provider_impl(model_obj.identifier)
         if stream:
-            return await provider.openai_chat_completion(**params)
+            response_stream = await provider.openai_chat_completion(**params)
+            if self.store:
+                return stream_and_store_openai_completion(response_stream, model, self.store, messages)
+            return response_stream
         else:
-            return await self._nonstream_openai_chat_completion(provider, params)
+            response = await self._nonstream_openai_chat_completion(provider, params)
+            if self.store:
+                await self.store.store_chat_completion(response, messages)
+            return response
+
+    async def list_chat_completions(
+        self,
+        after: str | None = None,
+        limit: int | None = 20,
+        model: str | None = None,
+        order: Order | None = Order.desc,
+    ) -> ListOpenAIChatCompletionResponse:
+        if self.store:
+            return await self.store.list_chat_completions(after, limit, model, order)
+        raise NotImplementedError("List chat completions is not supported: inference store is not configured.")
+
+    async def get_chat_completion(self, completion_id: str) -> OpenAICompletionWithInputMessages:
+        if self.store:
+            return await self.store.get_chat_completion(completion_id)
+        raise NotImplementedError("Get chat completion is not supported: inference store is not configured.")

     async def _nonstream_openai_chat_completion(self, provider: Inference, params: dict) -> OpenAIChatCompletion:
         response = await provider.openai_chat_completion(**params)
```
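
One subtlety in the streaming branch above: the wrapped generator is returned without being consumed, so `stream_and_store_openai_completion` presumably tees the stream, yielding each chunk to the caller unchanged while accumulating the deltas, then persists the assembled completion once the stream is exhausted. A minimal sketch of that pattern under those assumptions (not the actual `stream_utils` code):

```python
# Minimal tee-and-store sketch; the real helper lives in
# llama_stack/providers/utils/inference/stream_utils.py and differs in detail.
from collections.abc import AsyncIterator
from typing import Any


async def stream_and_store(
    stream: AsyncIterator[dict[str, Any]],
    model: str,
    store: Any,
    messages: list[dict[str, Any]],
) -> AsyncIterator[dict[str, Any]]:
    content_parts: list[str] = []
    last_chunk: dict[str, Any] = {}
    async for chunk in stream:
        delta = chunk["choices"][0].get("delta", {})
        if delta.get("content"):
            content_parts.append(delta["content"])  # accumulate assistant text
        last_chunk = chunk
        yield chunk  # pass through to the caller unchanged
    # Stream exhausted: persist the assembled completion with its input messages.
    assembled = {
        "id": last_chunk.get("id"),
        "model": model,
        "choices": [{"message": {"role": "assistant", "content": "".join(content_parts)}}],
    }
    await store.store_chat_completion(assembled, messages)
```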