Compare commits


55 commits

Author SHA1 Message Date
db19c4f4f8 Add Logs 2025-07-24 10:05:48 +02:00
cf88959576 Update query parameters and response handling in server.py
Replaced hardcoded date values with environment variables for flexibility and added a new filter for newspapers. Enhanced response parsing by including facets for related persons and organizations.
2025-07-03 23:33:12 +02:00
e5a6c59f7a Add execution time logging to smd_detail_article function
Introduced performance tracking in the smd_detail_article function by adding execution time logging. This helps in monitoring and optimizing the function’s runtime during API requests and processing.
2025-06-21 22:22:21 +02:00
bbec6df3fd Refactor response handling to parse JSON content.
Replace raw response text retrieval with JSON parsing to extract specific content. This ensures more precise data handling and aligns with expected response structure.
2025-06-21 22:19:50 +02:00
4c554b1472 Fix incorrect API key header in summarization function
The "Bearer" prefix was mistakenly included in the API key header. This change removes it to align with the expected format required by the external service.
2025-06-21 21:57:17 +02:00
cfc124915f Fix incorrect string formatting for model payload key
Replaced curly brackets with direct reference to the environment variable in the payload model value. This ensures the correct model is passed dynamically from environment configurations.
2025-06-21 21:54:29 +02:00
5799319ade Update response handling and add summarized content return
Modified the response handling to return raw text instead of JSON in one case and updated another to include summarized content with article ID. These changes ensure consistency and enhance the response structure for better client integration.
2025-06-21 21:47:13 +02:00
0d532d147b Simplify server response handling by returning raw data.
Replaced the summarized content processing with direct data return. This improves efficiency by skipping the summarization step and ensures the full response data is available to the caller for further use.
2025-06-21 21:43:51 +02:00
6f9f74dae0 Refactor summarize_to_words to accept text and title.
Updated the `summarize_to_words` function to take `text` and `title` as separate parameters instead of a single `article` dictionary. Adjusted payload and function calls accordingly for better clarity and flexibility.
2025-06-21 21:35:54 +02:00
1d5aeb1644 Add article summarization via external API integration
Introduced a new `summarize_to_words` function to summarize articles using an external API. Integrated it into `smd_detail_article` to return summarized article content instead of the full text. Updated header key capitalization for consistency.
2025-06-21 21:18:32 +02:00
3c1f8a2c25 Optimize query formulation instructions in docstring
Clarified that the SMD search query should be written in German and enriched with relevant contextual keywords. This ensures better alignment with language requirements and improves search accuracy.
2025-06-21 20:52:00 +02:00
25000459cb Set default page size from environment variable
Replaced the hardcoded page size with a value fetched from the `SWISSDOX_PAGESIZE` environment variable, defaulting to 10 if not set. This allows greater flexibility and configurability for pagination settings.
2025-06-21 20:39:22 +02:00
87d4884eea Switch HTTP method from GET to POST in server request.
Updated the SMD server client to use POST instead of GET for sending requests. This change ensures the payload is sent as JSON in compliance with the server's expected input format.
2025-06-21 20:25:52 +02:00
8eda903185 Switch to aiohttp for async HTTP requests in server logic
Replaced synchronous requests with aiohttp to enable non-blocking, async HTTP requests. Updated `smd_detail_article` and `smd_research` to utilize asyncio for parallel execution, significantly improving efficiency for handling multiple API calls. Added aiohttp and asyncio dependencies to requirements.txt.
2025-06-21 20:17:59 +02:00
1554d5e4ab Update article query logic and improve response structure
Removed the unused `smd_detail_article` decorator and expanded the response structure to include detailed article data in nested JSON. Adjusted the pagination default `pageSize` from 10 to 25 to handle larger datasets effectively.
2025-06-21 20:06:17 +02:00
d779b5bd98 Update SMD server to fix API URL and clarify docstring
Revised the API endpoint URL from smd.ch to swissdox.ch for accuracy and updated the function docstring to clarify the purpose of the article_id parameter. These changes improve clarity and ensure API requests are routed correctly.
2025-06-21 19:50:27 +02:00
4793aca10e Update docstring for smd_detail_article function
Clarify the usage instructions and context for the smd_detail_article function. This ensures better understanding of its relationship with the smd_research tool.
2025-06-21 19:35:33 +02:00
a36990856c Update default onlyResults parameter to False
The `onlyResults` parameter was changed from `True` to `False` in the API request configuration. This ensures more comprehensive data retrieval by including additional metadata in the API response.
2025-06-21 19:31:05 +02:00
bf695bef09 Refactor SMD utilities and streamline research query usage
Remove unnecessary SMD utilities and shift query creation logic directly into `smd_research` for simplicity. This eliminates duplicate functionality and reduces reliance on external utility files, consolidating responsibilities within server implementation.
2025-06-21 19:14:11 +02:00
60c441c817 Add GPT Researcher MCP server implementation
Introduce an MCP server for GPT Researcher using FastMCP, enabling AI assistants to perform web research and generate reports via SSE. Includes async processing steps with real-time updates and a runnable server script.
2025-06-01 13:31:42 +02:00
895671189e Refactor API methods to use AsyncGenerator for SSE.
Updated `write_report` and `get_research_sources` to stream results via server-sent events (SSE) using `AsyncGenerator`. This allows incremental data delivery and better error reporting, improving client-server communication.
2025-06-01 13:00:41 +02:00
1b60eb0ae6 Refactor quick_search to support SSE response streaming.
Updated `quick_search` to return an `AsyncGenerator` for streaming server-sent events (SSE) during quick research execution. Enhanced error handling to provide detailed feedback and upgraded `gpt-researcher` dependency to version 0.13.3 for compatibility.
2025-06-01 12:56:34 +02:00
d66bb1cd7a Standardize SSE event types to "message"
Replaced various custom SSE event types like "tool_update", "tool_result", and "tool_error" with the unified "message" event type. This simplifies event handling logic and ensures consistency across all server-sent event flows.
2025-06-01 12:19:21 +02:00
cda44df1a0 Refactor deep_research to support SSE-based streaming.
Updated the `deep_research` function to use asynchronous generators and send Server-Sent Events (SSE) for real-time streaming updates. Added a new utility for formatting SSE events and improved research lifecycle visibility with intermediate progress steps and error handling.
2025-06-01 11:46:11 +02:00
11bfff7ff7 Set FastMCP server to listen on all interfaces with a config.
Updated the FastMCP initialization to explicitly bind to 0.0.0.0, set the port to 8000, and specify a keep-alive timeout of 720 seconds. This ensures the server is properly configured for broader accessibility and stability.
2025-05-31 23:59:42 +02:00
b1ad64cd75 Introduce research resource API and improve research caching
Add a `research://{topic}` resource endpoint for direct access to research context, reducing redundant API calls. Introduced `research_store` for caching research results and modularized helper methods like `store_research_results` and `format_context_with_sources` for better reusability and clarity. Refactored existing researcher initialization for simplicity and improved comments to clarify intended usage.
2025-05-31 23:54:51 +02:00
ba48f44321 Set keep-alive timeout for FastMCP servers to 720 seconds
Added the `timeout_keep_alive` parameter to FastMCP initialization in both GPT Researcher and SMD Researcher servers. This ensures connections remain active for longer, improving server reliability for long-lived client interactions.
2025-05-31 22:29:01 +02:00
12503a4453 Set FastMCP server to listen on all interfaces with port 8000
The `host` and `port` parameters were added to FastMCP initialization for both GPT Researcher and SMD servers. This ensures the servers are accessible on all network interfaces at port 8000.
2025-05-31 21:16:40 +02:00
a593cdcae8 Update APP_ENTRYPOINT in Dockerfile to new path
Changed the APP_ENTRYPOINT environment variable to reflect the updated directory structure. This ensures the application points to the correct script path during execution.
2025-05-31 20:28:10 +02:00
dc93de9b18 Use shell form for CMD in Dockerfile
Switched CMD to use shell form instead of exec form for running the application. This simplifies the command syntax and ensures compatibility in environments where exec form parsing may cause issues.
2025-05-31 20:06:23 +02:00
3b75d04f32 Add SMD MCP Server implementation and utilities
Implemented the SMD MCP Server with functionality for AI-driven research and article retrieval via the MCP protocol. Added utilities for prompt creation and enhanced dependency configuration. This serves as a foundation for handling SMD-related queries and data processing.
2025-05-31 17:08:46 +02:00
47c036a973 Remove unused research resource and related utilities
Eliminated the `research://{topic}` resource API, associated utilities, and the `research_store`. These components were redundant due to existing alternatives using the `conduct_research` tool. This cleanup reduces complexity and improves maintainability.
2025-05-03 11:10:56 +02:00
eec1b34517 Refactor module structure and improve Docker configuration
Reorganized module directories under `phoenix_technologies` for better namespace clarity and maintainability. Updated the Dockerfile to use an environment variable for the application entry point, enhancing flexibility in deployment. Additionally, revamped the README to reflect the new structure and provide clearer project documentation.
2025-05-03 11:03:15 +02:00
6bab336883 Update function docstrings for clarity and focus
Simplified the descriptions in `deep_research` and `quick_search` docstrings to improve readability and eliminate redundant details. This ensures concise and focused explanations of each function's purpose.
2025-04-26 19:48:06 +02:00
51eecd2830 Add CustomLogsHandler for JSON logging in GPTResearcher
Introduced a CustomLogsHandler class to handle and log JSON data during research or search operations. Updated GPTResearcher initialization to include the CustomLogsHandler for improved logging and debugging.
2025-04-26 19:28:43 +02:00
0892ecdc8e Use "sse" mode in MCP run method
Updated the `mcp.run` call to include the "sse" mode, ensuring proper handling of server events. This change improves the server's event loop management and prepares it for SSE-specific operations.
2025-04-26 18:48:51 +02:00
7f7a6083f8 Add quick_search function and remove citation note in deep_research
Introduced the quick_search function for faster, snippet-based searches and adjusted mcp.run to remove the "sse" argument. Additionally, modified the deep_research docstring to remove the citation requirement note for simpler usage documentation.
2025-04-26 18:46:11 +02:00
7e5a6db0f6 Set research type dynamically via environment variable
Added `RESEARCH_TYPE` environment variable to configure the default research type for GPTResearcher. Updated the initialization of GPTResearcher to use this dynamic configuration, improving flexibility and adaptability.
2025-04-26 18:14:41 +02:00
626345b65e Add report_type parameter to GPTResearcher initialization
This update enables specifying the report type during researcher initialization. It introduces "deep" as the default report type, enhancing flexibility for future use cases.
2025-04-26 18:11:21 +02:00
44b91b9375 Refactor codebase to implement MCP server for GPT Researcher
Replaced FastAPI app with an MCP server implementation, enhancing flexibility and modularity for research operations. Deprecated `phoenix_technologies` package, updated server logic, added utility functions, and revised dependencies in `requirements.txt`. Updated Dockerfile and README to align with the new architecture.
2025-04-26 17:54:43 +02:00
73e929ca00 Refactor report generation with simplified log streaming
Replaced custom log handler and async report generation logic with a simplified fake data streamer for the StreamingResponse. Added uvicorn server startup code for direct script execution.
2025-04-26 08:38:02 +02:00
e2b4ba5a7d Refactor log streaming and report generation logic
Simplified the logic for log streaming by consolidating into a single async generator (`log_stream`). Removed redundant tasks and streamlined report generation to improve code readability and maintainability.
2025-04-25 22:11:19 +02:00
1472e277bc Refactor report generation with async streaming and logging
Enhanced report generation by integrating `CustomLogsHandler` and separating tasks for generating reports and streaming logs. Replaced the previous implementation with a unified async generator to handle concurrent execution and improve error handling. Updated module exports to include the new `CustomLogsHandler` component.
2025-04-25 22:07:25 +02:00
6ee47f02c0 Fix incorrect attribute access for generator completion check
Replaced `generator.researcher.complete` with the correct `generator.complete`. This resolves a potential logic error and ensures the completion condition is properly evaluated.
2025-04-25 19:57:01 +02:00
79be94afd2 Refactor generator completion checks for consistency.
Replaced `.is_complete()` method calls with direct `.complete` attribute access to streamline the code. Removed the redundant `is_complete()` method from `deepresearch.py` to reduce unnecessary indirection. This simplifies the logic and improves readability.
2025-04-25 19:53:59 +02:00
b16305e369 Fix missing parentheses in is_complete method call
The is_complete method was incorrectly referenced without parentheses, causing a potential runtime error. This fix ensures proper method invocation and prevents the loop from behaving incorrectly.
2025-04-25 19:51:38 +02:00
cb5fe35d24 Refactor report generation flow to include progress streaming
Modified the report generation process to stream logs incrementally during execution. Introduced new methods `init` and `is_complete` to enhance modularity and track completion status. Adjusted generator logic to better manage and yield logs in real-time.
2025-04-25 19:47:34 +02:00
ae4e81906e Add progress messages to report generation process
This change introduces status updates to indicate the start and successful completion of the report generation. These progress messages improve user feedback during the asynchronous operation.
2025-04-25 19:26:18 +02:00
ffeb2527b3 Refactor log handling in deepresearch module
Removed the unnecessary `logs` initialization and streamlined the `CustomLogsHandler` setup. This simplifies the code and avoids redundant log storage.
2025-04-25 18:58:40 +02:00
936eb6f394 Refactor report generation and logging handlers.
Introduced `CustomLogsHandler` to streamline log management and updated `ReportGenerator` to simplify report generation output. Removed unused asynchronous iterator methods, improving code clarity and reducing complexity.
2025-04-25 18:56:41 +02:00
1dbf774d55 Refactor async methods to adjust return types.
Changed `send_json` to return `None` instead of yielding and updated log retrieval to use `yield` in place of `return`. These modifications align function behavior with intended asynchronous use cases and improve consistency.
2025-04-25 18:46:48 +02:00
327758f00f Refactor async logging logic in report generation.
Removed unused log yielding in `main.py` to simplify the flow. Updated `send_json` in `deepresearch.py` to use an async generator for more flexible streaming of log data.
2025-04-25 18:42:40 +02:00
ade4511a87 Add report generation step to the main async process
The `generate_report` method is now explicitly called within the main process, ensuring the researcher's report generation is initiated properly. Updated its return type annotation for clarity in `deepresearch.py`.
2025-04-25 18:39:33 +02:00
14429fc6f7 Refactor logging and improve async iteration in report generation
Refactored `CustomLogsHandler` to accept an optional logs list and modified its method to append data instead of yielding. Enhanced `ReportGenerator` with proper asynchronous iterator implementation, enabling smooth log handling and retrieval. Simplified variable naming for better readability.
2025-04-25 18:35:30 +02:00
cd79fe99be Refactor ReportGenerator and integrate custom logs handler
Introduced `CustomLogsHandler` to manage log handling and WebSocket integration in `ReportGenerator`. Simplified and restructured report generation logic for improved maintainability. Removed obsolete methods and enhanced overall readability with cleaner code structure.
2025-04-25 18:20:51 +02:00
14 changed files with 928 additions and 318 deletions


@@ -4,6 +4,9 @@ FROM python:3.13-slim
# Set environment variable for Python
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# Use an environment variable to define the script path
ENV APP_ENTRYPOINT "/app/phoenix_technologies/gpt_researcher/server.py"
# Set working directory within the container
WORKDIR /app
@@ -22,4 +25,4 @@ COPY src/ /app/
EXPOSE 8000
# Set the default command to run the app with `uvicorn`
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
CMD python "$APP_ENTRYPOINT"
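# Note: shell form means the default shell expands the variable at container start,
# roughly equivalent to the following (assumed /bin/sh -c wrapper):
#   /bin/sh -c 'python "$APP_ENTRYPOINT"'
# Exec form would pass "$APP_ENTRYPOINT" through literally, without expansion.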

README.md

@@ -1,175 +1,119 @@
# README for FastAPI-Based Report GPT Generation Service

## Overview
This repository contains the implementation of a **FastAPI**-based service designed to generate research reports. The service processes user-provided queries and report types, performing advanced research powered by `GPTResearcher` and responding with comprehensive results, including details, cost, context, images, and other associated metadata.

- **RESTful API** to handle user queries and generate reports.
- **Streaming responses** to deliver research output in chunks.
- **Secure API access** with API Key authentication.
- Completely containerized setup with Docker.
- Built with modular design for easier scalability and maintenance.

---

## System Architecture

### Core Components

1. **FastAPI App (`main.py`)**:
   - Hosts the API endpoints.
   - Handles API Key authentication for secure use.
   - Accepts user inputs (query and report type) and generates a chunked streaming response.
2. **Research Logic (`deepresearch.py`)**:
   - Encapsulates research and report generation.
   - Utilizes `GPTResearcher` to conduct research, generate reports, and retrieve extended data like images, contexts, or costs.
3. **Docker Integration**:
   - The application is containerized with a well-defined `Dockerfile`.
   - Includes dependency installation, environment setup, and FastAPI server configuration for rapid deployment.

---

## Prerequisites
Before running the application, ensure the following are installed on your system:
- **Docker**: Version 24.0+
- **Python**: Version 3.13+
- **pip**: Pre-installed Python package manager.

---

## Running the Application Locally

### Cloning the Repository
Clone the repository to a directory of your choice:
```shell script
git clone https://git.kvant.cloud/phoenix/gpt-researcher.git
cd gpt-researcher
```

### Environment Variable Configuration
Create a `.env` file in the root of the project and define:
```
API_KEY=your_api_key   # Replace "your_api_key" with your desired key
OPENAI_BASE_URL=
OPENAI_API_KEY=
EMBEDDING=
FAST_LLM=
SMART_LLM=
STRATEGIC_LLM=
OPENAI_API_VERSION=
SERPER_API_KEY=
RETRIEVER=serper
```

### Installing Dependencies
Install the required Python modules based on the generated `requirements.txt`.
```shell script
pip install --no-cache-dir -r requirements.txt
```

### Running the App
Run the FastAPI app locally:
```shell script
uvicorn main:app --host 0.0.0.0 --port 8000
```
After running, your app will be available at `http://127.0.0.1:8000`.

---

## Using Docker for Deployment

### Building the Docker Image
Build the Docker image using the **Dockerfile** provided:
```shell script
docker build -t fastapi-report-service .
```

### Running the Docker Container
Spin up a container and map FastAPI's default port, `8000`:
```shell script
docker run --env-file .env -p 8000:8000 fastapi-report-service
```

---

## API Usage

### 1. **`/get_report`**
- **Method**: `POST`
- **Description**: Generates a report based on user input.
- **Headers**:
  - `X-API-KEY`: API Key for authentication.
- **Request Body** (`JSON`):
```json
{
  "query": "Research on AI in healthcare",
  "report_type": "research_report|resource_report|outline_report|custom_report|detailed_report|subtopic_report|deep"
}
```
- **Streaming Response**: Research and report are provided in chunks.

---

## Code Structure
```
├── Dockerfile         # Configuration for Dockerizing the application
├── requirements.txt   # Python dependencies list
├── main.py            # FastAPI server entry point
├── deepresearch.py    # Research-related logic and GPTResearcher integration
└── src/               # Other project files and assets
```

---

## Features Under the Hood
1. **Authentication**:
   - An API key mechanism ensures that only authorized users can access endpoints.
2. **Streaming Response**:
   - Large research reports are sent incrementally using `StreamingResponse` for better experience and efficiency.
3. **Modular Research Logic**:
   - Research and generation tasks are handled by a dedicated class (`ReportGenerator`), making the application extensible.

---

## Future Enhancements
- **Asynchronous Enhancements**:
  - Improve async handling for long-running queries.
- **Database Integration**:
  - Save request history for auditing and reference purposes.
- **Web Interface**:
  - A user-friendly web application for interacting with the API.

---

## Contributing
Contributions are welcome! Feel free to fork the repository, make updates, and submit a pull request.

# GPT Researcher

## Project Description
**GPT Researcher** is a Python-based tool designed to assist researchers and knowledge seekers in generating detailed research insights efficiently and accurately. This project combines powerful machine learning-backed methodologies with intuitive features for processing and analyzing research data.

The project revolves around managing research queries, obtaining resourceful insights, and enabling the creation of detailed research outputs. It provides flexibility for both quick general searches and deep explorations of specific topics. Additionally, it comes with a robust system for managing logs, exceptions, and user-defined custom responses.

The application is containerized with Docker, making it easier to deploy in any environment. It is built to utilize modular components for research management, formatting responses, and enhancing the overall research experience.

## Features

### Key Highlights:
1. **Research Operations**
   - Perform deep-dives into specific topics through a systematic research mechanism.
   - Quickly search for general information in a fast and lightweight fashion (`quick_search`).
   - Retrieve necessary sources for research with flexibility.
2. **Resource Management**
   - Organize and store research results for later analysis.
   - Format research sources and context to create user-friendly reports.
   - Generate customizable research prompts for optimized research requests.
3. **Logging and Error Handling**
   - Employ a custom logging handler (`CustomLogsHandler`) to manage logs effectively.
   - Provide user-friendly error responses with flexible exception handling.
4. **Report Generation**
   - Automate the creation of research reports in reusable templates.
   - Aggregate research resources and contexts while ensuring professional quality formatting.
5. **Web Server Support**
   - Run a backend server with the `run_server` function for handling client-side operations and queries.
6. **Containerized Deployment**
   - Fully Dockerized to allow for fast setup and consistent deployment across environments.

## Installation
To get the project up and running locally, follow these steps:

### Prerequisites
- Python 3.13+
- Docker Engine (for containerized deployments)
- `pip` for managing necessary Python packages

### Steps:
1. **Clone the Repository**:
   ``` sh
   git clone <repository-url>
   cd gpt_researcher
   ```
2. **Install Dependencies**: If you are working outside of Docker:
   ``` sh
   pip install -r requirements.txt
   ```
3. **Run the Application**:
   ``` sh
   python src/mcp/gpt_researcher/server.py
   ```
4. **Using Docker**: Build and run the Docker container:
   ``` sh
   docker build -t gpt-researcher .
   docker run -p 8000:8000 gpt-researcher
   ```

The app will be available at `http://localhost:8000`.

## File Structure

### Overview:
``` plaintext
src/
├── mcp/
│   ├── gpt_researcher/
│   │   ├── server.py   # Main entrypoint for research workflow and server
│   │   └── utils.py    # Utility functions for research formatting and storage
Dockerfile              # Defines the container runtime environment
requirements.txt        # Python dependencies for the project
```

### Key Components:
1. **`server.py`**:
   - Implements the main research operations: `deep_research`, `quick_search`, and `write_report`.
   - Contains functions to retrieve research sources, manage contexts, and execute research prompts.
2. **`utils.py`**:
   - Provides utilities for:
     - Managing research storage.
     - Formatting sources and context for readable outputs.
     - Generating responses to different types of user interactions.
3. **`Dockerfile`**:
   - A configuration file for containerizing the application using the official Python 3.13-slim image.

## Usage Instructions

### Available Endpoints
Once the server is running successfully, you can interact with the application by sending API requests to the available endpoints. Below are the core functionalities:
1. **Quick Research**:
   - Quickly get general insights based on a query.
2. **Deep Research**:
   - Run comprehensive research on a specific topic.
3. **Custom Research Reports**:
   - Combine sources and analysis to create detailed user reports.
4. **Access Research Context**:
   - Retrieve the context and sources behind a result for deeper understanding.

## Contributing
We follow a **code of conduct** that expects all contributors to maintain a respectful and inclusive environment.

### Contribution Guidelines:
1. Fork the repository and develop locally.
2. Create a branch for your changes.
   ``` sh
   git checkout -b feature-name
   ```
3. Make your changes and test them thoroughly.
4. Submit a pull request with a clear description of the changes.

## Code of Conduct
As contributors and maintainers of this project, we pledge to respect all members of the community by fostering a positive and inclusive environment. Instances of abuse, harassment, or unacceptable behavior will not be tolerated.
For details, please refer to the [Code of Conduct](CODE_OF_CONDUCT.md).

## License
This project is licensed under the **MIT License**. For more information, refer to the LICENSE file in the repository.

## Feedback and Support
For any queries, suggestions, or feedback, feel free to open an issue in this repository. You can also contact the maintainers directly via email or GitHub.

Please feel free to expand or modify this README according to your project's future needs!


@@ -1,5 +1,15 @@
fastapi
uvicorn
pydantic
gpt-researcher
asyncio
# GPT Researcher dependencies
gpt-researcher>=0.13.3
python-dotenv~=1.1.0
# MCP dependencies
mcp>=1.6.0
fastapi>=0.103.1
uvicorn>=0.23.2
pydantic>=2.3.0
# Utility dependencies
loguru>=0.7.0
requests~=2.32.3
aiohttp~=3.11.18
asyncio~=3.4.3


@@ -1,47 +0,0 @@
from fastapi import FastAPI, HTTPException, Request, Depends
from pydantic import BaseModel
from phoenix_technologies import ReportGenerator
from fastapi.responses import StreamingResponse
import os
import asyncio

# FastAPI app instance
app = FastAPI()

# Define a request body structure using Pydantic
class ReportRequest(BaseModel):
    query: str
    report_type: str

# Define a dependency to validate the API Key
def verify_api_key(request: Request):
    # Define the API key from the environment variables
    expected_api_key = os.getenv("API_KEY", None)
    if not expected_api_key:
        raise HTTPException(status_code=500, detail="API key is not configured on the server.")
    # Get the API key from the request headers
    provided_api_key = request.headers.get("X-API-KEY", None)
    # Check if the API key is correct
    if not provided_api_key or provided_api_key != expected_api_key:
        raise HTTPException(status_code=403, detail="Invalid or missing API key.")

@app.post("/get_report", dependencies=[Depends(verify_api_key)])
async def get_report_endpoint(request: ReportRequest):
    """
    Expose the `get_report` function as a POST API endpoint, with a streaming response.
    """
    async def generate_report():
        try:
            # Call the asynchronous get_report function
            report_generator = ReportGenerator(request.query, request.report_type)
            async for chunk in report_generator:
                yield chunk
        except Exception as e:
            yield f"Error: {str(e)}"

    # Return streaming response
    return StreamingResponse(generate_report(), media_type="text/plain")
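# Example request against this endpoint while it existed (hypothetical values):
#   curl -N -X POST http://localhost:8000/get_report \
#        -H "X-API-KEY: your_api_key" \
#        -H "Content-Type: application/json" \
#        -d '{"query": "Research on AI in healthcare", "report_type": "research_report"}'
# The -N flag disables curl's buffering so streamed chunks appear as they arrive.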


@@ -1,4 +0,0 @@
# phoenix-technologies/__init__.py
from .gptresearch.deepresearch import ReportGenerator

__all__ = ["ReportGenerator"]


@@ -0,0 +1,8 @@
"""
GPT Researcher MCP Server

This module provides an MCP server implementation for GPT Researcher,
allowing AI assistants to perform web research and generate reports via the MCP protocol.
"""

__version__ = "0.1.0"


@@ -0,0 +1,381 @@
"""
GPT Researcher MCP Server

This script implements an MCP server for GPT Researcher, allowing AI assistants
to conduct web research and generate reports via the MCP protocol.
"""
import os
import json
import uuid
import logging
from typing import Dict, Any, Optional, AsyncGenerator
import asyncio

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from gpt_researcher import GPTResearcher

# Load environment variables
load_dotenv()

from utils import (
    research_store,
    create_success_response,
    handle_exception,
    get_researcher_by_id,
    format_sources_for_response,
    format_context_with_sources,
    store_research_results,
    create_research_prompt
)

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)

def format_sse_event(event_name: str, data: Dict[str, Any]) -> str:
    """Format data as an SSE event string."""
    json_data = json.dumps(data)
    return f"event: {event_name}\ndata: {json_data}\n\n"
# Initialize FastMCP server
mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)

# Initialize researchers dictionary
if not hasattr(mcp, "researchers"):
    mcp.researchers = {}

@mcp.resource("research://{topic}")
async def research_resource(topic: str) -> str:
    """
    Provide research context for a given topic directly as a resource.

    This allows LLMs to access web-sourced information without explicit function calls.

    Args:
        topic: The research topic or query

    Returns:
        String containing the research context with source information
    """
    # Check if we've already researched this topic
    if topic in research_store:
        logger.info(f"Returning cached research for topic: {topic}")
        return research_store[topic]["context"]

    # If not, conduct the research
    logger.info(f"Conducting new research for resource on topic: {topic}")

    # Initialize GPT Researcher
    researcher = GPTResearcher(topic)

    try:
        # Conduct the research
        await researcher.conduct_research()

        # Get the context and sources
        context = researcher.get_research_context()
        sources = researcher.get_research_sources()
        source_urls = researcher.get_source_urls()

        # Format with sources included
        formatted_context = format_context_with_sources(topic, context, sources)

        # Store for future use
        store_research_results(topic, context, sources, source_urls, formatted_context)

        return formatted_context
    except Exception as e:
        return f"Error conducting research on '{topic}': {str(e)}"
@mcp.tool()
async def deep_research(query: str) -> AsyncGenerator[str, None]:
    """
    Conduct deep web research on a given query using GPT Researcher.

    Use this tool when you need time-sensitive, real-time information like stock prices, news, people, specific knowledge, etc.

    Args:
        query: The research query or topic

    Yields:
        SSE events containing research status, ID, and the actual research context and sources
        that can be used directly by LLMs for context enrichment
    """
    logger.info(f"Starting streaming deep research on query: {query}...")
    research_id = str(uuid.uuid4())

    try:
        yield format_sse_event(
            "message",
            {"research_id": research_id, "query": query, "status": "Research initiated"}
        )

        # Initialize GPT Researcher
        # In newer GPTResearcher versions the query is passed at initialization
        researcher = GPTResearcher(query=query, report_type="research_report", config_path=None)
        # Alternatively, if your version handles it differently: researcher = GPTResearcher(query)
        mcp.researchers[research_id] = researcher  # Store early in case it is needed

        yield format_sse_event(
            "message",
            {"research_id": research_id, "status": "GPT Researcher initialized. Starting information gathering."}
        )
        await asyncio.sleep(0.1)  # Short pause so the event is reliably sent

        # Simulated update: research is starting
        yield format_sse_event(
            "message",
            {"research_id": research_id, "status": "Starting web crawling and source analysis...", "progress": 10}
        )

        # The actual long-running process
        await researcher.conduct_research()  # This call blocks for the duration of the research

        # Simulated update: research finished, begin processing
        yield format_sse_event(
            "message",
            {"research_id": research_id, "status": "Web crawling finished. Processing and consolidating information...", "progress": 70}
        )
        await asyncio.sleep(0.1)

        logger.info(f"Core research completed for ID: {research_id}. Fetching results...")

        # Get the research context and sources
        context = researcher.get_research_context()
        sources = researcher.get_research_sources()  # These are structured source objects
        source_urls = researcher.get_source_urls()  # These are just the URLs

        # Store in the research store for the resource API (optional, depending on your logic)
        # Consider whether formatted_context is needed here if the client receives the raw data
        store_research_results(query, context, sources, source_urls, context)  # Simplified for the store

        # Prepare the final data
        final_data_payload = {
            "research_id": research_id,
            "query": query,
            "status": "Research completed successfully",
            "source_count": len(sources),
            "context": context,
            "sources": format_sources_for_response(sources),
            "source_urls": source_urls
        }

        # Send the final result as a "message" event
        yield format_sse_event("message", final_data_payload)
        logger.info(f"Sent final research result for ID: {research_id}")

    except Exception as e:
        logger.error(f"Error during deep_research for query '{query}': {e}", exc_info=True)
        # Send an error event to the client
        error_payload = {
            "research_id": research_id,
            "query": query,
            "status": "Error occurred",
            "error_message": str(e),
            "error_details": "Check server logs for more information."
        }
        yield format_sse_event("message", error_payload)  # Errors are sent as regular "message" events
        # handle_exception could be used here instead if it returned an SSE event,
        # or the exception could be re-raised if FastMCP handles that better.
@mcp.tool()
async def quick_search(query: str) -> AsyncGenerator[str, None]:
    """
    Perform a quick web search on a given query and return search results with snippets.

    This optimizes for speed over quality and is useful when an LLM doesn't need in-depth
    information on a topic.

    Args:
        query: The search query

    Yields:
        SSE events containing search results and snippets
    """
    logger.info(f"Performing quick search on query: {query}...")

    # Generate a unique ID for this search session
    search_id = str(uuid.uuid4())

    # Initialize GPT Researcher
    researcher = GPTResearcher(query)

    try:
        yield format_sse_event(
            "message",
            {"research_id": search_id, "query": query, "status": "Research initiated"}
        )

        # Perform quick search
        search_results = await researcher.quick_search(query=query)
        mcp.researchers[search_id] = researcher
        logger.info(f"Quick search completed for ID: {search_id}")

        final_data_payload = {
            "search_id": search_id,
            "query": query,
            "result_count": len(search_results) if search_results else 0,
            "search_results": search_results
        }

        # Send the final result as a "message" event
        yield format_sse_event("message", final_data_payload)
        logger.info(f"Sent final search result for ID: {search_id}")

    except Exception as e:
        logger.error(f"Error during quick_search for query '{query}': {e}", exc_info=True)
        # Send an error event to the client
        error_payload = {
            "research_id": search_id,
            "query": query,
            "status": "Error occurred",
            "error_message": str(e),
            "error_details": "Check server logs for more information."
        }
        yield format_sse_event("message", error_payload)
@mcp.tool()
async def write_report(research_id: str, custom_prompt: Optional[str] = None) -> AsyncGenerator[str, None]:
    """
    Generate a report based on previously conducted research.

    Args:
        research_id: The ID of the research session from conduct_research
        custom_prompt: Optional custom prompt for report generation

    Yields:
        SSE events containing the report content and metadata
    """
    success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
    if not success:
        yield format_sse_event("message", error)
        return  # Stop here; there is no researcher to generate a report from

    logger.info(f"Generating report for research ID: {research_id}")

    try:
        # Generate report
        report = await researcher.write_report(custom_prompt=custom_prompt)

        # Get additional information
        sources = researcher.get_research_sources()
        costs = researcher.get_costs()

        final_data_payload = {
            "report": report,
            "source_count": len(sources),
            "costs": costs
        }

        # Send the final result as a "message" event
        yield format_sse_event("message", final_data_payload)
        logger.info(f"Sent final report for ID: {research_id}")
    except Exception as e:
        logger.error(f"Error during write_report for research ID '{research_id}': {e}", exc_info=True)
        # Send an error event to the client
        error_payload = {
            "research_id": research_id,
            "status": "Error occurred",
            "error_message": str(e),
            "error_details": "Check server logs for more information."
        }
        yield format_sse_event("message", error_payload)
@mcp.tool()
async def get_research_sources(research_id: str) -> AsyncGenerator[str, None]:
    """
    Get the sources used in the research.

    Args:
        research_id: The ID of the research session

    Yields:
        SSE events containing the research sources
    """
    success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
    if not success:
        yield format_sse_event("message", error)
        return  # Stop here; no researcher means no sources to report

    sources = researcher.get_research_sources()
    source_urls = researcher.get_source_urls()

    final_data_payload = {
        "sources": format_sources_for_response(sources),
        "source_urls": source_urls
    }

    # Send the final result as a "message" event
    yield format_sse_event("message", final_data_payload)
    logger.info(f"Sent research sources for ID: {research_id}")

@mcp.tool()
async def get_research_context(research_id: str) -> Dict[str, Any]:
    """
    Get the full context of the research.

    Args:
        research_id: The ID of the research session

    Returns:
        Dict containing the research context
    """
    success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
    if not success:
        return error

    context = researcher.get_research_context()
    return create_success_response({
        "context": context
    })
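# Example success response from get_research_context (illustrative):
#   {"status": "success", "context": "<research context text>"}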
@mcp.prompt()
def research_query(topic: str, goal: str, report_format: str = "research_report") -> str:
    """
    Create a research query prompt for GPT Researcher.

    Args:
        topic: The topic to research
        goal: The goal or specific question to answer
        report_format: The format of the report to generate

    Returns:
        A formatted prompt for research
    """
    return create_research_prompt(topic, goal, report_format)

def run_server():
    """Run the MCP server using FastMCP's built-in event loop handling."""
    # Check if API keys are set
    if not os.getenv("OPENAI_API_KEY"):
        logger.error("OPENAI_API_KEY not found. Please set it in your .env file.")
        return

    # Add startup message
    logger.info("Starting GPT Researcher MCP Server...")
    print("🚀 GPT Researcher MCP Server starting... Check researcher_mcp_server.log for details")

    # Let FastMCP handle the event loop
    try:
        mcp.run("sse")
        # Note: If we reach here, the server has stopped
        logger.info("MCP Server has stopped")
    except Exception as e:
        logger.error(f"Error running MCP server: {str(e)}")
        print(f"❌ MCP Server error: {str(e)}")
        return

    print("✅ MCP Server stopped")

if __name__ == "__main__":
    # Use the non-async approach to avoid asyncio nesting issues
    run_server()


@@ -0,0 +1,139 @@
"""
GPT Researcher MCP Server Utilities

This module provides utility functions and helpers for the GPT Researcher MCP Server.
"""
import sys
from typing import Dict, List, Optional, Tuple, Any
from loguru import logger

# Configure logging for console only (no file logging)
logger.configure(handlers=[{"sink": sys.stderr, "level": "INFO"}])

# Research store to track ongoing research topics and contexts
research_store = {}

# API Response Utilities

def create_error_response(message: str) -> Dict[str, Any]:
    """Create a standardized error response"""
    return {"status": "error", "message": message}

def create_success_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a standardized success response"""
    return {"status": "success", **data}

def handle_exception(e: Exception, operation: str) -> Dict[str, Any]:
    """Handle exceptions in a consistent way"""
    error_message = str(e)
    logger.error(f"{operation} failed: {error_message}")
    return create_error_response(error_message)

def get_researcher_by_id(researchers_dict: Dict, research_id: str) -> Tuple[bool, Any, Dict[str, Any]]:
    """
    Helper function to retrieve a researcher by ID.

    Args:
        researchers_dict: Dictionary of research objects
        research_id: The ID of the research session

    Returns:
        Tuple containing (success, researcher_object, error_response)
    """
    if not researchers_dict or research_id not in researchers_dict:
        return False, None, create_error_response("Research ID not found. Please conduct research first.")
    return True, researchers_dict[research_id], {}

def format_sources_for_response(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Format source information for API responses.

    Args:
        sources: List of source dictionaries

    Returns:
        Formatted source list for API responses
    """
    return [
        {
            "title": source.get("title", "Unknown"),
            "url": source.get("url", ""),
            "content_length": len(source.get("content", ""))
        }
        for source in sources
    ]
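# Example (illustrative values):
#   format_sources_for_response([{"title": "Example", "url": "https://example.com", "content": "abc"}])
#   returns [{"title": "Example", "url": "https://example.com", "content_length": 3}]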
def format_context_with_sources(topic: str, context: str, sources: List[Dict[str, Any]]) -> str:
    """
    Format research context with sources for display.

    Args:
        topic: Research topic
        context: Research context
        sources: List of sources

    Returns:
        Formatted context string with sources
    """
    formatted_context = f"## Research: {topic}\n\n{context}\n\n"
    formatted_context += "## Sources:\n"
    for i, source in enumerate(sources):
        formatted_context += f"{i+1}. {source.get('title', 'Unknown')}: {source.get('url', '')}\n"
    return formatted_context

def store_research_results(topic: str, context: str, sources: List[Dict[str, Any]],
                           source_urls: List[str], formatted_context: Optional[str] = None):
    """
    Store research results in the research store.

    Args:
        topic: Research topic
        context: Research context
        sources: List of sources
        source_urls: List of source URLs
        formatted_context: Optional pre-formatted context
    """
    research_store[topic] = {
        "context": formatted_context or context,
        "sources": sources,
        "source_urls": source_urls
    }

def create_research_prompt(topic: str, goal: str, report_format: str = "research_report") -> str:
    """
    Create a research query prompt for GPT Researcher.

    Args:
        topic: The topic to research
        goal: The goal or specific question to answer
        report_format: The format of the report to generate

    Returns:
        A formatted prompt for research
    """
    return f"""
    Please research the following topic: {topic}

    Goal: {goal}

    You have two methods to access web-sourced information:

    1. Use the "research://{topic}" resource to directly access context about this topic if it exists
       or if you want to get straight to the information without tracking a research ID.

    2. Use the conduct_research tool to perform new research and get a research_id for later use.
       This tool also returns the context directly in its response, which you can use immediately.

    After getting context, you can:
    - Use it directly in your response
    - Use the write_report tool with a custom prompt to generate a structured {report_format}

    You can also use get_research_sources to view additional details about the information sources.
    """


@@ -1,109 +0,0 @@
from gpt_researcher import GPTResearcher

class ReportGenerator:
    """
    A class to handle the generation of research-based reports.

    This class integrates with GPTResearcher to conduct research, retrieve results,
    and format them into consumable chunks for asynchronous streaming.
    """

    def __init__(self, query: str, report_type: str):
        """
        Initializes the ReportGenerator instance with a query and report type.

        :param query: The main topic or question for research.
        :param report_type: The type of report to generate.
        """
        self.query = query
        self.report_type = report_type
        self.researcher = GPTResearcher(query, report_type)
        self._chunks = None  # Placeholder for report chunks
        self._index = 0  # Iterator index for streaming

    def __aiter__(self):
        """
        Makes the ReportGenerator instance asynchronously iterable.

        :return: Self instance for iteration.
        """
        return self

    async def __anext__(self):
        """
        Defines the logic for asynchronous iteration over report chunks.

        :return: The next chunk of the report.
        :raises StopAsyncIteration: Raised when all chunks are yielded.
        """
        if self._chunks is None:
            # Generate report chunks on first access
            self._chunks = await self._generate_report_chunks()
        if self._index >= len(self._chunks):
            # End iteration once all chunks are exhausted
            raise StopAsyncIteration
        chunk = self._chunks[self._index]
        self._index += 1
        return chunk
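    # Example consumption of the iterator above (hypothetical caller code):
    #   generator = ReportGenerator("AI in healthcare", "research_report")
    #   async for chunk in generator:
    #       print(chunk)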
    async def _generate_report_chunks(self):
        """
        Conducts research using the researcher and generates the report in chunks.

        :return: A list of report chunks.
        """
        try:
            # Asynchronous research and report generation
            research_result = await self.researcher.conduct_research()
            report = await self.researcher.write_report()

            # Retrieve additional research details
            research_context = self.researcher.get_research_context() or {}
            research_costs = self.researcher.get_costs() or {}
            research_images = self.researcher.get_research_images() or []
            research_sources = self.researcher.get_research_sources() or []

            # Construct the complete research response
            full_report = {
                "report": report,
                "context": research_context,
                "costs": research_costs,
                "images": research_images,
                "sources": research_sources,
            }

            # Generate chunks for streaming
            return self._split_into_chunks(full_report)
        except Exception as e:
            # Handle potential errors during research and report generation
            raise RuntimeError(f"Error generating report chunks: {e}")

    def _split_into_chunks(self, report):
        """
        Splits the report dictionary into smaller chunks for easier streaming.

        :param report: A dictionary containing the full report data.
        :return: A list of formatted text chunks.
        """
        if not report:
            raise ValueError("Cannot split an empty or None report into chunks.")
        chunks = []
        for key, value in report.items():
            chunks.append(f"{key}: {value}")
        return chunks

    def get_query_details(self):
        """
        Retrieves the details of the query and report type.

        :return: A dictionary containing the query and report type.
        """
        return {
            "query": self.query,
            "report_type": self.report_type
        }


@@ -0,0 +1,8 @@
"""
SMD MCP Server

This module provides an MCP server implementation for SMD,
allowing AI assistants to perform searches and generate reports via the MCP protocol.
"""

__version__ = "0.1.0"


@@ -0,0 +1,186 @@
"""
SMD Researcher MCP Server

This script implements an MCP server for SMD Researcher, allowing AI assistants
to conduct research and generate reports via the MCP protocol.
"""
import os
import logging
import aiohttp
import asyncio
import requests
import time

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

# Load environment variables
load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
mcp = FastMCP("SMD Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
async def summarize_to_words(text: str, title: str, target_word_count: int = 1000) -> str:
    url = f"https://maas.ai-2.kvant.cloud/engines/{os.getenv('SWISSDOX_SUMMARIZING_MODEL', '')}/chat/completions"
    headers = {
        "x-litellm-api-key": f"{os.getenv('SWISSDOX_SUMMARIZING_MODEL_APIKEY', '')}",
        "Content-type": "application/json",
    }
    payload = {
        "model": os.getenv('SWISSDOX_SUMMARIZING_MODEL', ''),
        "messages": [
            {
                # "system" is the standard instruction role in OpenAI-compatible chat APIs
                "role": "system",
                "content": f"You are summarizing the user input to a maximum of {target_word_count} words"
            },
            {
                "role": "user",
                "content": f"{title} - {text}"
            }
        ]
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            if response.status == 200:
                data = await response.json()
                return data.get("choices")[0].get("message").get("content")
            else:
                return await response.text()
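# Expected success response shape (OpenAI-compatible chat completion, assumed):
#   {"choices": [{"message": {"content": "<summary text>"}}], ...}
# On non-200 status, the raw response body is returned instead.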
async def smd_detail_article(article_id):
    logger.info("Starting smd_detail_article function.")
    start_time = time.perf_counter()
    url = f"https://api.swissdox.ch/api/documents/{article_id}"
    headers = {
        "Authorization": f"Bearer {os.getenv('SWISSDOX_BEARER_TOKEN', '')}",
        "Content-type": "application/json",
    }
    payload = {"filters": [], "pagination": {"pageSize": 1, "currentPage": 1}}
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            if response.status == 200:
                data = await response.json()
                summarized_content = await summarize_to_words(title=data.get("title"), text=data.get("text"), target_word_count=10000)
                execution_time = time.perf_counter() - start_time
                logger.info(f"smd_detail_article executed in {execution_time:.2f} seconds.")
                logger.info(f"smd_article_summarization {summarized_content}")
                return {
                    "message": summarized_content,
                    "article_id": article_id
                }
            else:
                return {
                    "message": await response.text(),
                    "article_id": article_id
                }
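# On success, smd_detail_article returns (illustrative):
#   {"message": "<summarized article text>", "article_id": <article_id>}
# On failure, "message" carries the raw error body from the Swissdox API instead.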
@mcp.tool()
async def smd_research(search_query: str = "Bundesrat", date_from: str = "2024-05-30T22:00:00.000Z", date_to: str = "2025-05-31T21:59:59.999Z") -> dict:
    """
    Execute a deep search on a given query using SMD Researcher.

    Use this tool when you need research on a topic.

    Args:
        search_query: The SMD search query. Logical operators (AND, OR, NOT) are available; for an exact match, prefix the word with "+", and to exclude a word, prefix it with "-". Put multi-word phrases in quotes. Formulate the query in German and enrich it with relevant context keywords for the topic.
        date_from: The date to start research from, in the format YYYY-MM-DDTHH:MM:SS.SSSZ
        date_to: The date to end research at, in the format YYYY-MM-DDTHH:MM:SS.SSSZ

    Returns:
        Dict containing related persons, related organizations, and the summarized detailed articles
    """
    # Note: the date_from/date_to arguments are currently superseded by the
    # SWISSDOX_DATEFROM/SWISSDOX_DATETO environment variables used in the filter below.
    query = {
        "sort": {
            "field": "score",
            "direction": "desc"
        },
        "filters": [
            {
                "field": "datetime",
                "value": [
                    os.getenv('SWISSDOX_DATEFROM', '2020-12-31T23:00:00.000Z'),
                    os.getenv('SWISSDOX_DATETO', '2023-12-31T22:59:00.000Z')
                ]
            },
            {
                "field": "newspaper",
                "value": [
                    os.getenv('SWISSDOX_NEWSPAPER', 'NZZ')
                ]
            },
            {
                "field": "query_text",
                "value": [search_query]
            }
        ],
        "exact": False,
        "pagination": {
            "pageSize": int(os.getenv('SWISSDOX_PAGESIZE', '10')),
            "currentPage": 1
        },
        "onlyResults": False
    }
    url = "https://api.swissdox.ch/api/documents/search"
    headers = {
        "authorization": f"Bearer {os.getenv('SWISSDOX_BEARER_TOKEN', '')}",
        "content-type": "application/json",
    }
    response = requests.post(url, headers=headers, json=query)
    if response.status_code == 200:
        result = response.json()
        articles = result.get("data", [])
        facets = result.get("facets", {})
        tasks = []
        for article in articles:
            article_id = article.get("id")
            if article_id:
                tasks.append(smd_detail_article(article_id))
        detailed_articles = await asyncio.gather(*tasks)
        logger.info(f"detailed_articles {detailed_articles}")
        return {
            "related_persons": facets.get("persons", []),
            "related_organizations": facets.get("organizations", []),
            "detailed_articles": detailed_articles
        }
    else:
        return {
            "message": response.text
        }
def run_server():
    """Run the MCP server using FastMCP's built-in event loop handling."""
    # Add startup message
    logger.info("Starting SMD Researcher MCP Server...")
    print("🚀 SMD Researcher MCP Server starting... Check researcher_mcp_server.log for details")

    # Let FastMCP handle the event loop
    try:
        mcp.run("sse")
        # Note: If we reach here, the server has stopped
        logger.info("MCP Server has stopped")
    except Exception as e:
        logger.error(f"Error running MCP server: {str(e)}")
        print(f"❌ MCP Server error: {str(e)}")
        return

    print("✅ MCP Server stopped")

if __name__ == "__main__":
    # Use the non-async approach to avoid asyncio nesting issues
    run_server()


@@ -0,0 +1,8 @@
"""
GPT Researcher MCP Server

This module provides an MCP server implementation for GPT Researcher,
allowing AI assistants to perform web research and generate reports via the MCP protocol.
"""

__version__ = "0.1.0"


@@ -0,0 +1,83 @@
"""
GPT Researcher MCP Server

This script implements an MCP server for GPT Researcher, allowing AI assistants
to conduct web research and generate reports via the MCP protocol.
"""
import os
import json
from typing import Dict, Any, AsyncGenerator
import asyncio

from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
@mcp.tool()
async def updates(tool_input: Dict[str, Any]) -> AsyncGenerator[str, None]:
    """
    An MCP tool that streams intermediate updates via SSE.
    tool_input is a dictionary containing the tool's input parameters.
    """
    print(f"Tool started with input: {tool_input}")

    async def _process_step_1():
        await asyncio.sleep(1)
        return {"status": "Step 1 complete", "details": "Initial processing done."}

    async def _process_step_2():
        await asyncio.sleep(2)
        return {"status": "Step 2 in progress", "progress": 50}

    async def _process_step_3():
        await asyncio.sleep(1)
        return {"status": "Step 2 complete", "progress": 100, "intermediate_result": "Partial data available"}

    async def _generate_final_output():
        await asyncio.sleep(1)
        return {"final_data": "All steps finished successfully.", "summary": tool_input.get("summary_needed", False)}

    # Step 1
    update1 = await _process_step_1()
    yield f"event: tool_update\ndata: {json.dumps(update1)}\n\n"
    print("Update 1 sent")

    # Step 2
    update2 = await _process_step_2()
    yield f"event: tool_update\ndata: {json.dumps(update2)}\n\n"
    print("Update 2 sent")

    # Step 3
    update3 = await _process_step_3()
    yield f"event: tool_update\ndata: {json.dumps(update3)}\n\n"
    print("Update 3 sent")

    # Final output (can also be sent as a dedicated event)
    final_output = await _generate_final_output()
    yield f"event: tool_result\ndata: {json.dumps(final_output)}\n\n"
    print("Final result sent")

    # Optional: end-of-stream signal, if required by the client or expected by FastMCP
    # yield "event: stream_end\ndata: {}\n\n"
    print("Tool execution finished.")
def run_server():
    """Run the MCP server using FastMCP's built-in event loop handling."""
    # Add startup message
    print("🚀 Test MCP Server starting... Check researcher_mcp_server.log for details")

    # Let FastMCP handle the event loop
    try:
        mcp.run("sse")
    except Exception as e:
        print(f"❌ MCP Server error: {str(e)}")
        return

    print("✅ MCP Server stopped")

if __name__ == "__main__":
    # Use the non-async approach to avoid asyncio nesting issues
    run_server()