Refactor codebase to implement MCP server for GPT Researcher

Replaced FastAPI app with an MCP server implementation, enhancing flexibility and modularity for research operations. Deprecated `phoenix_technologies` package, updated server logic, added utility functions, and revised dependencies in `requirements.txt`. Updated Dockerfile and README to align with the new architecture.
ThomasTaroni 2025-04-26 17:54:43 +02:00
parent 73e929ca00
commit 44b91b9375
10 changed files with 481 additions and 281 deletions

Dockerfile

@@ -22,4 +22,4 @@ COPY src/ /app/
 EXPOSE 8000
 # Set the default command to run the app with `uvicorn`
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
+CMD ["python", "server.py"]

README.md

@@ -1,175 +1,72 @@
-# README for FastAPI-Based Report GPT Generation Service
-## Overview
-This repository contains the implementation of a **FastAPI**-based service designed to generate research reports. The service processes user-provided queries and report types, performing advanced research powered by `GPTResearcher` and responding with comprehensive results, including details, cost, context, images, and other associated metadata.
+# Project Overview
+## Description
+This project is a server-side application built with Python that facilitates research-related operations. It provides functionality to manage researchers, handle resources, process queries, and generate in-depth research reports. The application includes reusable utility functions to streamline responses, handle exceptions gracefully, and format data for client consumption. A `Dockerfile` is provided for easy containerization and deployment.
 ## Features
+### Server Functionality
+The main server functionality is defined in `server.py` and includes:
+- **research_resource**: Manages research resources.
+- **deep_research**: Conducts detailed research operations.
+- **write_report**: Creates comprehensive reports based on researched data.
+- **get_research_sources**: Retrieves the information sources used in a research session.
+- **get_research_context**: Provides contextual information tied to research.
+- **research_query**: Builds prompts for incoming research-related queries.
+- **run_server**: Initializes and runs the server.
-- **RESTful API** to handle user queries and generate reports.
-- **Streaming responses** to deliver research output in chunks.
-- **Secure API access** with API Key authentication.
-- Completely containerized setup with Docker.
-- Built with modular design for easier scalability and maintenance.
+### Utility Functions
+The `utils.py` file provides additional support, including:
+- **Response Handling**:
+  - `create_error_response`
+  - `create_success_response`
+- **Error & Exception Management**:
+  - `handle_exception`
+- **Data Operations**:
+  - `get_researcher_by_id`
+  - `format_sources_for_response`
+  - `format_context_with_sources`
+  - `store_research_results`
+  - `create_research_prompt`
+### Docker Support
+The included `Dockerfile` allows for simple containerized deployment:
+- Uses a lightweight Python 3.13 image.
+- Installs required dependencies from `requirements.txt`.
+- Configures the application to run via `server.py` on port `8000` using `CMD ["python", "server.py"]`.
----
-## System Architecture
-### Core Components
-1. **FastAPI App (`main.py`)**:
-   - Hosts the API endpoints.
-   - Handles API Key authentication for secure use.
-   - Accepts user inputs (query and report type) and generates a chunked streaming response.
-2. **Research Logic (`deepresearch.py`)**:
-   - Encapsulates research and report generation.
-   - Utilizes `GPTResearcher` to conduct research, generate reports, and retrieve extended data like images, contexts, or costs.
-3. **Docker Integration**:
-   - The application is containerized with a well-defined `Dockerfile`.
-   - Includes dependency installation, environment setup, and FastAPI server configuration for rapid deployment.
----
+## Setup and Usage
+### Prerequisites
+- Python 3.13 or later.
+- `pip` for dependency management.
+- Docker (optional, for containerized deployment).
-## Prerequisites
-Before running the application, ensure the following are installed on your system:
-- **Docker**: Version 24.0+
-- **Python**: Version 3.13+
-- **pip**: Pre-installed Python package manager.
----
-## Running the Application Locally
-### Cloning the Repository
-Clone the repository to a directory of your choice:
-```shell script
-git clone https://git.kvant.cloud/phoenix/gpt-researcher.git
-cd gpt-researcher
-```
+### Installation
+1. Clone this repository.
+2. Install dependencies:
+   ```bash
+   pip install -r requirements.txt
+   ```
+3. Run the application:
+   ```bash
+   python server.py
+   ```
-### Environment Variable Configuration
-Create a `.env` file in the root of the project and define:
-```
-API_KEY=your_api_key # Replace "your_api_key" with your desired key
-OPENAI_BASE_URL=
-OPENAI_API_KEY=
-EMBEDDING=
-FAST_LLM=
-SMART_LLM=
-STRATEGIC_LLM=
-OPENAI_API_VERSION=
-SERPER_API_KEY=
-RETRIEVER=serper
-```
+### Using Docker
+Build and run the application as a Docker container:
+1. Build the Docker image:
+   ```bash
+   docker build -t research-app .
+   ```
+2. Run the Docker container:
+   ```bash
+   docker run -p 8000:8000 research-app
+   ```
-### Installing Dependencies
-Install the required Python modules based on the generated `requirements.txt`.
-```shell script
-pip install --no-cache-dir -r requirements.txt
-```
-### Running the App
-Run the FastAPI app locally:
-```shell script
-uvicorn main:app --host 0.0.0.0 --port 8000
-```
-After running, your app will be available at `http://127.0.0.1:8000`.
+The application will be accessible at `http://localhost:8000`.
+## Folder Structure
+```
+|-- src/
+|   |-- server.py      # Main server logic
+|   |-- utils.py       # Reusable utility functions
+|-- Dockerfile         # Containerization setup
+|-- requirements.txt   # Dependencies file
+|-- README.md          # Documentation (this file)
+```
----
-## Using Docker for Deployment
-### Building the Docker Image
-Build the Docker image using the **Dockerfile** provided:
-```shell script
-docker build -t fastapi-report-service .
-```
-### Running the Docker Container
-Spin up a container and map FastAPI's default port, `8000`:
-```shell script
-docker run --env-file .env -p 8000:8000 fastapi-report-service
-```
----
-## API Usage
-### 1. **`/get_report`**
-- **Method**: `POST`
-- **Description**: Generates a report based on user input.
-- **Headers**:
-  - `X-API-KEY`: API Key for authentication.
-- **Request Body** (`JSON`):
-```json
-{
-  "query": "Research on AI in healthcare",
-  "report_type": "research_report|resource_report|outline_report|custom_report|detailed_report|subtopic_report|deep"
-}
-```
-- **Streaming Response**: Research and report are provided in chunks.
----
-## Code Structure
-```
-├── Dockerfile        # Configuration for Dockerizing the application
-├── requirements.txt  # Python dependencies list
-├── main.py           # FastAPI server entry point
-├── deepresearch.py   # Research-related logic and GPTResearcher integration
-└── src/              # Other project files and assets
-```
----
-## Features Under the Hood
-1. **Authentication**:
-   - An API key mechanism ensures that only authorized users can access endpoints.
-2. **Streaming Response**:
-   - Large research reports are sent incrementally using `StreamingResponse` for better experience and efficiency.
-3. **Modular Research Logic**:
-   - Research and generation tasks are handled by a dedicated class (`ReportGenerator`), making the application extensible.
----
-## Future Enhancements
-- **Asynchronous Enhancements**:
-  - Improve async handling for long-running queries.
-- **Database Integration**:
-  - Save request history for auditing and reference purposes.
-- **Web Interface**:
-  - A user-friendly web application for interacting with the API.
----
-## Contributing
-Contributions are welcome! Feel free to fork the repository, make updates, and submit a pull request.
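For orientation, here is a minimal sketch of how an MCP client could exercise the new server. It assumes the official `mcp` Python SDK's SSE client helpers and FastMCP's default `/sse` endpoint on the exposed port 8000; the topic string is illustrative:

```python
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def main():
    # Connect to the running server (started with `python server.py`, per the README)
    async with sse_client("http://localhost:8000/sse") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            # Read web-sourced context directly via the research:// resource
            resource = await session.read_resource("research://quantum-computing")
            print(resource.contents[0].text)


asyncio.run(main())
```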

requirements.txt

@@ -1,5 +1,12 @@
-fastapi
-uvicorn
-pydantic
-gpt-researcher
-asyncio
+# GPT Researcher dependencies
+gpt-researcher>=0.12.16
+python-dotenv
+
+# MCP dependencies
+mcp>=1.6.0
+fastapi>=0.103.1
+uvicorn>=0.23.2
+pydantic>=2.3.0
+
+# Utility dependencies
+loguru>=0.7.0

src/__init__.py

@@ -0,0 +1,8 @@
"""
GPT Researcher MCP Server

This module provides an MCP server implementation for GPT Researcher,
allowing AI assistants to perform web research and generate reports via the MCP protocol.
"""

__version__ = "0.1.0"

src/main.py

@@ -1,55 +0,0 @@
import uvicorn
from fastapi import FastAPI, HTTPException, Request, Depends
from pydantic import BaseModel
from phoenix_technologies import ReportGenerator, CustomLogsHandler
from fastapi.responses import StreamingResponse
from typing import Dict, Any, AsyncGenerator, Coroutine, Generator
import os
import asyncio
import time

# FastAPI app instance
app = FastAPI()


# Define a request body structure using Pydantic
class ReportRequest(BaseModel):
    query: str
    report_type: str


# Shared log array using asyncio.Queue
log_queue = asyncio.Queue()


# Define a dependency to validate the API Key
def verify_api_key(request: Request):
    # Define the API key from the environment variables
    expected_api_key = os.getenv("API_KEY", None)
    if not expected_api_key:
        raise HTTPException(
            status_code=500, detail="API key is not configured on the server."
        )
    # Get the API key from the request headers
    provided_api_key = request.headers.get("X-API-KEY", None)
    # Check if the API key is correct
    if not provided_api_key or provided_api_key != expected_api_key:
        raise HTTPException(status_code=403, detail="Invalid or missing API key.")


@app.post("/get_report", dependencies=[Depends(verify_api_key)])
async def get_report_endpoint(request: ReportRequest):
    """
    Expose the `get_report` function as a POST API endpoint, with a streaming response.
    """
    def fake_data_streamer():
        for i in range(5):
            yield f"My custom Log: {i}"
            time.sleep(5)

    # Return streaming response
    return StreamingResponse(fake_data_streamer(), media_type="text/plain")


if __name__ == "__main__":
    uvicorn.run(app='main:app', host="127.0.0.1", port=8000)
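For contrast with the MCP interface, the removed FastAPI endpoint was invoked roughly as in this sketch; the API key and query values are placeholders, and the streaming follows the `StreamingResponse` above:

```python
import requests

# Hypothetical client for the removed /get_report endpoint (values are placeholders)
response = requests.post(
    "http://localhost:8000/get_report",
    headers={"X-API-KEY": "your_api_key"},
    json={"query": "Research on AI in healthcare", "report_type": "research_report"},
    stream=True,  # the endpoint streamed its output in chunks
)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="")
```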

src/phoenix_technologies/__init__.py

@@ -1,4 +0,0 @@
# phoenix-technologies/__init__.py
from .gptresearch.deepresearch import ReportGenerator, CustomLogsHandler
__all__ = ["ReportGenerator", "CustomLogsHandler"]

src/phoenix_technologies/gptresearch/deepresearch.py

@@ -1,53 +0,0 @@
from gpt_researcher import GPTResearcher
from typing import Dict, Any, AsyncGenerator, Coroutine


class CustomLogsHandler:
    """A custom Logs handler class to handle JSON data."""

    def __init__(self):
        self.logs = []  # Initialize logs to store data

    async def send_json(self, data: Dict[str, Any]) -> None:
        """Send JSON data and log it."""
        self.logs.append(data)  # Append data to logs
        print(f"My custom Log: {data}")  # For demonstration, print the log


class ReportGenerator:
    def __init__(self, query: str, report_type: str):
        """
        Initializes the ReportGenerator with a query and report type.
        """
        self.query = query
        self.report_type = report_type
        # Initialize researcher with a custom WebSocket
        self.custom_logs_handler = CustomLogsHandler()
        self.complete = False
        self.researcher = GPTResearcher(query, report_type, websocket=self.custom_logs_handler)

    def init(self) -> CustomLogsHandler:
        return self.custom_logs_handler

    async def generate_report(self) -> None:
        """
        Conducts research and generates the report along with additional information.
        """
        # Conduct research
        research_result = await self.researcher.conduct_research()
        report = await self.researcher.write_report()
        # Retrieve additional information
        research_context = self.researcher.get_research_context()
        research_costs = self.researcher.get_costs()
        research_images = self.researcher.get_research_images()
        research_sources = self.researcher.get_research_sources()
        self.complete = True

    def get_query_details(self):
        """
        Returns details of the query and report type.
        """
        return {
            "query": self.query,
            "report_type": self.report_type
        }
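Before this commit, the class above was presumably driven along these lines; this is a sketch, not code from the repository, and the report type is taken from the old README's list:

```python
import asyncio

from phoenix_technologies import ReportGenerator


async def main():
    # Instantiate with a query and one of the report types the old README documents
    generator = ReportGenerator(query="AI in healthcare", report_type="research_report")
    logs_handler = generator.init()  # access the CustomLogsHandler
    await generator.generate_report()
    print(generator.get_query_details())
    print(f"Captured {len(logs_handler.logs)} log entries; complete={generator.complete}")


asyncio.run(main())
```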

src/server.py

@@ -0,0 +1,261 @@
"""
GPT Researcher MCP Server

This script implements an MCP server for GPT Researcher, allowing AI assistants
to conduct web research and generate reports via the MCP protocol.
"""
import os
import sys
import uuid
import logging
from typing import Dict, Any, Optional
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from gpt_researcher import GPTResearcher

# Load environment variables
load_dotenv()

from utils import (
    research_store,
    create_success_response,
    handle_exception,
    get_researcher_by_id,
    format_sources_for_response,
    format_context_with_sources,
    store_research_results,
    create_research_prompt
)

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
mcp = FastMCP("GPT Researcher")

# Initialize researchers dictionary
if not hasattr(mcp, "researchers"):
    mcp.researchers = {}


@mcp.resource("research://{topic}")
async def research_resource(topic: str) -> str:
    """
    Provide research context for a given topic directly as a resource.

    This allows LLMs to access web-sourced information without explicit function calls.

    Args:
        topic: The research topic or query

    Returns:
        String containing the research context with source information
    """
    # Check if we've already researched this topic
    if topic in research_store:
        logger.info(f"Returning cached research for topic: {topic}")
        return research_store[topic]["context"]

    # If not, conduct the research
    logger.info(f"Conducting new research for resource on topic: {topic}")

    # Initialize GPT Researcher
    researcher = GPTResearcher(topic)

    try:
        # Conduct the research
        await researcher.conduct_research()

        # Get the context and sources
        context = researcher.get_research_context()
        sources = researcher.get_research_sources()
        source_urls = researcher.get_source_urls()

        # Format with sources included
        formatted_context = format_context_with_sources(topic, context, sources)

        # Store for future use
        store_research_results(topic, context, sources, source_urls, formatted_context)

        return formatted_context
    except Exception as e:
        return f"Error conducting research on '{topic}': {str(e)}"


@mcp.tool()
async def deep_research(query: str) -> Dict[str, Any]:
    """
    Conduct deep web research on a given query using GPT Researcher.
    Use this tool when you need time-sensitive, real-time information like stock prices, news, people, specific knowledge, etc.
    You must include citations that back your responses when using this tool.

    Args:
        query: The research query or topic

    Returns:
        Dict containing research status, ID, and the actual research context and sources
        that can be used directly by LLMs for context enrichment
    """
    logger.info(f"Conducting research on query: {query}...")

    # Generate a unique ID for this research session
    research_id = str(uuid.uuid4())

    # Initialize GPT Researcher
    researcher = GPTResearcher(query)

    # Start research
    try:
        await researcher.conduct_research()
        mcp.researchers[research_id] = researcher
        logger.info(f"Research completed for ID: {research_id}")

        # Get the research context and sources
        context = researcher.get_research_context()
        sources = researcher.get_research_sources()
        source_urls = researcher.get_source_urls()

        # Store in the research store for the resource API
        store_research_results(query, context, sources, source_urls)

        return create_success_response({
            "research_id": research_id,
            "query": query,
            "source_count": len(sources),
            "context": context,
            "sources": format_sources_for_response(sources),
            "source_urls": source_urls
        })
    except Exception as e:
        return handle_exception(e, "Research")


@mcp.tool()
async def write_report(research_id: str, custom_prompt: Optional[str] = None) -> Dict[str, Any]:
    """
    Generate a report based on previously conducted research.

    Args:
        research_id: The ID of the research session from deep_research
        custom_prompt: Optional custom prompt for report generation

    Returns:
        Dict containing the report content and metadata
    """
    success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
    if not success:
        return error

    logger.info(f"Generating report for research ID: {research_id}")

    try:
        # Generate report
        report = await researcher.write_report(custom_prompt=custom_prompt)

        # Get additional information
        sources = researcher.get_research_sources()
        costs = researcher.get_costs()

        return create_success_response({
            "report": report,
            "source_count": len(sources),
            "costs": costs
        })
    except Exception as e:
        return handle_exception(e, "Report generation")


@mcp.tool()
async def get_research_sources(research_id: str) -> Dict[str, Any]:
    """
    Get the sources used in the research.

    Args:
        research_id: The ID of the research session

    Returns:
        Dict containing the research sources
    """
    success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
    if not success:
        return error

    sources = researcher.get_research_sources()
    source_urls = researcher.get_source_urls()

    return create_success_response({
        "sources": format_sources_for_response(sources),
        "source_urls": source_urls
    })


@mcp.tool()
async def get_research_context(research_id: str) -> Dict[str, Any]:
    """
    Get the full context of the research.

    Args:
        research_id: The ID of the research session

    Returns:
        Dict containing the research context
    """
    success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
    if not success:
        return error

    context = researcher.get_research_context()

    return create_success_response({
        "context": context
    })


@mcp.prompt()
def research_query(topic: str, goal: str, report_format: str = "research_report") -> str:
    """
    Create a research query prompt for GPT Researcher.

    Args:
        topic: The topic to research
        goal: The goal or specific question to answer
        report_format: The format of the report to generate

    Returns:
        A formatted prompt for research
    """
    return create_research_prompt(topic, goal, report_format)


def run_server():
    """Run the MCP server using FastMCP's built-in event loop handling."""
    # Check if API keys are set
    if not os.getenv("OPENAI_API_KEY"):
        logger.error("OPENAI_API_KEY not found. Please set it in your .env file.")
        return

    # Add startup message
    logger.info("Starting GPT Researcher MCP Server...")
    print("🚀 GPT Researcher MCP Server starting...")

    # Let FastMCP handle the event loop
    try:
        mcp.run("sse")
        # Note: If we reach here, the server has stopped
        logger.info("MCP Server has stopped")
    except Exception as e:
        logger.error(f"Error running MCP server: {str(e)}")
        print(f"❌ MCP Server error: {str(e)}")
        return

    print("✅ MCP Server stopped")


if __name__ == "__main__":
    # Use the non-async approach to avoid asyncio nesting issues
    run_server()
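As a usage sketch for the tool workflow above, the following client first calls `deep_research` and then reuses the returned `research_id` with `write_report`. It assumes the same `mcp` SDK client as in the README example, and that tool results arrive as JSON text content:

```python
import asyncio
import json

from mcp import ClientSession
from mcp.client.sse import sse_client


async def research_and_report(query: str) -> str:
    async with sse_client("http://localhost:8000/sse") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            # Step 1: deep_research returns a research_id plus context and sources
            research = await session.call_tool("deep_research", {"query": query})
            research_id = json.loads(research.content[0].text)["research_id"]
            # Step 2: write_report looks up the stored researcher by ID
            report = await session.call_tool("write_report", {"research_id": research_id})
            return json.loads(report.content[0].text)["report"]


if __name__ == "__main__":
    print(asyncio.run(research_and_report("open-source LLM benchmarks")))
```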

src/utils.py

@@ -0,0 +1,139 @@
"""
GPT Researcher MCP Server Utilities

This module provides utility functions and helpers for the GPT Researcher MCP Server.
"""
import sys
from typing import Dict, List, Optional, Tuple, Any
from loguru import logger

# Configure logging for console only (no file logging)
logger.configure(handlers=[{"sink": sys.stderr, "level": "INFO"}])

# Research store to track ongoing research topics and contexts
research_store = {}


# API Response Utilities
def create_error_response(message: str) -> Dict[str, Any]:
    """Create a standardized error response"""
    return {"status": "error", "message": message}


def create_success_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a standardized success response"""
    return {"status": "success", **data}


def handle_exception(e: Exception, operation: str) -> Dict[str, Any]:
    """Handle exceptions in a consistent way"""
    error_message = str(e)
    logger.error(f"{operation} failed: {error_message}")
    return create_error_response(error_message)


def get_researcher_by_id(researchers_dict: Dict, research_id: str) -> Tuple[bool, Any, Dict[str, Any]]:
    """
    Helper function to retrieve a researcher by ID.

    Args:
        researchers_dict: Dictionary of research objects
        research_id: The ID of the research session

    Returns:
        Tuple containing (success, researcher_object, error_response)
    """
    if not researchers_dict or research_id not in researchers_dict:
        return False, None, create_error_response("Research ID not found. Please conduct research first.")
    return True, researchers_dict[research_id], {}


def format_sources_for_response(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Format source information for API responses.

    Args:
        sources: List of source dictionaries

    Returns:
        Formatted source list for API responses
    """
    return [
        {
            "title": source.get("title", "Unknown"),
            "url": source.get("url", ""),
            "content_length": len(source.get("content", ""))
        }
        for source in sources
    ]


def format_context_with_sources(topic: str, context: str, sources: List[Dict[str, Any]]) -> str:
    """
    Format research context with sources for display.

    Args:
        topic: Research topic
        context: Research context
        sources: List of sources

    Returns:
        Formatted context string with sources
    """
    formatted_context = f"## Research: {topic}\n\n{context}\n\n"
    formatted_context += "## Sources:\n"
    for i, source in enumerate(sources):
        formatted_context += f"{i+1}. {source.get('title', 'Unknown')}: {source.get('url', '')}\n"
    return formatted_context


def store_research_results(topic: str, context: str, sources: List[Dict[str, Any]],
                           source_urls: List[str], formatted_context: Optional[str] = None):
    """
    Store research results in the research store.

    Args:
        topic: Research topic
        context: Research context
        sources: List of sources
        source_urls: List of source URLs
        formatted_context: Optional pre-formatted context
    """
    research_store[topic] = {
        "context": formatted_context or context,
        "sources": sources,
        "source_urls": source_urls
    }


def create_research_prompt(topic: str, goal: str, report_format: str = "research_report") -> str:
    """
    Create a research query prompt for GPT Researcher.

    Args:
        topic: The topic to research
        goal: The goal or specific question to answer
        report_format: The format of the report to generate

    Returns:
        A formatted prompt for research
    """
    return f"""
Please research the following topic: {topic}

Goal: {goal}

You have two methods to access web-sourced information:

1. Use the "research://{topic}" resource to directly access context about this topic if it exists
   or if you want to get straight to the information without tracking a research ID.

2. Use the deep_research tool to perform new research and get a research_id for later use.
   This tool also returns the context directly in its response, which you can use immediately.

After getting context, you can:
- Use it directly in your response
- Use the write_report tool with a custom prompt to generate a structured {report_format}

You can also use get_research_sources to view additional details about the information sources.
"""