The first draft of the Preprocessing API.

This commit is contained in:
ilya-kolchinsky 2025-03-03 13:32:17 +01:00
parent 7f9b767277
commit aa1b670d5c
18 changed files with 327 additions and 0 deletions

View file

@ -14,6 +14,7 @@ from .routing_tables import (
BenchmarksRoutingTable,
DatasetsRoutingTable,
ModelsRoutingTable,
PreprocessorsRoutingTable,
ScoringFunctionsRoutingTable,
ShieldsRoutingTable,
ToolGroupsRoutingTable,
@ -35,6 +36,7 @@ async def get_routing_table_impl(
"scoring_functions": ScoringFunctionsRoutingTable,
"benchmarks": BenchmarksRoutingTable,
"tool_groups": ToolGroupsRoutingTable,
"preprocessors": PreprocessorsRoutingTable,
}
if api.value not in api_to_tables:
@ -50,6 +52,7 @@ async def get_auto_router_impl(api: Api, routing_table: RoutingTable, _deps) ->
DatasetIORouter,
EvalRouter,
InferenceRouter,
PreprocessingRouter,
SafetyRouter,
ScoringRouter,
ToolRuntimeRouter,
@ -64,6 +67,7 @@ async def get_auto_router_impl(api: Api, routing_table: RoutingTable, _deps) ->
"scoring": ScoringRouter,
"eval": EvalRouter,
"tool_runtime": ToolRuntimeRouter,
"preprocessing": PreprocessingRouter,
}
if api.value not in api_to_routers:
raise ValueError(f"API {api.value} not found in router map")

View file

@ -34,6 +34,7 @@ from llama_stack.apis.inference import (
ToolPromptFormat,
)
from llama_stack.apis.models import ModelType
from llama_stack.apis.preprocessing import Preprocessing, PreprocessingInput, PreprocessingResponse, PreprocessorOptions
from llama_stack.apis.safety import RunShieldResponse, Safety
from llama_stack.apis.scoring import (
ScoreBatchResponse,
@ -482,3 +483,28 @@ class ToolRuntimeRouter(ToolRuntime):
self, tool_group_id: Optional[str] = None, mcp_endpoint: Optional[URL] = None
) -> List[ToolDef]:
return await self.routing_table.get_provider_impl(tool_group_id).list_tools(tool_group_id, mcp_endpoint)
class PreprocessingRouter(Preprocessing):
def __init__(
self,
routing_table: RoutingTable,
) -> None:
self.routing_table = routing_table
async def initialize(self) -> None:
pass
async def shutdown(self) -> None:
pass
async def preprocess(
self,
preprocessor_id: str,
preprocessor_inputs: List[PreprocessingInput],
options: PreprocessorOptions,
) -> PreprocessingResponse:
return await self.routing_table.get_provider_impl(preprocessor_id).preprocess(
preprocessor_inputs=preprocessor_inputs,
options=options,
)

View file

@ -14,6 +14,7 @@ from llama_stack.apis.common.content_types import URL
from llama_stack.apis.common.type_system import ParamType
from llama_stack.apis.datasets import Dataset, Datasets, ListDatasetsResponse
from llama_stack.apis.models import ListModelsResponse, Model, Models, ModelType
from llama_stack.apis.preprocessing.preprocessors import ListPreprocessorsResponse, Preprocessor, Preprocessors
from llama_stack.apis.resource import ResourceType
from llama_stack.apis.scoring_functions import (
ListScoringFunctionsResponse,
@ -66,6 +67,8 @@ async def register_object_with_provider(obj: RoutableObject, p: Any) -> Routable
return await p.register_benchmark(obj)
elif api == Api.tool_runtime:
return await p.register_tool(obj)
elif api == Api.preprocessing:
return await p.register_preprocessor(obj)
else:
raise ValueError(f"Unknown API {api} for registering object with provider")
@ -80,6 +83,8 @@ async def unregister_object_from_provider(obj: RoutableObject, p: Any) -> None:
return await p.unregister_dataset(obj.identifier)
elif api == Api.tool_runtime:
return await p.unregister_tool(obj.identifier)
elif api == Api.preprocessing:
return await p.unregister_preprocessor(obj.identifier)
else:
raise ValueError(f"Unregister not supported for {api}")
@ -127,6 +132,8 @@ class CommonRoutingTableImpl(RoutingTable):
p.benchmark_store = self
elif api == Api.tool_runtime:
p.tool_store = self
elif api == Api.preprocessing:
p.preprocessor_store = self
async def shutdown(self) -> None:
for p in self.impls_by_provider_id.values():
@ -148,6 +155,8 @@ class CommonRoutingTableImpl(RoutingTable):
return ("Eval", "benchmark")
elif isinstance(self, ToolGroupsRoutingTable):
return ("Tools", "tool")
elif isinstance(self, PreprocessorsRoutingTable):
return ("Preprocessing", "preprocessor")
else:
raise ValueError("Unknown routing table type")
@ -536,3 +545,40 @@ class ToolGroupsRoutingTable(CommonRoutingTableImpl, ToolGroups):
async def shutdown(self) -> None:
pass
class PreprocessorsRoutingTable(CommonRoutingTableImpl, Preprocessors):
async def list_preprocessors(self) -> ListPreprocessorsResponse:
return ListPreprocessorsResponse(data=await self.get_all_with_type(ResourceType.preprocessor.value))
async def get_preprocessor(self, preprocessor_id: str) -> Optional[Preprocessor]:
return await self.get_object_by_identifier("preprocessor", preprocessor_id)
async def register_preprocessor(
self,
preprocessor_id: str,
provider_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Preprocessor:
if provider_id is None:
if len(self.impls_by_provider_id) == 1:
provider_id = list(self.impls_by_provider_id.keys())[0]
else:
raise ValueError(
"No provider specified and multiple providers available. Please specify a provider_id."
)
preprocessor = Preprocessor(
identifier=preprocessor_id,
provider_resource_id=preprocessor_id,
provider_id=provider_id,
metadata=metadata,
)
preprocessor.provider_id = provider_id
await self.register_object(preprocessor)
return preprocessor
async def unregister_preprocessor(self, preprocessor_id: str) -> None:
existing_preprocessor = await self.get_preprocessor(preprocessor_id)
if existing_preprocessor is None:
raise ValueError(f"Preprocessor {preprocessor_id} not found")
await self.unregister_object(existing_preprocessor)