From 98544bdca889e1d5340e4e37e979dad18f95bb00 Mon Sep 17 00:00:00 2001 From: Shruthi Sankepelly Date: Wed, 29 Oct 2025 15:20:53 -0400 Subject: [PATCH] feat(file_processors): add Docling(inline and remote) and PyPDF file processors with configuration and processing capabilities This commit introduces three new file processors: Docling inline, Docling Serve remote, and PyPDF. Each processor includes configuration classes and implements the necessary methods for file processing, enhancing the file management capabilities of the LlamaStack. --- .../file_processors/docling/__init__.py | 14 ++ .../inline/file_processors/docling/config.py | 23 +++ .../inline/file_processors/docling/docling.py | 152 ++++++++++++++++++ .../inline/file_processors/pypdf/__init__.py | 14 ++ .../inline/file_processors/pypdf/config.py | 19 +++ .../inline/file_processors/pypdf/pypdf.py | 62 +++++++ .../providers/registry/file_processors.py | 42 +++++ .../file_processors/docling_serve/__init__.py | 14 ++ .../file_processors/docling_serve/config.py | 23 +++ .../docling_serve/docling_serve.py | 100 ++++++++++++ 10 files changed, 463 insertions(+) create mode 100644 src/llama_stack/providers/inline/file_processors/docling/__init__.py create mode 100644 src/llama_stack/providers/inline/file_processors/docling/config.py create mode 100644 src/llama_stack/providers/inline/file_processors/docling/docling.py create mode 100644 src/llama_stack/providers/inline/file_processors/pypdf/__init__.py create mode 100644 src/llama_stack/providers/inline/file_processors/pypdf/config.py create mode 100644 src/llama_stack/providers/inline/file_processors/pypdf/pypdf.py create mode 100644 src/llama_stack/providers/registry/file_processors.py create mode 100644 src/llama_stack/providers/remote/file_processors/docling_serve/__init__.py create mode 100644 src/llama_stack/providers/remote/file_processors/docling_serve/config.py create mode 100644 
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .config import DoclingConfig
from .docling import DoclingFileProcessorImpl

__all__ = ["DoclingConfig", "DoclingFileProcessorImpl"]


async def get_adapter_impl(config: DoclingConfig, _deps):
    """Build the inline Docling file-processor implementation for this config."""
    impl = DoclingFileProcessorImpl(config)
    return impl
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from pydantic import BaseModel, Field


class DoclingConfig(BaseModel):
    """Configuration for the inline Docling (docling-jobkit) file processor."""

    timeout_seconds: int = Field(default=120, ge=1, le=600, description="Processing timeout in seconds")
    max_file_size_mb: int = Field(default=100, ge=1, le=1000, description="Maximum file size in MB")
    model_cache_dir: str | None = Field(default=None, description="Directory to cache Docling models")
    enable_gpu: bool = Field(default=False, description="Enable GPU acceleration if available")

    @staticmethod
    def sample_run_config(**kwargs):
        """Return a sample run configuration mirroring the field defaults."""
        sample = {
            "timeout_seconds": 120,
            "max_file_size_mb": 100,
            "model_cache_dir": None,
            "enable_gpu": False,
        }
        return sample
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import logging
import tempfile
import time
from pathlib import Path
from typing import Any

from llama_stack.apis.file_processors import FileProcessors, ProcessedContent

from .config import DoclingConfig

logger = logging.getLogger(__name__)


