Compare commits


1 commit

Author SHA1 Message Date
af4d96f7fc chore(deps): add .github/renovate.json5 2025-04-25 08:23:15 +00:00
15 changed files with 324 additions and 928 deletions

6
.github/renovate.json5 vendored Normal file

@ -0,0 +1,6 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"local>phoenix/renovate-config"
]
}

Dockerfile

@ -4,9 +4,6 @@ FROM python:3.13-slim
# Set environment variable for Python
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# Use an environment variable to define the script path
ENV APP_ENTRYPOINT "/app/phoenix_technologies/gpt_researcher/server.py"
# Set working directory within the container
WORKDIR /app
@ -25,4 +22,4 @@ COPY src/ /app/
EXPOSE 8000
# Set the default command to run the app with `uvicorn`
CMD python "$APP_ENTRYPOINT"
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

248
README.md

@ -1,119 +1,175 @@
# GPT Researcher
## Project Description
**GPT Researcher** is a Python-based tool designed to assist researchers and knowledge seekers in generating detailed research insights efficiently and accurately. This project combines powerful machine learning-backed methodologies with intuitive features for processing and analyzing research data.
The project revolves around managing research queries, gathering well-sourced insights, and producing detailed research outputs. It provides flexibility for both quick general searches and deep explorations of specific topics, and it includes a robust system for managing logs, exceptions, and user-defined custom responses.
The application is containerized with Docker, making it easier to deploy in any environment. It is built to utilize modular components for research management, formatting responses, and enhancing the overall research experience.
# README for the FastAPI-Based GPT Report Generation Service
## Overview
This repository contains the implementation of a **FastAPI**-based service designed to generate research reports. The service processes user-provided queries and report types, performing advanced research powered by `GPTResearcher` and responding with comprehensive results, including details, cost, context, images, and other associated metadata.
## Features
- **RESTful API** to handle user queries and generate reports.
- **Streaming responses** to deliver research output in chunks.
- **Secure API access** with API Key authentication.
- Completely containerized setup with Docker.
- Built with modular design for easier scalability and maintenance.
### Key Highlights:
1. **Research Operations**
- Perform deep-dives into specific topics through a systematic research mechanism.
- Quickly search for general information in a fast and lightweight fashion (`quick_search`).
- Retrieve necessary sources for research with flexibility.
2. **Resource Management**
- Organize and store research results for later analysis.
- Format research sources and context to create user-friendly reports.
- Generate customizable research prompts for optimized research requests.
3. **Logging and Error Handling**
- Employ a custom logging handler (`CustomLogsHandler`) to manage logs effectively.
- Provide user-friendly error responses with flexible exception handling.
4. **Report Generation**
- Automate the creation of research reports in reusable templates.
- Aggregate research resources and contexts while ensuring professional quality formatting.
5. **Web Server Support**
- Run a backend server with the `run_server` function for handling client-side operations and queries.
6. **Containerized Deployment**
- Fully Dockerized to allow for fast setup and consistent deployment across environments.
---
## System Architecture
### Core Components
1. **FastAPI App (`main.py`)**:
- Hosts the API endpoints.
- Handles API Key authentication for secure use.
- Accepts user inputs (query and report type) and generates a chunked streaming response.
2. **Research Logic (`deepresearch.py`)**:
- Encapsulates research and report generation.
- Utilizes `GPTResearcher` to conduct research, generate reports, and retrieve extended data like images, contexts, or costs.
3. **Docker Integration**:
- The application is containerized with a well-defined `Dockerfile`.
- Includes dependency installation, environment setup, and FastAPI server configuration for rapid deployment.
---
## Prerequisites
Before running the application, ensure the following are installed on your system:
- **Docker**: Version 24.0+
- **Python**: Version 3.13+
- **pip**: Pre-installed Python package manager.
---
## Installation
To get the project up and running locally, follow these steps:
### Prerequisites
- Python 3.13+
- Docker Engine (for containerized deployments)
- `pip` for managing necessary Python packages
### Steps:
1. **Clone the Repository**:
``` sh
git clone <repository-url>
cd gpt_researcher
```
## Running the Application Locally
### Cloning the Repository
Clone the repository to a directory of your choice:
```shell script
git clone https://git.kvant.cloud/phoenix/gpt-researcher.git
cd gpt-researcher
```
2. **Install Dependencies**: If you are working outside of Docker:
``` sh
pip install -r requirements.txt
```
3. **Run the Application**:
``` sh
python src/mcp/gpt_researcher/server.py
```
4. **Using Docker**: Build and run the Docker container:
``` sh
docker build -t gpt-researcher .
docker run -p 8000:8000 gpt-researcher
```
The app will be available at `http://localhost:8000`.
### Installing Dependencies
Install the required Python packages listed in `requirements.txt`:
```shell script
pip install --no-cache-dir -r requirements.txt
```
### Environment Variable Configuration
Create a `.env` file in the root of the project and define:
```
API_KEY=your_api_key # Replace "your_api_key" with your desired key
OPENAI_BASE_URL=
OPENAI_API_KEY=
EMBEDDING=
FAST_LLM=
SMART_LLM=
STRATEGIC_LLM=
OPENAI_API_VERSION=
SERPER_API_KEY=
RETRIEVER=serper
```
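When running outside Docker (the `docker run --env-file .env` command shown later injects these values for you), the variables must be present in the process environment. A minimal, hypothetical way to load them for a local run is `python-dotenv`, which is not part of the trimmed `requirements.txt` and would need to be installed separately:

```python
# Hypothetical helper for local runs only; Docker deployments rely on --env-file instead.
import os

from dotenv import load_dotenv  # pip install python-dotenv

load_dotenv()  # reads the .env file from the current working directory

missing = [name for name in ("API_KEY", "OPENAI_API_KEY", "SERPER_API_KEY") if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")
```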
## File Structure
### Overview:
``` plaintext
src/
├── mcp/
│ ├── gpt_researcher/
│ │ ├── server.py # Main entrypoint for research workflow and server
│ │ └── utils.py # Utility functions for research formatting and storage
Dockerfile # Defines the container runtime environment
requirements.txt # Python dependencies for the project
```
### Running the App
Run the FastAPI app locally:
```shell script
uvicorn main:app --host 0.0.0.0 --port 8000
```
### Key Components:
1. **`server.py`**:
- Implements the main research operations:
- `deep_research`, `quick_search`, and `write_report`.
- Contains functions to retrieve research sources, manage contexts, and execute research prompts.
After running, your app will be available at `http://127.0.0.1:8000`.
2. **`utils.py`**:
- Provides utilities for:
- Managing research storage.
- Formatting sources and context for readable outputs.
- Generating responses to different types of user interactions.
---
3. **`Dockerfile`**:
- A configuration file for containerizing the application using the official Python 3.13-slim image.
## Usage Instructions
### Available Endpoints
Once the server is running successfully, you can interact with the application by sending API requests to the available endpoints. Below are the core functionalities:
1. **Quick Research**:
- Quickly get general insights based on a query.
2. **Deep Research**:
- Run comprehensive research on a specific topic.
3. **Custom Research Reports**:
- Combine sources and analysis to create detailed user reports.
4. **Access Research Context**:
- Retrieve the context and sources behind a result for deeper understanding.
## Using Docker for Deployment
### Building the Docker Image
Build the Docker image using the **Dockerfile** provided:
```shell script
docker build -t fastapi-report-service .
```
### Running the Docker Container
Spin up a container and map FastAPI's default port, `8000`:
```shell script
docker run --env-file .env -p 8000:8000 fastapi-report-service
```
---
## API Usage
### 1. **`/get_report`**
- **Method**: `POST`
- **Description**: Generates a report based on user input.
- **Headers**:
- `X-API-KEY`: API Key for authentication.
- **Request Body** (`JSON`):
```json
{
"query": "Research on AI in healthcare",
"report_type": "research_report|resource_report|outline_report|custom_report|detailed_report|subtopic_report|deep"
}
```
- **Streaming Response**: Research and report are provided in chunks.
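For reference, a minimal Python client sketch for this endpoint (assuming the service is reachable at `http://localhost:8000`, `requests` is installed, and the key matches the `API_KEY` configured on the server; adjust these to your deployment):

```python
import requests

response = requests.post(
    "http://localhost:8000/get_report",
    headers={"X-API-KEY": "your_api_key"},  # must match API_KEY on the server
    json={
        "query": "Research on AI in healthcare",
        "report_type": "research_report",
    },
    stream=True,   # consume the report as it is streamed in chunks
    timeout=600,   # research can take several minutes
)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="")
```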
---
## Code Structure
```
├── Dockerfile # Configuration for Dockerizing the application
├── requirements.txt # Python dependencies list
├── main.py # FastAPI server entry point
├── deepresearch.py # Research-related logic and GPTResearcher integration
└── src/ # Other project files and assets
```
---
## Features Under the Hood
1. **Authentication**:
- An API key mechanism ensures that only authorized users can access endpoints.
2. **Streaming Response**:
- Large research reports are sent incrementally using `StreamingResponse` for better responsiveness and efficiency (see the sketch after this list).
3. **Modular Research Logic**:
- Research and generation tasks are handled by a dedicated class (`ReportGenerator`), making the application extensible.
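In essence, items 1 and 2 combine a header-checked FastAPI dependency with an async generator handed to `StreamingResponse`. The sketch below is a simplified condensation of the new `src/main.py` in this commit, not a drop-in replacement for it:

```python
import os

from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

def verify_api_key(request: Request) -> None:
    # Reject the request unless X-API-KEY matches the API_KEY environment variable.
    expected = os.getenv("API_KEY")
    if not expected or request.headers.get("X-API-KEY") != expected:
        raise HTTPException(status_code=403, detail="Invalid or missing API key.")

@app.post("/get_report", dependencies=[Depends(verify_api_key)])
async def get_report() -> StreamingResponse:
    async def stream():
        # Yield the report piece by piece instead of building it in memory first.
        for part in ("report chunk 1\n", "report chunk 2\n"):
            yield part
    return StreamingResponse(stream(), media_type="text/plain")
```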
---
## Future Enhancements
- **Asynchronous Enhancements**:
- Improve async handling for long-running queries.
- **Database Integration**:
- Save request history for auditing and reference purposes.
- **Web Interface**:
- A user-friendly web application for interacting with the API.
---
## Contributing
We follow a **code of conduct** that expects all contributors to maintain a respectful and inclusive environment.
### Contribution Guidelines:
1. Fork the repository and develop your changes locally.
2. Create a branch for your changes.
``` sh
git checkout -b feature-name
```
3. Make your changes and test them thoroughly.
4. Submit a pull request with a clear description of the changes.
## Code of Conduct
As contributors and maintainers of this project, we pledge to respect all members of the community by fostering a positive and inclusive environment. Instances of abuse, harassment, or unacceptable behavior will not be tolerated.
For details, please refer to the [Code of Conduct](CODE_OF_CONDUCT.md).
## License
This project is licensed under the **MIT License**. For more information, refer to the LICENSE file in the repository.
## Feedback and Support
For any queries, suggestions, or feedback, feel free to open an issue in this repository. You can also contact the maintainers directly via email or GitHub.
Please feel free to expand or modify this README according to your project's future needs!
Contributions are welcome! Feel free to fork the repository, make updates, and submit a pull request.

requirements.txt

@ -1,15 +1,5 @@
# GPT Researcher dependencies
gpt-researcher>=0.13.3
python-dotenv~=1.1.0
# MCP dependencies
mcp>=1.6.0
fastapi>=0.103.1
uvicorn>=0.23.2
pydantic>=2.3.0
# Utility dependencies
loguru>=0.7.0
requests~=2.32.3
aiohttp~=3.11.18
asyncio~=3.4.3
fastapi
uvicorn
pydantic
gpt-researcher
asyncio

47
src/main.py Normal file

@ -0,0 +1,47 @@
from fastapi import FastAPI, HTTPException, Request, Depends
from pydantic import BaseModel
from phoenix_technologies import ReportGenerator
from fastapi.responses import StreamingResponse
import os
import asyncio
# FastAPI app instance
app = FastAPI()
# Define a request body structure using Pydantic
class ReportRequest(BaseModel):
query: str
report_type: str
# Define a dependency to validate the API Key
def verify_api_key(request: Request):
# Define the API key from the environment variables
expected_api_key = os.getenv("API_KEY", None)
if not expected_api_key:
raise HTTPException(status_code=500, detail="API key is not configured on the server.")
# Get the API key from the request headers
provided_api_key = request.headers.get("X-API-KEY", None)
# Check if the API key is correct
if not provided_api_key or provided_api_key != expected_api_key:
raise HTTPException(status_code=403, detail="Invalid or missing API key.")
@app.post("/get_report", dependencies=[Depends(verify_api_key)])
async def get_report_endpoint(request: ReportRequest):
"""
Expose the `get_report` function as a POST API endpoint, with a streaming response.
"""
async def generate_report():
try:
# Call the asynchronous get_report function
report_generator = ReportGenerator(request.query, request.report_type)
async for chunk in report_generator:
yield chunk
except Exception as e:
yield f"Error: {str(e)}"
# Return streaming response
return StreamingResponse(generate_report(), media_type="text/plain")
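A hypothetical smoke test for the authentication path, using FastAPI's `TestClient` (which requires `httpx`, not listed in `requirements.txt`); it only verifies that requests without a valid `X-API-KEY` are rejected and does not exercise the actual research flow:

```python
# Hypothetical test sketch, not part of this commit; assumes it sits next to main.py.
import os

os.environ.setdefault("API_KEY", "test-key")  # set before any request is handled

from fastapi.testclient import TestClient

from main import app

client = TestClient(app)
payload = {"query": "AI in healthcare", "report_type": "research_report"}

# Missing or wrong key is rejected by verify_api_key with HTTP 403.
assert client.post("/get_report", json=payload).status_code == 403
assert client.post("/get_report", json=payload, headers={"X-API-KEY": "wrong"}).status_code == 403
```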


@ -0,0 +1,4 @@
# phoenix-technologies/__init__.py
from .gptresearch.deepresearch import ReportGenerator
__all__ = ["ReportGenerator"]


@ -1,8 +0,0 @@
"""
GPT Researcher MCP Server
This module provides an MCP server implementation for GPT Researcher,
allowing AI assistants to perform web research and generate reports via the MCP protocol.
"""
__version__ = "0.1.0"


@ -1,381 +0,0 @@
"""
GPT Researcher MCP Server
This script implements an MCP server for GPT Researcher, allowing AI assistants
to conduct web research and generate reports via the MCP protocol.
"""
import os
import json
import uuid
import logging
from typing import Dict, Any, Optional, AsyncGenerator
import asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from gpt_researcher import GPTResearcher
# Load environment variables
load_dotenv()
from utils import (
research_store,
create_success_response,
handle_exception,
get_researcher_by_id,
format_sources_for_response,
format_context_with_sources,
store_research_results,
create_research_prompt
)
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s][%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)
def format_sse_event(event_name: str, data: Dict[str, Any]) -> str:
"""Formatiert Daten als SSE Event String."""
json_data = json.dumps(data)
return f"event: {event_name}\ndata: {json_data}\n\n"
# Initialize FastMCP server
mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
# Initialize researchers dictionary
if not hasattr(mcp, "researchers"):
mcp.researchers = {}
@mcp.resource("research://{topic}")
async def research_resource(topic: str) -> str:
"""
Provide research context for a given topic directly as a resource.
This allows LLMs to access web-sourced information without explicit function calls.
Args:
topic: The research topic or query
Returns:
String containing the research context with source information
"""
# Check if we've already researched this topic
if topic in research_store:
logger.info(f"Returning cached research for topic: {topic}")
return research_store[topic]["context"]
# If not, conduct the research
logger.info(f"Conducting new research for resource on topic: {topic}")
# Initialize GPT Researcher
researcher = GPTResearcher(topic)
try:
# Conduct the research
await researcher.conduct_research()
# Get the context and sources
context = researcher.get_research_context()
sources = researcher.get_research_sources()
source_urls = researcher.get_source_urls()
# Format with sources included
formatted_context = format_context_with_sources(topic, context, sources)
# Store for future use
store_research_results(topic, context, sources, source_urls, formatted_context)
return formatted_context
except Exception as e:
return f"Error conducting research on '{topic}': {str(e)}"
@mcp.tool()
async def deep_research(query: str) -> AsyncGenerator[str, None]:
"""
Conduct deep web research on a given query using GPT Researcher.
Use this tool when you need time-sensitive, real-time information like stock prices, news, people, specific knowledge, etc.
Args:
query: The research query or topic
Returns:
Dict containing research status, ID, and the actual research context and sources
that can be used directly by LLMs for context enrichment
"""
logger.info(f"Starting streaming deep research on query: {query}...")
research_id = str(uuid.uuid4())
try:
yield format_sse_event(
"message",
{"research_id": research_id, "query": query, "status": "Research initiated"}
)
# Initialize GPT Researcher
# In newer GPTResearcher versions the query is passed at initialization
researcher = GPTResearcher(query=query, report_type="research_report", config_path=None)
# Alternatively, if your version handles it differently: researcher = GPTResearcher(query)
mcp.researchers[research_id] = researcher # Store early in case it is needed later
yield format_sse_event(
"message",
{"research_id": research_id, "status": "GPT Researcher initialized. Starting information gathering."}
)
await asyncio.sleep(0.1) # Short pause so the event is reliably sent
# Simulated update: research is starting
yield format_sse_event(
"message",
{"research_id": research_id, "status": "Starting web crawling and source analysis...", "progress": 10}
)
# The actual long-running process
await researcher.conduct_research() # This call blocks for the duration of the research
# Simulated update: research finished, begin processing
yield format_sse_event(
"message",
{"research_id": research_id, "status": "Web crawling finished. Processing and consolidating information...", "progress": 70}
)
await asyncio.sleep(0.1)
logger.info(f"Core research completed for ID: {research_id}. Fetching results...")
# Get the research context and sources
context = researcher.get_research_context()
sources = researcher.get_research_sources() # These are structured source objects
source_urls = researcher.get_source_urls() # These are just the URLs
# Store in the research store for the resource API (optional, depending on your logic)
# Consider whether formatted_context is needed here when the client receives the raw data
store_research_results(query, context, sources, source_urls, context) # Simplified for the store
# Prepare the final data
final_data_payload = {
"research_id": research_id,
"query": query,
"status": "Research completed successfully",
"source_count": len(sources),
"context": context,
"sources": format_sources_for_response(sources),
"source_urls": source_urls
}
# Send the final result as a 'tool_result' event
yield format_sse_event("message", final_data_payload)
logger.info(f"Sent final research result for ID: {research_id}")
except Exception as e:
logger.error(f"Error during deep_research for query '{query}': {e}", exc_info=True)
# Send an error event to the client
error_payload = {
"research_id": research_id, # May be None if the error occurs very early
"query": query,
"status": "Error occurred",
"error_message": str(e),
"error_details": "Check server logs for more information."
}
yield format_sse_event("message", error_payload) # Eigener Event-Typ für Fehler
# Du könntest hier auch handle_exception verwenden, wenn es ein SSE-Event zurückgibt
# oder die Exception weiter werfen, wenn FastMCP das besser handhabt.
@mcp.tool()
async def quick_search(query: str) -> AsyncGenerator[str, None]:
"""
Perform a quick web search on a given query and return search results with snippets.
This optimizes for speed over quality and is useful when an LLM doesn't need in-depth
information on a topic.
Args:
query: The search query
Returns:
Dict containing search results and snippets
"""
logger.info(f"Performing quick search on query: {query}...")
# Generate a unique ID for this search session
search_id = str(uuid.uuid4())
# Initialize GPT Researcher
researcher = GPTResearcher(query)
try:
yield format_sse_event(
"message",
{"research_id": search_id, "query": query, "status": "Research initiated"}
)
# Perform quick search
search_results = await researcher.quick_search(query=query)
mcp.researchers[search_id] = researcher
logger.info(f"Quick search completed for ID: {search_id}")
final_data_payload = {
"search_id": search_id,
"query": query,
"result_count": len(search_results) if search_results else 0,
"search_results": search_results
}
# Send the final result as a 'tool_result' event
yield format_sse_event("message", final_data_payload)
logger.info(f"Sent final research result for ID: {search_id}")
except Exception as e:
logger.error(f"Error during deep_research for query '{query}': {e}", exc_info=True)
# Sende ein Fehler-Event an den Client
error_payload = {
"research_id": search_id, # Kann None sein, wenn Fehler sehr früh auftritt
"query": query,
"status": "Error occurred",
"error_message": str(e),
"error_details": "Check server logs for more information."
}
yield format_sse_event("message", error_payload)
@mcp.tool()
async def write_report(research_id: str, custom_prompt: Optional[str] = None) -> AsyncGenerator[str, None]:
"""
Generate a report based on previously conducted research.
Args:
research_id: The ID of the research session from conduct_research
custom_prompt: Optional custom prompt for report generation
Returns:
Dict containing the report content and metadata
"""
success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
if not success:
yield format_sse_event("message", error)
logger.info(f"Generating report for research ID: {research_id}")
try:
# Generate report
report = await researcher.write_report(custom_prompt=custom_prompt)
# Get additional information
sources = researcher.get_research_sources()
costs = researcher.get_costs()
final_data_payload = {
"report": report,
"source_count": len(sources),
"costs": costs
}
# Send the final result as a 'tool_result' event
yield format_sse_event("message", final_data_payload)
logger.info(f"Sent final research result for ID: {research_id}")
except Exception as e:
logger.error(f"Error during deep_research for query: {e}", exc_info=True)
# Sende ein Fehler-Event an den Client
error_payload = {
"research_id": research_id,
"status": "Error occurred",
"error_message": str(e),
"error_details": "Check server logs for more information."
}
yield format_sse_event("message", error_payload)
@mcp.tool()
async def get_research_sources(research_id: str) -> AsyncGenerator[str, None]:
"""
Get the sources used in the research.
Args:
research_id: The ID of the research session
Returns:
Dict containing the research sources
"""
success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
if not success:
yield format_sse_event("message", error)
sources = researcher.get_research_sources()
source_urls = researcher.get_source_urls()
final_data_payload = {
"sources": format_sources_for_response(sources),
"source_urls": source_urls
}
# Send the final result as a 'tool_result' event
yield format_sse_event("message", final_data_payload)
logger.info(f"Sent final research result for ID: {research_id}")
@mcp.tool()
async def get_research_context(research_id: str) -> Dict[str, Any]:
"""
Get the full context of the research.
Args:
research_id: The ID of the research session
Returns:
Dict containing the research context
"""
success, researcher, error = get_researcher_by_id(mcp.researchers, research_id)
if not success:
return error
context = researcher.get_research_context()
return create_success_response({
"context": context
})
@mcp.prompt()
def research_query(topic: str, goal: str, report_format: str = "research_report") -> str:
"""
Create a research query prompt for GPT Researcher.
Args:
topic: The topic to research
goal: The goal or specific question to answer
report_format: The format of the report to generate
Returns:
A formatted prompt for research
"""
return create_research_prompt(topic, goal, report_format)
def run_server():
"""Run the MCP server using FastMCP's built-in event loop handling."""
# Check if API keys are set
if not os.getenv("OPENAI_API_KEY"):
logger.error("OPENAI_API_KEY not found. Please set it in your .env file.")
return
# Add startup message
logger.info("Starting GPT Researcher MCP Server...")
print("🚀 GPT Researcher MCP Server starting... Check researcher_mcp_server.log for details")
# Let FastMCP handle the event loop
try:
mcp.run("sse")
# Note: If we reach here, the server has stopped
logger.info("MCP Server has stopped")
except Exception as e:
logger.error(f"Error running MCP server: {str(e)}")
print(f"❌ MCP Server error: {str(e)}")
return
print("✅ MCP Server stopped")
if __name__ == "__main__":
# Use the non-async approach to avoid asyncio nesting issues
run_server()


@ -1,139 +0,0 @@
"""
GPT Researcher MCP Server Utilities
This module provides utility functions and helpers for the GPT Researcher MCP Server.
"""
import sys
from typing import Dict, List, Optional, Tuple, Any
from loguru import logger
# Configure logging for console only (no file logging)
logger.configure(handlers=[{"sink": sys.stderr, "level": "INFO"}])
# Research store to track ongoing research topics and contexts
research_store = {}
# API Response Utilities
def create_error_response(message: str) -> Dict[str, Any]:
"""Create a standardized error response"""
return {"status": "error", "message": message}
def create_success_response(data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a standardized success response"""
return {"status": "success", **data}
def handle_exception(e: Exception, operation: str) -> Dict[str, Any]:
"""Handle exceptions in a consistent way"""
error_message = str(e)
logger.error(f"{operation} failed: {error_message}")
return create_error_response(error_message)
def get_researcher_by_id(researchers_dict: Dict, research_id: str) -> Tuple[bool, Any, Dict[str, Any]]:
"""
Helper function to retrieve a researcher by ID.
Args:
researchers_dict: Dictionary of research objects
research_id: The ID of the research session
Returns:
Tuple containing (success, researcher_object, error_response)
"""
if not researchers_dict or research_id not in researchers_dict:
return False, None, create_error_response("Research ID not found. Please conduct research first.")
return True, researchers_dict[research_id], {}
def format_sources_for_response(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Format source information for API responses.
Args:
sources: List of source dictionaries
Returns:
Formatted source list for API responses
"""
return [
{
"title": source.get("title", "Unknown"),
"url": source.get("url", ""),
"content_length": len(source.get("content", ""))
}
for source in sources
]
def format_context_with_sources(topic: str, context: str, sources: List[Dict[str, Any]]) -> str:
"""
Format research context with sources for display.
Args:
topic: Research topic
context: Research context
sources: List of sources
Returns:
Formatted context string with sources
"""
formatted_context = f"## Research: {topic}\n\n{context}\n\n"
formatted_context += "## Sources:\n"
for i, source in enumerate(sources):
formatted_context += f"{i+1}. {source.get('title', 'Unknown')}: {source.get('url', '')}\n"
return formatted_context
def store_research_results(topic: str, context: str, sources: List[Dict[str, Any]],
source_urls: List[str], formatted_context: Optional[str] = None):
"""
Store research results in the research store.
Args:
topic: Research topic
context: Research context
sources: List of sources
source_urls: List of source URLs
formatted_context: Optional pre-formatted context
"""
research_store[topic] = {
"context": formatted_context or context,
"sources": sources,
"source_urls": source_urls
}
def create_research_prompt(topic: str, goal: str, report_format: str = "research_report") -> str:
"""
Create a research query prompt for GPT Researcher.
Args:
topic: The topic to research
goal: The goal or specific question to answer
report_format: The format of the report to generate
Returns:
A formatted prompt for research
"""
return f"""
Please research the following topic: {topic}
Goal: {goal}
You have two methods to access web-sourced information:
1. Use the "research://{topic}" resource to directly access context about this topic if it exists
or if you want to get straight to the information without tracking a research ID.
2. Use the conduct_research tool to perform new research and get a research_id for later use.
This tool also returns the context directly in its response, which you can use immediately.
After getting context, you can:
- Use it directly in your response
- Use the write_report tool with a custom prompt to generate a structured {report_format}
You can also use get_research_sources to view additional details about the information sources.
"""


@ -0,0 +1,109 @@
from gpt_researcher import GPTResearcher
class ReportGenerator:
"""
A class to handle the generation of research-based reports.
This class integrates with GPTResearcher to conduct research, retrieve results,
and format them into consumable chunks for asynchronous streaming.
"""
def __init__(self, query: str, report_type: str):
"""
Initializes the ReportGenerator instance with a query and report type.
:param query: The main topic or question for research.
:param report_type: The type of report to generate.
"""
self.query = query
self.report_type = report_type
self.researcher = GPTResearcher(query, report_type)
self._chunks = None # Placeholder for report chunks
self._index = 0 # Iterator index for streaming
def __aiter__(self):
"""
Makes the ReportGenerator instance asynchronously iterable.
:return: Self instance for iteration.
"""
return self
async def __anext__(self):
"""
Defines the logic for asynchronous iteration over report chunks.
:return: The next chunk of the report.
:raises StopAsyncIteration: Raised when all chunks are yielded.
"""
if self._chunks is None:
# Generate report chunks on first access
self._chunks = await self._generate_report_chunks()
if self._index >= len(self._chunks):
# End iteration once all chunks are exhausted
raise StopAsyncIteration
chunk = self._chunks[self._index]
self._index += 1
return chunk
async def _generate_report_chunks(self):
"""
Conducts research using the researcher and generates the report in chunks.
:return: A list of report chunks.
"""
try:
# Asynchronous research and report generation
research_result = await self.researcher.conduct_research()
report = await self.researcher.write_report()
# Retrieve additional research details
research_context = self.researcher.get_research_context() or {}
research_costs = self.researcher.get_costs() or {}
research_images = self.researcher.get_research_images() or []
research_sources = self.researcher.get_research_sources() or []
# Construct the complete research response
full_report = {
"report": report,
"context": research_context,
"costs": research_costs,
"images": research_images,
"sources": research_sources,
}
# Generate chunks for streaming
return self._split_into_chunks(full_report)
except Exception as e:
# Handle potential errors during research and report generation
raise RuntimeError(f"Error generating report chunks: {e}")
def _split_into_chunks(self, report):
"""
Splits the report dictionary into smaller chunks for easier streaming.
:param report: A dictionary containing the full report data.
:return: A list of formatted text chunks.
"""
if not report:
raise ValueError("Cannot split an empty or None report into chunks.")
chunks = []
for key, value in report.items():
chunks.append(f"{key}: {value}")
return chunks
def get_query_details(self):
"""
Retrieves the details of the query and report type.
:return: A dictionary containing the query and report type.
"""
return {
"query": self.query,
"report_type": self.report_type
}
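For local experimentation, a hypothetical way to consume this class directly (outside FastAPI) is to drive the async iterator yourself, assuming the LLM and retriever environment variables from the README are set:

```python
# Hypothetical local usage sketch; main.py streams the same iterator via StreamingResponse.
import asyncio

from phoenix_technologies import ReportGenerator


async def main() -> None:
    generator = ReportGenerator("Research on AI in healthcare", "research_report")
    # The first iteration triggers conduct_research() and write_report(); each chunk
    # is then one "key: value" string from the assembled report dictionary.
    async for chunk in generator:
        print(chunk[:200])


if __name__ == "__main__":
    asyncio.run(main())
```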


@ -1,8 +0,0 @@
"""
SMD MCP Server
This module provides an MCP server implementation for SMD,
allowing AI assistants to perform searches and generate reports via the MCP protocol.
"""
__version__ = "0.1.0"


@ -1,186 +0,0 @@
"""
SMD Researcher MCP Server
This script implements an MCP server for SMD Researcher, allowing AI assistants
to conduct research and generate reports via the MCP protocol.
"""
import os
import logging
import aiohttp
import asyncio
import requests
import time
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Load environment variables
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s][%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)
# Initialize FastMCP server
mcp = FastMCP("SMD Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
async def summarize_to_words(text: str, title: str, target_word_count: int = 1000) -> str:
url = f"https://maas.ai-2.kvant.cloud/engines/{os.getenv('SWISSDOX_SUMMARIZING_MODEL', '')}/chat/completions"
headers = {
"x-litellm-api-key": f"{os.getenv('SWISSDOX_SUMMARIZING_MODEL_APIKEY', '')}",
"Content-type": "application/json",
}
payload = {
"model": os.getenv('SWISSDOX_SUMMARIZING_MODEL', ''),
"messages": [
{
"role": "text summarizer",
"content": f"You are summarizing the user input to a maximum of {target_word_count}"
},
{
"role": "user",
"content": f"{title} - {text}"
}
]
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 200:
data = await response.json()
return data.get("choices")[0].get("message").get("content")
else:
return await response.text()
async def smd_detail_article(article_id):
logger.info("Starting smd_detail_article function.")
start_time = time.perf_counter()
url = f"https://api.swissdox.ch/api/documents/{article_id}"
headers = {
"Authorization": f"Bearer {os.getenv('SWISSDOX_BEARER_TOKEN', '')}",
"Content-type": "application/json",
}
payload = {"filters": [], "pagination": {"pageSize": 1, "currentPage": 1}}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 200:
data = await response.json()
summarized_content = await summarize_to_words(title=data.get("title"), text=data.get("text"), target_word_count=10000)
execution_time = time.perf_counter() - start_time
logger.info(f"smd_detail_article executed in {execution_time:.2f} seconds.")
logger.info(f"smd_article_summarization {summarized_content}")
return {
"message": summarized_content,
"article_id": article_id
}
else:
return {
"message": await response.text(),
"article_id": article_id
}
@mcp.tool()
async def smd_research(search_query: str = "Bundesrat", date_from: str = "2024-05-30T22:00:00.000Z", date_to: str = "2025-05-31T21:59:59.999Z") -> dict:
"""
Execute a deep search on a given query using SMD Researcher.
Use this tool when you need research on a topic.
Args:
search_query: The SMD search query. Logical operators (AND, OR, NOT) are available; prefix a word with "+" for an exact match and with "-" to exclude it. Put queries with multiple words in quotes. Formulate the query in German and enrich it with the relevant context keywords of the topic.
date_from: The date to start research from, in the format YYYY-MM-DDTHH:MM:SS.SSSZ
date_to: The date to end research at, in the format YYYY-MM-DDTHH:MM:SS.SSSZ
Returns:
String containing research status, ID, and the actual research context
"""
query = {
"sort": {
"field": "score",
"direction": "desc"
},
"filters": [
{
"field": "datetime",
"value": [
os.getenv('SWISSDOX_DATEFROM', '2020-12-31T23:00:00.000Z'),
os.getenv('SWISSDOX_DATETO', '2023-12-31T22:59:00.000Z')
]
},
{
"field": "newspaper",
"value": [
os.getenv('SWISSDOX_NEWSPAPER', 'NZZ')
]
},
{
"field": "query_text",
"value": [search_query]
}
],
"exact": False,
"pagination": {
"pageSize": os.getenv('SWISSDOX_PAGESIZE', '10'),
"currentPage": 1
},
"onlyResults": False
}
url = "https://api.swissdox.ch/api/documents/search"
headers = {
"authorization": f"Bearer {os.getenv('SWISSDOX_BEARER_TOKEN', '')}",
"content-type": "application/json",
}
response = requests.post(url, headers=headers, json=query)
if response.status_code == 200:
result = response.json()
articles = result.get("data", [])
facets = result.get("facets", [])
tasks = []
for article in articles:
article_id = article.get("id")
if article_id:
tasks.append(smd_detail_article(article_id))
detailed_articles = await asyncio.gather(*tasks)
logger.info(f"detailed_articles {detailed_articles}")
return {
"related_persons": facets.get("persons", []),
"related_organizations": facets.get("persons", []),
"detailed_articles": detailed_articles
}
else:
return {
"message": response.text
}
def run_server():
"""Run the MCP server using FastMCP's built-in event loop handling."""
# Add startup message
logger.info("Starting GPT Researcher MCP Server...")
print("🚀 GPT Researcher MCP Server starting... Check researcher_mcp_server.log for details")
# Let FastMCP handle the event loop
try:
mcp.run("sse")
# Note: If we reach here, the server has stopped
logger.info("MCP Server has stopped")
except Exception as e:
logger.error(f"Error running MCP server: {str(e)}")
print(f"❌ MCP Server error: {str(e)}")
return
print("✅ MCP Server stopped")
if __name__ == "__main__":
# Use the non-async approach to avoid asyncio nesting issues
run_server()


@ -1,8 +0,0 @@
"""
GPT Researcher MCP Server
This module provides an MCP server implementation for GPT Researcher,
allowing AI assistants to perform web research and generate reports via the MCP protocol.
"""
__version__ = "0.1.0"


@ -1,83 +0,0 @@
"""
GPT Researcher MCP Server
This script implements an MCP server for GPT Researcher, allowing AI assistants
to conduct web research and generate reports via the MCP protocol.
"""
import os
import json
from typing import Dict, Any, AsyncGenerator
import asyncio
from mcp.server.fastmcp import FastMCP
# Initialize FastMCP server
mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
@mcp.tool()
async def updates(tool_input: Dict[str, Any]) -> AsyncGenerator[str, None]:
"""
An MCP tool that streams intermediate updates via SSE.
tool_input is a dictionary containing the tool's input parameters.
"""
print(f"Tool started with input: {tool_input}")
async def _process_step_1():
await asyncio.sleep(1)
return {"status": "Step 1 complete", "details": "Initial processing done."}
async def _process_step_2():
await asyncio.sleep(2)
return {"status": "Step 2 in progress", "progress": 50}
async def _process_step_3():
await asyncio.sleep(1)
return {"status": "Step 2 complete", "progress": 100, "intermediate_result": "Partial data available"}
async def _generate_final_output():
await asyncio.sleep(1)
return {"final_data": "All steps finished successfully.", "summary": tool_input.get("summary_needed", False)}
# Step 1
update1 = await _process_step_1()
yield f"event: tool_update\ndata: {json.dumps(update1)}\n\n"
print("Update 1 sent")
# Step 2
update2 = await _process_step_2()
yield f"event: tool_update\ndata: {json.dumps(update2)}\n\n"
print("Update 2 sent")
# Step 3
update3 = await _process_step_3()
yield f"event: tool_update\ndata: {json.dumps(update3)}\n\n"
print("Update 3 sent")
# Final output (can also be sent as a dedicated event)
final_output = await _generate_final_output()
yield f"event: tool_result\ndata: {json.dumps(final_output)}\n\n"
print("Final result sent")
# Optional: end-of-stream signal, if required by the client or expected by FastMCP
# yield "event: stream_end\ndata: {}\n\n"
print("Tool execution finished.")
def run_server():
"""Run the MCP server using FastMCP's built-in event loop handling."""
# Add startup message
print("🚀 Test MCP Server starting... Check researcher_mcp_server.log for details")
# Let FastMCP handle the event loop
try:
mcp.run("sse")
except Exception as e:
print(f"❌ MCP Server error: {str(e)}")
return
print("✅ MCP Server stopped")
if __name__ == "__main__":
# Use the non-async approach to avoid asyncio nesting issues
run_server()