Restrict the changes to the new preprocessing API only.

This commit is contained in:
ilya-kolchinsky 2025-04-03 12:19:08 +02:00
parent 2008cd7921
commit 863f87aa15
90 changed files with 104 additions and 1138 deletions

View file

@ -1,18 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import InlineBasicPreprocessorConfig
async def get_provider_impl(
config: InlineBasicPreprocessorConfig,
_deps,
):
from .basic import InclineBasicPreprocessorImpl
impl = InclineBasicPreprocessorImpl(config)
await impl.initialize()
return impl

View file

@ -1,151 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import logging
import re
from typing import Any, List, Optional
import httpx
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.preprocessing import (
Preprocessing,
PreprocessingDataElement,
PreprocessingDataFormat,
PreprocessingDataType,
Preprocessor,
PreprocessorChain,
PreprocessorOptions,
PreprocessorResponse,
)
from llama_stack.providers.datatypes import PreprocessorsProtocolPrivate
from llama_stack.providers.inline.preprocessing.basic.config import InlineBasicPreprocessorConfig
from llama_stack.providers.utils.inference.prompt_adapter import interleaved_content_as_str
from llama_stack.providers.utils.memory.vector_store import content_from_data, parse_pdf
log = logging.getLogger(__name__)
class InclineBasicPreprocessorImpl(Preprocessing, PreprocessorsProtocolPrivate):
# this preprocessor can either receive documents (text or binary) or document URIs
input_types = [
PreprocessingDataType.binary_document,
PreprocessingDataType.raw_text_document,
PreprocessingDataType.document_uri,
]
# this preprocessor optionally retrieves the documents and converts them into plain text
output_types = [PreprocessingDataType.raw_text_document]
preprocessor_store = None
URL_VALIDATION_PATTERN = re.compile("^(https?://|file://|data:)")
def __init__(self, config: InlineBasicPreprocessorConfig) -> None:
self.config = config
async def initialize(self) -> None:
pass
async def shutdown(self) -> None:
pass
async def register_preprocessor(self, preprocessor: Preprocessor) -> None:
pass
async def unregister_preprocessor(self, preprocessor_id: str) -> None:
pass
async def do_preprocess(
self,
preprocessor_id: str,
preprocessor_inputs: List[PreprocessingDataElement],
options: Optional[PreprocessorOptions] = None,
) -> PreprocessorResponse:
results = []
for inp in preprocessor_inputs:
input_type = self._resolve_input_type(inp)
if input_type == PreprocessingDataType.document_uri:
document = await self._fetch_document(inp)
if document is None:
continue
elif input_type == PreprocessingDataType.binary_document:
document = inp.data_element_path_or_content
if inp.data_element_format is None:
log.error(f"Binary document format is not provided for {inp.data_element_id}, skipping it")
continue
if inp.data_element_format != PreprocessingDataFormat.pdf:
log.error(
f"Unsupported binary document type {inp.data_element_format} for {inp.data_element_id}, skipping it"
)
continue
elif input_type == PreprocessingDataType.raw_text_document:
document = interleaved_content_as_str(inp.data_element_path_or_content) # type: ignore
else:
log.error(f"Unexpected preprocessor input type: {input_type}")
continue
if inp.data_element_format == PreprocessingDataFormat.pdf:
document = parse_pdf(document)
new_result = PreprocessingDataElement(
data_element_id=inp.data_element_id,
data_element_type=PreprocessingDataType.raw_text_document,
data_element_format=PreprocessingDataFormat.txt,
data_element_path_or_content=document,
)
results.append(new_result)
return PreprocessorResponse(
success=True, output_data_type=PreprocessingDataType.raw_text_document, results=results
)
async def preprocess(
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessingDataElement],
) -> PreprocessorResponse:
return await self.do_preprocess(preprocessor_id="", preprocessor_inputs=preprocessor_inputs)
@staticmethod
def _resolve_input_type(preprocessor_input: PreprocessingDataElement) -> PreprocessingDataType:
if preprocessor_input.data_element_type is not None:
return preprocessor_input.data_element_type
if isinstance(preprocessor_input.data_element_path_or_content, URL):
return PreprocessingDataType.document_uri
if InclineBasicPreprocessorImpl.URL_VALIDATION_PATTERN.match(
str(preprocessor_input.data_element_path_or_content)
):
return PreprocessingDataType.document_uri
if preprocessor_input.data_element_format == PreprocessingDataFormat.pdf:
return PreprocessingDataType.binary_document
return PreprocessingDataType.raw_text_document
@staticmethod
async def _fetch_document(preprocessor_input: PreprocessingDataElement) -> Any:
if isinstance(preprocessor_input.data_element_path_or_content, str):
url = preprocessor_input.data_element_path_or_content
if not InclineBasicPreprocessorImpl.URL_VALIDATION_PATTERN.match(url):
log.error(f"Unexpected URL: {url}")
return None
elif isinstance(preprocessor_input.data_element_path_or_content, URL):
url = preprocessor_input.data_element_path_or_content.uri
else:
log.error(
f"Unexpected type {type(preprocessor_input.data_element_path_or_content)} for input {preprocessor_input.data_element_path_or_content}, skipping this input."
)
return None
if url.startswith("data:"):
return content_from_data(url)
async with httpx.AsyncClient() as client:
r = await client.get(url)
return r.content if preprocessor_input.data_element_format == PreprocessingDataFormat.pdf else r.text