class DoclingFileProcessorImpl(FileProcessors):
    """Inline file processor backed by docling-jobkit's ConvertManager."""

    def __init__(self, config: DoclingConfig):
        self.config = config
        self.convert_manager = None
        self._initialize_docling()
        logger.info("Docling processor initialized with ConvertManager")

    def _initialize_docling(self):
        """Initialize Docling using ConvertManager from docling-jobkit.

        Raises:
            ImportError: if docling-jobkit is not installed.
            RuntimeError: if ConvertManager construction fails.
        """
        try:
            from docling_jobkit.convert.manager import ConvertManager

            manager_config = {
                "cache_dir": self.config.model_cache_dir,
                "enable_gpu": self.config.enable_gpu,
            }
            # Drop unset entries so ConvertManager applies its own defaults.
            manager_config = {k: v for k, v in manager_config.items() if v is not None}

            self.convert_manager = ConvertManager(**manager_config)
            logger.info("Docling ConvertManager initialized successfully")
        except ImportError as e:
            logger.error("Docling JobKit not installed. Run: pip install docling-jobkit")
            raise ImportError("Docling JobKit not installed. Run: pip install docling-jobkit") from e
        except Exception as e:
            logger.error("Failed to initialize Docling ConvertManager: %s", e)
            raise RuntimeError(f"Failed to initialize Docling ConvertManager: {e}") from e

    def _parse_docling_options(self, options: dict[str, Any]) -> dict[str, Any]:
        """Translate caller-supplied options into ConvertManager keyword args."""
        if not options:
            return {}

        docling_options: dict[str, Any] = {}

        # Output format option.
        if "format" in options:
            docling_options["output_format"] = options["format"]

        # Processing options that ConvertManager handles.
        if "extract_tables" in options:
            docling_options["extract_tables"] = bool(options["extract_tables"])
        if "extract_figures" in options:
            docling_options["extract_figures"] = bool(options["extract_figures"])
        if "ocr_enabled" in options:
            docling_options["ocr_enabled"] = bool(options["ocr_enabled"])
        if "ocr_languages" in options and isinstance(options["ocr_languages"], list):
            docling_options["ocr_languages"] = options["ocr_languages"]
        if "preserve_layout" in options:
            docling_options["preserve_layout"] = bool(options["preserve_layout"])

        return docling_options

    async def process_file(
        self,
        file_data: bytes,
        filename: str,
        options: dict[str, Any] | None = None,
    ) -> ProcessedContent:
        """Convert a document with Docling and return content plus metadata.

        Args:
            file_data: raw file bytes.
            filename: original file name; its extension drives format detection.
            options: optional Docling options (format, extract_tables, ...).

        Raises:
            RuntimeError: if conversion fails for any reason.
        """
        start_time = time.time()
        options = options or {}

        logger.info("Processing file with Docling ConvertManager: %s, size: %d bytes", filename, len(file_data))
        logger.debug("Docling options: %s", options)

        try:
            docling_options = self._parse_docling_options(options)

            # Get converter from ConvertManager (official docling-jobkit approach).
            converter = self.convert_manager.get_converter(**docling_options)

            # Docling needs a real file on disk. FIX: preserve the original file
            # extension in the temp-file suffix (the previous suffix contained no
            # extension at all, breaking Docling's format detection).
            with tempfile.NamedTemporaryFile(suffix=Path(filename).suffix, delete=False) as tmp:
                tmp.write(file_data)
                tmp.flush()
                tmp_path = Path(tmp.name)

            try:
                result = converter.convert(tmp_path)

                # Export content based on the requested format.
                format_type = options.get("format", "markdown")
                if format_type == "markdown":
                    content = result.document.export_to_markdown()
                elif format_type == "html":
                    content = result.document.export_to_html()
                elif format_type == "json":
                    content = result.document.export_to_json()
                else:
                    content = result.document.export_to_text()

                processing_time = time.time() - start_time

                processed = ProcessedContent(
                    content=content,
                    metadata={
                        "pages": len(result.document.pages) if hasattr(result.document, "pages") else 0,
                        "tables": len(result.document.tables) if hasattr(result.document, "tables") else 0,
                        "figures": len(result.document.figures) if hasattr(result.document, "figures") else 0,
                        "format": format_type,
                        "processor": "docling_jobkit",
                        "processing_time_seconds": processing_time,
                        "content_length": len(content),
                        "filename": filename,
                        "file_size_bytes": len(file_data),
                        "converter_options": docling_options,
                        "docling_version": getattr(result, "version", "unknown"),
                    },
                )

                logger.info(
                    "Docling processing completed: %s pages, %s tables, %.2fs",
                    processed.metadata.get("pages", 0),
                    processed.metadata.get("tables", 0),
                    processing_time,
                )
                return processed
            finally:
                # Best-effort cleanup; never mask the real conversion error.
                try:
                    tmp_path.unlink()
                except Exception as cleanup_error:
                    logger.warning("Failed to cleanup temp file %s: %s", tmp_path, cleanup_error)
        except Exception as e:
            logger.error("Docling processing failed for %s: %s", filename, e)
            raise RuntimeError(f"Docling processing failed: {e}") from e
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from pydantic import BaseModel, Field


class PyPDFConfig(BaseModel):
    """Configuration for the simple PyPDF text-extraction processor."""

    timeout_seconds: int = Field(default=30, ge=1, le=300, description="Processing timeout in seconds")
    max_file_size_mb: int = Field(default=50, ge=1, le=500, description="Maximum file size in MB")

    @staticmethod
    def sample_run_config(**kwargs):
        """Return a sample run configuration mirroring the field defaults."""
        sample = {
            "timeout_seconds": 30,
            "max_file_size_mb": 50,
        }
        return sample
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import io
import logging
import time
from typing import Any

