Added output type to PreprocessorResponse.

This commit is contained in:
ilya-kolchinsky 2025-03-06 14:05:05 +01:00
parent b981181b25
commit 4c81a72214
7 changed files with 33 additions and 25 deletions

View file

@@ -56,7 +56,8 @@ PreprocessorChain = List[PreprocessorChainElement]
@json_schema_type
class PreprocessorResponse(BaseModel):
status: bool
success: bool
preprocessor_output_type: PreprocessingDataType
results: Optional[List[str | InterleavedContent | Chunk]] = None
@@ -84,5 +85,4 @@ class Preprocessing(Protocol):
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessorInput],
is_rag_chain: Optional[bool] = False,
) -> PreprocessorResponse: ...

View file

@@ -521,7 +521,6 @@ class PreprocessingRouter(Preprocessing):
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessorInput],
is_rag_chain: Optional[bool] = False,
) -> PreprocessorResponse:
preprocessor_impls = [self.routing_table.get_provider_impl(p.preprocessor_id) for p in preprocessors]
return await execute_preprocessor_chain(preprocessors, preprocessor_impls, preprocessor_inputs, is_rag_chain)
return await execute_preprocessor_chain(preprocessors, preprocessor_impls, preprocessor_inputs)

View file

@@ -9,7 +9,6 @@ from typing import List
from llama_stack.apis.preprocessing import (
Preprocessing,
PreprocessingDataType,
PreprocessorChain,
PreprocessorInput,
PreprocessorResponse,
@@ -18,17 +17,11 @@ from llama_stack.apis.preprocessing import (
log = logging.getLogger(__name__)
def validate_chain(chain_impls: List[Preprocessing], is_rag_chain: bool) -> bool:
def validate_chain(chain_impls: List[Preprocessing]) -> bool:
if len(chain_impls) == 0:
log.error("Empty preprocessing chain was provided")
return False
if is_rag_chain and PreprocessingDataType.chunks not in chain_impls[-1].output_types:
log.error(
f"RAG preprocessing chain must end with a chunk-producing preprocessor, but the last preprocessor in the provided chain only supports {chain_impls[-1].output_types}"
)
return False
for current_preprocessor, next_preprocessor in pairwise(chain_impls):
current_output_types = current_preprocessor.output_types
next_input_types = next_preprocessor.input_types
@@ -46,13 +39,13 @@ async def execute_preprocessor_chain(
preprocessor_chain: PreprocessorChain,
preprocessor_chain_impls: List[Preprocessing],
preprocessor_inputs: List[PreprocessorInput],
is_rag_chain: bool,
) -> PreprocessorResponse:
if not validate_chain(preprocessor_chain_impls, is_rag_chain):
return PreprocessorResponse(status=False, results=[])
if not validate_chain(preprocessor_chain_impls):
return PreprocessorResponse(success=False, results=[])
current_inputs = preprocessor_inputs
current_outputs = []
current_result_type = None
# TODO: replace with a parallel implementation
for i, current_params in enumerate(preprocessor_chain):
@@ -62,10 +55,13 @@ async def execute_preprocessor_chain(
preprocessor_inputs=current_inputs,
options=current_params.options,
)
if not response.status:
if not response.success:
log.error(f"Preprocessor {current_params.preprocessor_id} returned an error")
return PreprocessorResponse(status=False, results=[])
return PreprocessorResponse(
success=False, preprocessor_output_type=response.preprocessor_output_type, results=[]
)
current_outputs = response.results
current_inputs = current_outputs
current_result_type = response.preprocessor_output_type
return PreprocessorResponse(status=True, results=current_outputs)
return PreprocessorResponse(success=True, preprocessor_output_type=current_result_type, results=current_outputs)

View file

@@ -88,13 +88,14 @@ class InclineBasicPreprocessorImpl(Preprocessing, PreprocessorsProtocolPrivate):
results.append(document)
return PreprocessorResponse(status=True, results=results)
return PreprocessorResponse(
success=True, preprocessor_output_type=PreprocessingDataType.raw_text_document, results=results
)
async def chain_preprocess(
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessorInput],
is_rag_chain: Optional[bool] = False,
) -> PreprocessorResponse:
return await self.preprocess(preprocessor_id="", preprocessor_inputs=preprocessor_inputs)

View file

@@ -75,12 +75,14 @@ class InclineDoclingPreprocessorImpl(Preprocessing, PreprocessorsProtocolPrivate
result = converted_document.export_to_markdown()
results.append(result)
return PreprocessorResponse(status=True, results=results)
preprocessor_output_type = (
PreprocessingDataType.chunks if self.config.chunk else PreprocessingDataType.raw_text_document
)
return PreprocessorResponse(success=True, preprocessor_output_type=preprocessor_output_type, results=results)
async def chain_preprocess(
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessorInput],
is_rag_chain: Optional[bool] = False,
) -> PreprocessorResponse:
return await self.preprocess(preprocessor_id="", preprocessor_inputs=preprocessor_inputs)

View file

@@ -62,13 +62,12 @@ class InclineSimpleChunkingImpl(Preprocessing, PreprocessorsProtocolPrivate):
)
chunks.extend(new_chunks)
return PreprocessorResponse(status=True, results=chunks)
return PreprocessorResponse(success=True, preprocessor_output_type=PreprocessingDataType.chunks, results=chunks)
async def chain_preprocess(
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessorInput],
is_rag_chain: Optional[bool] = False,
) -> PreprocessorResponse:
return await self.preprocess(preprocessor_id="", preprocessor_inputs=preprocessor_inputs)

View file

@@ -21,6 +21,7 @@ from llama_stack.apis.inference import Inference
from llama_stack.apis.preprocessing import (
Preprocessing,
PreprocessingDataFormat,
PreprocessingDataType,
PreprocessorChainElement,
PreprocessorInput,
)
@@ -81,9 +82,19 @@ class MemoryToolRuntimeImpl(ToolsProtocolPrivate, ToolRuntime, RAGToolRuntime):
preprocessors=preprocessor_chain, preprocessor_inputs=preprocessor_inputs
)
chunks = preprocessor_response.results
if not preprocessor_response.success:
log.error("Preprocessor chain returned an error")
return
if preprocessor_response.preprocessor_output_type != PreprocessingDataType.chunks:
log.error(
f"Preprocessor chain returned {preprocessor_response.preprocessor_output_type} instead of {PreprocessingDataType.chunks}"
)
return
chunks = preprocessor_response.results
if not chunks:
log.error("No chunks returned by the preprocessor chain")
return
await self.vector_io_api.insert_chunks(