View file

@ -1,9 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel
class InlineBasicPreprocessorConfig(BaseModel): ...

View file

@ -1,18 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import InclineSimpleChunkingConfig
async def get_provider_impl(
config: InclineSimpleChunkingConfig,
_deps,
):
from .simple_chunking import InclineSimpleChunkingImpl
impl = InclineSimpleChunkingImpl(config)
await impl.initialize()
return impl

View file

@ -1,11 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel
class InclineSimpleChunkingConfig(BaseModel):
chunk_size_in_tokens: int = 512
chunk_overlap_ratio: int = 4

View file

@ -1,116 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import logging
from enum import Enum
from typing import List, Optional, Tuple
from llama_models.llama3.api import Tokenizer
from llama_stack.apis.preprocessing import (
Preprocessing,
PreprocessingDataElement,
PreprocessingDataFormat,
PreprocessingDataType,
Preprocessor,
PreprocessorChain,
PreprocessorOptions,
PreprocessorResponse,
)
from llama_stack.apis.vector_io import Chunk
from llama_stack.providers.datatypes import PreprocessorsProtocolPrivate
from llama_stack.providers.inline.preprocessing.simple_chunking import InclineSimpleChunkingConfig
log = logging.getLogger(__name__)
class SimpleChunkingOptions(Enum):
chunk_size_in_tokens = "chunk_size_in_tokens"
chunk_overlap_ratio = "chunk_overlap_ratio"
class InclineSimpleChunkingImpl(Preprocessing, PreprocessorsProtocolPrivate):
# this preprocessor receives plain text and returns chunks
input_types = [PreprocessingDataType.raw_text_document]
output_types = [PreprocessingDataType.chunks]
preprocessor_store = None
def __init__(self, config: InclineSimpleChunkingConfig) -> None:
self.config = config
async def initialize(self) -> None: ...
async def shutdown(self) -> None: ...
async def register_preprocessor(self, preprocessor: Preprocessor) -> None: ...
async def unregister_preprocessor(self, preprocessor_id: str) -> None: ...
async def do_preprocess(
self,
preprocessor_id: str,
preprocessor_inputs: List[PreprocessingDataElement],
options: Optional[PreprocessorOptions] = None,
) -> PreprocessorResponse:
chunks = []
window_len, overlap_len = self._resolve_chunk_size_params(options)
for inp in preprocessor_inputs:
new_chunks = self.make_overlapped_chunks(
inp.data_element_id, str(inp.data_element_path_or_content), window_len, overlap_len
)
for i, chunk in enumerate(new_chunks):
new_chunk_data_element = PreprocessingDataElement(
data_element_id=f"{inp.data_element_id}_chunk_{i}",
data_element_type=PreprocessingDataType.chunks,
data_element_format=PreprocessingDataFormat.txt,
data_element_path_or_content=chunk,
)
chunks.append(new_chunk_data_element)
return PreprocessorResponse(success=True, output_data_type=PreprocessingDataType.chunks, results=chunks)
async def preprocess(
self,
preprocessors: PreprocessorChain,
preprocessor_inputs: List[PreprocessingDataElement],
) -> PreprocessorResponse:
return await self.do_preprocess(preprocessor_id="", preprocessor_inputs=preprocessor_inputs)
def _resolve_chunk_size_params(self, options: PreprocessorOptions | None) -> Tuple[int, int]:
window_len = (options or {}).get(
str(SimpleChunkingOptions.chunk_size_in_tokens), self.config.chunk_size_in_tokens
)
chunk_overlap_ratio = (options or {}).get(
str(SimpleChunkingOptions.chunk_overlap_ratio), self.config.chunk_overlap_ratio
)
overlap_len = window_len // chunk_overlap_ratio
return window_len, overlap_len
@staticmethod
def make_overlapped_chunks(document_id: str, text: str, window_len: int, overlap_len: int) -> List[Chunk]:
tokenizer = Tokenizer.get_instance()
tokens = tokenizer.encode(text, bos=False, eos=False)
chunks = []
for i in range(0, len(tokens), window_len - overlap_len):
toks = tokens[i : i + window_len]
chunk = tokenizer.decode(toks)
# chunk is a string
chunks.append(
Chunk(
content=chunk,
metadata={
"token_count": len(toks),
"document_id": document_id,
},
)
)
return chunks