from llama_stack.apis.file_processors import FileProcessors, ProcessedContent

from .config import PyPDFConfig

logger = logging.getLogger(__name__)


class PyPDFFileProcessorImpl(FileProcessors):
    """Simple PDF text extraction using the pypdf library."""

    def __init__(self, config: PyPDFConfig):
        self.config = config
        logger.info("PyPDF processor initialized")

    async def process_file(
        self,
        file_data: bytes,
        filename: str,
        options: dict[str, Any] | None = None,
    ) -> ProcessedContent:
        """Extract plain text from a PDF.

        Args:
            file_data: raw PDF bytes.
            filename: original file name (recorded in metadata only).
            options: accepted for interface parity; unused by this processor.

        Returns:
            ProcessedContent with concatenated page text and basic metadata.

        Raises:
            RuntimeError: if pypdf is not installed or extraction fails.
        """
        start_time = time.time()
        logger.info("Processing PDF file: %s, size: %d bytes", filename, len(file_data))

        try:
            # Imported lazily to avoid a hard dependency when pypdf is absent.
            from pypdf import PdfReader

            # Migrated 3-line extraction logic from vector_store.py.
            pdf_reader = PdfReader(io.BytesIO(file_data))
            text = "\n".join(page.extract_text() for page in pdf_reader.pages)

            processing_time = time.time() - start_time

            result = ProcessedContent(
                content=text,
                metadata={
                    "pages": len(pdf_reader.pages),
                    "processor": "pypdf",
                    "processing_time_seconds": processing_time,
                    "content_length": len(text),
                    "filename": filename,
                    "file_size_bytes": len(file_data),
                },
            )

            logger.info(
                "PyPDF processing completed: %d pages, %d chars, %.2fs",
                len(pdf_reader.pages),
                len(text),
                processing_time,
            )
            return result
        except ImportError as e:
            logger.error("PyPDF not installed. Run: pip install pypdf")
            # FIX: chain the original ImportError (was re-raised without `from`).
            raise RuntimeError("PyPDF not installed. Run: pip install pypdf") from e
        except Exception as e:
            logger.error("PyPDF processing failed for %s: %s", filename, e)
            raise RuntimeError(f"PyPDF processing failed: {e}") from e
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from llama_stack.providers.datatypes import Api, InlineProviderSpec, ProviderSpec, RemoteProviderSpec


def available_providers() -> list[ProviderSpec]:
    """Enumerate the file-processor provider specs known to the stack."""
    # PyPDF - Default provider for backward compatibility
    pypdf_spec = InlineProviderSpec(
        api=Api.file_processors,
        provider_type="inline::pypdf",
        pip_packages=["pypdf"],
        module="llama_stack.providers.inline.file_processors.pypdf",
        config_class="llama_stack.providers.inline.file_processors.pypdf.PyPDFConfig",
        description="Simple PDF text extraction using PyPDF library. Default processor for backward compatibility.",
    )

    # Docling with JobKit - Advanced inline processing
    docling_spec = InlineProviderSpec(
        api=Api.file_processors,
        provider_type="inline::docling",
        pip_packages=["docling-jobkit", "docling", "torch", "torchvision"],
        module="llama_stack.providers.inline.file_processors.docling",
        config_class="llama_stack.providers.inline.file_processors.docling.DoclingConfig",
        description="Advanced document processing using Docling JobKit ConvertManager with table/figure extraction",
    )

    # Docling Serve - Remote processing
    docling_serve_spec = RemoteProviderSpec(
        api=Api.file_processors,
        adapter_type="docling_serve",
        provider_type="remote::docling_serve",
        pip_packages=["aiohttp"],
        module="llama_stack.providers.remote.file_processors.docling_serve",
        config_class="llama_stack.providers.remote.file_processors.docling_serve.DoclingServeConfig",
        description="Remote Docling processing via Docling Serve API endpoint",
    )

    return [pypdf_spec, docling_spec, docling_serve_spec]
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .config import DoclingServeConfig
from .docling_serve import DoclingServeFileProcessorImpl

__all__ = ["DoclingServeConfig", "DoclingServeFileProcessorImpl"]


async def get_adapter_impl(config: DoclingServeConfig, _deps):
    """Build the remote Docling Serve file-processor implementation."""
    impl = DoclingServeFileProcessorImpl(config)
    return impl
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from pydantic import BaseModel, Field


