feat(file_processors): add Docling (inline and remote) and PyPDF file processors with configuration and processing capabilities

This commit introduces three new file processors: an inline Docling processor, a remote Docling Serve processor, and PyPDF. Each provider ships a configuration class and implements the methods required for file processing, extending Llama Stack's file management capabilities.
This commit is contained in:
Shruthi Sankepelly 2025-10-29 15:20:53 -04:00
parent 25b0298df5
commit 98544bdca8
10 changed files with 463 additions and 0 deletions
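
For orientation, here is a minimal usage sketch of the adapter entry-point pattern these providers share, assuming a Python environment where llama-stack and pypdf are installed; the PDF path is illustrative:

import asyncio

from llama_stack.providers.inline.file_processors.pypdf import PyPDFConfig, get_adapter_impl


async def main() -> None:
    # Each provider package exposes get_adapter_impl(config, _deps);
    # the PyPDF processor ignores its dependencies, so None suffices here.
    processor = await get_adapter_impl(PyPDFConfig(), None)
    with open("example.pdf", "rb") as f:  # illustrative input file
        result = await processor.process_file(f.read(), "example.pdf")
    print(result.metadata["pages"], "pages extracted")


asyncio.run(main())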

View file

@@ -0,0 +1,14 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import DoclingConfig
from .docling import DoclingFileProcessorImpl

__all__ = ["DoclingConfig", "DoclingFileProcessorImpl"]


async def get_adapter_impl(config: DoclingConfig, _deps):
    return DoclingFileProcessorImpl(config)

View file

@@ -0,0 +1,23 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel, Field


class DoclingConfig(BaseModel):
    timeout_seconds: int = Field(default=120, ge=1, le=600, description="Processing timeout in seconds")
    max_file_size_mb: int = Field(default=100, ge=1, le=1000, description="Maximum file size in MB")
    model_cache_dir: str | None = Field(default=None, description="Directory to cache Docling models")
    enable_gpu: bool = Field(default=False, description="Enable GPU acceleration if available")

    @staticmethod
    def sample_run_config(**kwargs):
        return {
            "timeout_seconds": 120,
            "max_file_size_mb": 100,
            "model_cache_dir": None,
            "enable_gpu": False,
        }
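
Worth noting: the ge/le bounds above are enforced by Pydantic at construction time. A small sketch of that behavior (the values are illustrative):

from pydantic import ValidationError

config = DoclingConfig(timeout_seconds=300, enable_gpu=True)  # within bounds, accepted
try:
    DoclingConfig(timeout_seconds=0)  # violates ge=1
except ValidationError as err:
    print(err)  # reports that timeout_seconds must be greater than or equal to 1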

View file

@@ -0,0 +1,152 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import logging
import tempfile
import time
from pathlib import Path
from typing import Any

from llama_stack.apis.file_processors import FileProcessors, ProcessedContent

from .config import DoclingConfig

logger = logging.getLogger(__name__)


class DoclingFileProcessorImpl(FileProcessors):
    def __init__(self, config: DoclingConfig):
        self.config = config
        self.convert_manager = None
        self._initialize_docling()
        logger.info("Docling processor initialized with ConvertManager")

    def _initialize_docling(self):
        """Initialize Docling using ConvertManager from docling-jobkit"""
        try:
            from docling_jobkit.convert.manager import ConvertManager

            # Initialize ConvertManager with configuration
            manager_config = {
                "cache_dir": self.config.model_cache_dir,
                "enable_gpu": self.config.enable_gpu,
            }
            # Remove None values from config
            manager_config = {k: v for k, v in manager_config.items() if v is not None}
            self.convert_manager = ConvertManager(**manager_config)
            logger.info("Docling ConvertManager initialized successfully")
        except ImportError as e:
            logger.error("Docling JobKit not installed. Run: pip install docling-jobkit")
            raise ImportError("Docling JobKit not installed. Run: pip install docling-jobkit") from e
        except Exception as e:
            logger.error(f"Failed to initialize Docling ConvertManager: {e}")
            raise RuntimeError(f"Failed to initialize Docling ConvertManager: {e}") from e

    def _parse_docling_options(self, options: dict[str, Any]) -> dict[str, Any]:
        """Parse and validate Docling-specific options"""
        if not options:
            return {}

        # ConvertManager supports these options
        docling_options = {}

        # Output format options
        if "format" in options:
            docling_options["output_format"] = options["format"]

        # Processing options that ConvertManager handles
        if "extract_tables" in options:
            docling_options["extract_tables"] = bool(options["extract_tables"])
        if "extract_figures" in options:
            docling_options["extract_figures"] = bool(options["extract_figures"])
        if "ocr_enabled" in options:
            docling_options["ocr_enabled"] = bool(options["ocr_enabled"])
        if "ocr_languages" in options and isinstance(options["ocr_languages"], list):
            docling_options["ocr_languages"] = options["ocr_languages"]
        if "preserve_layout" in options:
            docling_options["preserve_layout"] = bool(options["preserve_layout"])

        return docling_options

    async def process_file(
        self,
        file_data: bytes,
        filename: str,
        options: dict[str, Any] | None = None,
    ) -> ProcessedContent:
        start_time = time.time()
        options = options or {}
        logger.info(f"Processing file with Docling ConvertManager: {filename}, size: {len(file_data)} bytes")
        logger.debug(f"Docling options: {options}")
        try:
            # Parse options for ConvertManager
            docling_options = self._parse_docling_options(options)

            # Get converter from ConvertManager
            # This leverages the official docling-jobkit approach
            converter = self.convert_manager.get_converter(**docling_options)

            # Process file using a temporary file (Docling requirement)
            with tempfile.NamedTemporaryFile(suffix=f"_{filename}", delete=False) as tmp:
                tmp.write(file_data)
                tmp.flush()
                tmp_path = Path(tmp.name)

            try:
                # Convert using the managed converter
                result = converter.convert(tmp_path)

                # Determine output format
                format_type = options.get("format", "markdown")

                # Export content based on requested format
                if format_type == "markdown":
                    content = result.document.export_to_markdown()
                elif format_type == "html":
                    content = result.document.export_to_html()
                elif format_type == "json":
                    content = result.document.export_to_json()
                else:
                    content = result.document.export_to_text()

                processing_time = time.time() - start_time

                # Extract metadata from the Docling result
                processed = ProcessedContent(
                    content=content,
                    metadata={
                        "pages": len(result.document.pages) if hasattr(result.document, "pages") else 0,
                        "tables": len(result.document.tables) if hasattr(result.document, "tables") else 0,
                        "figures": len(result.document.figures) if hasattr(result.document, "figures") else 0,
                        "format": format_type,
                        "processor": "docling_jobkit",
                        "processing_time_seconds": processing_time,
                        "content_length": len(content),
                        "filename": filename,
                        "file_size_bytes": len(file_data),
                        "converter_options": docling_options,
                        "docling_version": getattr(result, "version", "unknown"),
                    },
                )
                logger.info(
                    f"Docling processing completed: {processed.metadata.get('pages', 0)} pages, "
                    f"{processed.metadata.get('tables', 0)} tables, {processing_time:.2f}s"
                )
                return processed
            finally:
                # Clean up the temporary file
                try:
                    tmp_path.unlink()
                except Exception as cleanup_error:
                    logger.warning(f"Failed to cleanup temp file {tmp_path}: {cleanup_error}")
        except Exception as e:
            logger.error(f"Docling processing failed for {filename}: {str(e)}")
            raise RuntimeError(f"Docling processing failed: {str(e)}") from e

View file

@@ -0,0 +1,14 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import PyPDFConfig
from .pypdf import PyPDFFileProcessorImpl

__all__ = ["PyPDFConfig", "PyPDFFileProcessorImpl"]


async def get_adapter_impl(config: PyPDFConfig, _deps):
    return PyPDFFileProcessorImpl(config)

View file

@@ -0,0 +1,19 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel, Field


class PyPDFConfig(BaseModel):
    timeout_seconds: int = Field(default=30, ge=1, le=300, description="Processing timeout in seconds")
    max_file_size_mb: int = Field(default=50, ge=1, le=500, description="Maximum file size in MB")

    @staticmethod
    def sample_run_config(**kwargs):
        return {
            "timeout_seconds": 30,
            "max_file_size_mb": 50,
        }
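
Because sample_run_config returns plain defaults, it round-trips directly into the model; a quick sketch:

config = PyPDFConfig(**PyPDFConfig.sample_run_config())
assert config.timeout_seconds == 30
assert config.max_file_size_mb == 50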

View file

@@ -0,0 +1,62 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import io
import logging
import time
from typing import Any

from llama_stack.apis.file_processors import FileProcessors, ProcessedContent

from .config import PyPDFConfig

logger = logging.getLogger(__name__)


class PyPDFFileProcessorImpl(FileProcessors):
    def __init__(self, config: PyPDFConfig):
        self.config = config
        logger.info("PyPDF processor initialized")

    async def process_file(
        self,
        file_data: bytes,
        filename: str,
        options: dict[str, Any] | None = None,
    ) -> ProcessedContent:
        start_time = time.time()
        logger.info(f"Processing PDF file: {filename}, size: {len(file_data)} bytes")
        try:
            # Import here to avoid a hard dependency when pypdf is not installed
            from pypdf import PdfReader

            # Migrate the existing 3-line extraction logic from vector_store.py
            pdf_reader = PdfReader(io.BytesIO(file_data))
            text = "\n".join(page.extract_text() for page in pdf_reader.pages)
            processing_time = time.time() - start_time
            result = ProcessedContent(
                content=text,
                metadata={
                    "pages": len(pdf_reader.pages),
                    "processor": "pypdf",
                    "processing_time_seconds": processing_time,
                    "content_length": len(text),
                    "filename": filename,
                    "file_size_bytes": len(file_data),
                },
            )
            logger.info(
                f"PyPDF processing completed: {len(pdf_reader.pages)} pages, {len(text)} chars, {processing_time:.2f}s"
            )
            return result
        except ImportError as e:
            logger.error("pypdf not installed. Run: pip install pypdf")
            raise RuntimeError("pypdf not installed. Run: pip install pypdf") from e
        except Exception as e:
            logger.error(f"PyPDF processing failed for {filename}: {str(e)}")
            raise RuntimeError(f"PyPDF processing failed: {str(e)}") from e

View file

@@ -0,0 +1,42 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.providers.datatypes import Api, InlineProviderSpec, ProviderSpec, RemoteProviderSpec


def available_providers() -> list[ProviderSpec]:
    return [
        # PyPDF - default provider for backward compatibility
        InlineProviderSpec(
            api=Api.file_processors,
            provider_type="inline::pypdf",
            pip_packages=["pypdf"],
            module="llama_stack.providers.inline.file_processors.pypdf",
            config_class="llama_stack.providers.inline.file_processors.pypdf.PyPDFConfig",
            description="Simple PDF text extraction using the pypdf library. Default processor for backward compatibility.",
        ),
        # Docling with JobKit - advanced inline processing
        InlineProviderSpec(
            api=Api.file_processors,
            provider_type="inline::docling",
            pip_packages=["docling-jobkit", "docling", "torch", "torchvision"],
            module="llama_stack.providers.inline.file_processors.docling",
            config_class="llama_stack.providers.inline.file_processors.docling.DoclingConfig",
            description="Advanced document processing using the Docling JobKit ConvertManager, with table and figure extraction",
        ),
        # Docling Serve - remote processing
        RemoteProviderSpec(
            api=Api.file_processors,
            adapter_type="docling_serve",
            provider_type="remote::docling_serve",
            pip_packages=["aiohttp"],
            module="llama_stack.providers.remote.file_processors.docling_serve",
            config_class="llama_stack.providers.remote.file_processors.docling_serve.DoclingServeConfig",
            description="Remote Docling processing via a Docling Serve API endpoint",
        ),
    ]
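
To illustrate how this registry might be consumed, a sketch with a hypothetical lookup helper (find_provider is not part of this commit):

def find_provider(provider_type: str) -> ProviderSpec:
    # Hypothetical helper: scan the registry returned by available_providers().
    for spec in available_providers():
        if spec.provider_type == provider_type:
            return spec
    raise KeyError(f"no file_processors provider registered as {provider_type}")


spec = find_provider("inline::pypdf")
print(spec.module, spec.pip_packages)  # module path and pip dependencies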

View file

@@ -0,0 +1,14 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import DoclingServeConfig
from .docling_serve import DoclingServeFileProcessorImpl

__all__ = ["DoclingServeConfig", "DoclingServeFileProcessorImpl"]


async def get_adapter_impl(config: DoclingServeConfig, _deps):
    return DoclingServeFileProcessorImpl(config)

View file

@@ -0,0 +1,23 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel, Field


class DoclingServeConfig(BaseModel):
    base_url: str = Field(..., description="Base URL of the Docling Serve endpoint")
    api_key: str | None = Field(default=None, description="API key for authentication")
    timeout_seconds: int = Field(default=120, ge=1, le=600, description="Request timeout in seconds")
    max_file_size_mb: int = Field(default=100, ge=1, le=1000, description="Maximum file size in MB")

    @staticmethod
    def sample_run_config(**kwargs):
        return {
            "base_url": "http://localhost:8080",
            "api_key": None,
            "timeout_seconds": 120,
            "max_file_size_mb": 100,
        }
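
Unlike the other configs, base_url here is declared with Field(...), so it is required; a brief sketch:

from pydantic import ValidationError

config = DoclingServeConfig(base_url="http://localhost:8080")  # accepted
try:
    DoclingServeConfig()  # fails: base_url has no default
except ValidationError as err:
    print(err)  # reports the missing base_url field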

View file

@@ -0,0 +1,100 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
import logging
import time
from typing import Any

import aiohttp

from llama_stack.apis.file_processors import FileProcessors, ProcessedContent

from .config import DoclingServeConfig

logger = logging.getLogger(__name__)


class DoclingServeFileProcessorImpl(FileProcessors):
    def __init__(self, config: DoclingServeConfig):
        self.config = config
        self.base_url = config.base_url.rstrip("/")
        self.api_key = config.api_key
        self.timeout = config.timeout_seconds
        logger.info(f"DoclingServe processor initialized with endpoint: {self.base_url}")

    async def process_file(
        self,
        file_data: bytes,
        filename: str,
        options: dict[str, Any] | None = None,
    ) -> ProcessedContent:
        start_time = time.time()
        options = options or {}
        logger.info(f"Processing file with DoclingServe: {filename}, size: {len(file_data)} bytes")
        logger.debug(f"DoclingServe options: {options}")
        try:
            headers = {"Content-Type": "application/octet-stream"}
            if self.api_key:
                headers["Authorization"] = f"Bearer {self.api_key}"

            # Prepare request parameters
            params = {
                "filename": filename,
                "output_format": options.get("format", "markdown"),
            }
            # Add other Docling-specific options
            if "extract_tables" in options:
                params["extract_tables"] = str(options["extract_tables"]).lower()
            if "extract_figures" in options:
                params["extract_figures"] = str(options["extract_figures"]).lower()
            if "ocr_enabled" in options:
                params["ocr_enabled"] = str(options["ocr_enabled"]).lower()

            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
                async with session.post(
                    f"{self.base_url}/v1/convert",
                    headers=headers,
                    params=params,
                    data=file_data,
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"DoclingServe API error: {response.status} - {error_text}")
                        raise RuntimeError(f"DoclingServe API error: {response.status} - {error_text}")
                    result_data = await response.json()
                    processing_time = time.time() - start_time
                    processed = ProcessedContent(
                        content=result_data.get("content", ""),
                        metadata={
                            "pages": result_data.get("pages", 0),
                            "tables": result_data.get("tables_extracted", 0),
                            "figures": result_data.get("figures_extracted", 0),
                            "format": result_data.get("output_format", "markdown"),
                            "processor": "docling_serve",
                            "processing_time_seconds": processing_time,
                            "content_length": len(result_data.get("content", "")),
                            "server_processing_time": result_data.get("server_processing_time"),
                            "server_version": result_data.get("server_version"),
                            "filename": filename,
                            "file_size_bytes": len(file_data),
                        },
                    )
                    logger.info(
                        f"DoclingServe processing completed: {result_data.get('pages', 0)} pages, "
                        f"{result_data.get('tables_extracted', 0)} tables, {processing_time:.2f}s"
                    )
                    return processed
        except asyncio.TimeoutError as e:
            # aiohttp raises asyncio.TimeoutError when the ClientTimeout elapses;
            # aiohttp.ClientTimeout itself is a configuration object, not an exception.
            logger.error(f"DoclingServe timeout after {self.timeout}s for {filename}")
            raise RuntimeError(f"DoclingServe processing timeout after {self.timeout} seconds") from e
        except Exception as e:
            logger.error(f"DoclingServe processing failed for {filename}: {str(e)}")
            raise RuntimeError(f"DoclingServe processing failed: {str(e)}") from e