class DoclingServeConfig(BaseModel):
    """Configuration for the remote Docling Serve file processor."""

    base_url: str = Field(..., description="Base URL of the Docling Serve endpoint")
    api_key: str | None = Field(default=None, description="API key for authentication")
    timeout_seconds: int = Field(default=120, ge=1, le=600, description="Request timeout in seconds")
    max_file_size_mb: int = Field(default=100, ge=1, le=1000, description="Maximum file size in MB")

    @staticmethod
    def sample_run_config(**kwargs):
        """Return a sample run configuration pointing at a local endpoint."""
        sample = {
            "base_url": "http://localhost:8080",
            "api_key": None,
            "timeout_seconds": 120,
            "max_file_size_mb": 100,
        }
        return sample
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import logging
import time
from typing import Any

import aiohttp

from llama_stack.apis.file_processors import FileProcessors, ProcessedContent

from .config import DoclingServeConfig

logger = logging.getLogger(__name__)


class DoclingServeFileProcessorImpl(FileProcessors):
    """Remote file processor that delegates conversion to a Docling Serve API."""

    def __init__(self, config: DoclingServeConfig):
        self.config = config
        self.base_url = config.base_url.rstrip('/')
        self.api_key = config.api_key
        self.timeout = config.timeout_seconds
        logger.info("DoclingServe processor initialized with endpoint: %s", self.base_url)

    async def process_file(
        self,
        file_data: bytes,
        filename: str,
        options: dict[str, Any] | None = None,
    ) -> ProcessedContent:
        """POST the file to Docling Serve and wrap the JSON response.

        Args:
            file_data: raw file bytes, sent as the request body.
            filename: original file name, forwarded as a query parameter.
            options: optional Docling options (format, extract_tables, ...).

        Raises:
            RuntimeError: on timeout, non-200 response, or any transport error.
        """
        start_time = time.time()
        options = options or {}

        logger.info("Processing file with DoclingServe: %s, size: %d bytes", filename, len(file_data))
        logger.debug("DoclingServe options: %s", options)

        try:
            headers = {'Content-Type': 'application/octet-stream'}
            if self.api_key:
                headers['Authorization'] = f'Bearer {self.api_key}'

            # Prepare request parameters; booleans go as lowercase strings.
            params = {
                'filename': filename,
                'output_format': options.get('format', 'markdown'),
            }
            for key in ('extract_tables', 'extract_figures', 'ocr_enabled'):
                if key in options:
                    params[key] = str(options[key]).lower()

            client_timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession(timeout=client_timeout) as session:
                async with session.post(
                    f"{self.base_url}/v1/convert",
                    headers=headers,
                    params=params,
                    data=file_data,
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error("DoclingServe API error: %s - %s", response.status, error_text)
                        raise RuntimeError(f"DoclingServe API error: {response.status} - {error_text}")

                    result_data = await response.json()

            processing_time = time.time() - start_time

            processed = ProcessedContent(
                content=result_data.get('content', ''),
                metadata={
                    'pages': result_data.get('pages', 0),
                    'tables': result_data.get('tables_extracted', 0),
                    'figures': result_data.get('figures_extracted', 0),
                    'format': result_data.get('output_format', 'markdown'),
                    'processor': 'docling_serve',
                    'processing_time_seconds': processing_time,
                    'content_length': len(result_data.get('content', '')),
                    'server_processing_time': result_data.get('server_processing_time'),
                    'server_version': result_data.get('server_version'),
                    'filename': filename,
                    'file_size_bytes': len(file_data),
                },
            )

            logger.info(
                "DoclingServe processing completed: %s pages, %s tables, %.2fs",
                result_data.get('pages', 0),
                result_data.get('tables_extracted', 0),
                processing_time,
            )
            return processed

        # FIX: `aiohttp.ClientTimeout` is a configuration dataclass, not an
        # exception class — catching it raises TypeError at match time. aiohttp
        # timeouts surface as asyncio.TimeoutError (alias of TimeoutError on 3.11+).
        except asyncio.TimeoutError as e:
            logger.error("DoclingServe timeout after %ss for %s", self.timeout, filename)
            raise RuntimeError(f"DoclingServe processing timeout after {self.timeout} seconds") from e
        except Exception as e:
            logger.error("DoclingServe processing failed for %s: %s", filename, e)
            raise RuntimeError(f"DoclingServe processing failed: {e}") from e