Add GPT Researcher MCP server implementation

Introduce an MCP server for GPT Researcher using FastMCP, enabling AI assistants to perform web research and generate reports via SSE. Includes async processing steps with real-time updates and a runnable server script.
This commit is contained in:
ThomasTaroni 2025-06-01 13:31:42 +02:00
parent 895671189e
commit 60c441c817
2 changed files with 91 additions and 0 deletions


@@ -0,0 +1,8 @@
"""
GPT Researcher MCP Server
This module provides an MCP server implementation for GPT Researcher,
allowing AI assistants to perform web research and generate reports via the MCP protocol.
"""
__version__ = "0.1.0"


@@ -0,0 +1,83 @@
"""
GPT Researcher MCP Server
This script implements an MCP server for GPT Researcher, allowing AI assistants
to conduct web research and generate reports via the MCP protocol.
"""
import os
import json
from typing import Dict, Any, AsyncGenerator
import asyncio
from mcp.server.fastmcp import FastMCP
# Initialize FastMCP server
mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
@mcp.tool()
async def updates(tool_input: Dict[str, Any]) -> AsyncGenerator[str, None]:
    """
    An MCP tool that streams intermediate updates via SSE.

    tool_input is a dictionary containing the tool's input parameters.
    """
    print(f"Tool started with input: {tool_input}")

    async def _process_step_1():
        await asyncio.sleep(1)
        return {"status": "Step 1 complete", "details": "Initial processing done."}

    async def _process_step_2():
        await asyncio.sleep(2)
        return {"status": "Step 2 in progress", "progress": 50}

    async def _process_step_3():
        await asyncio.sleep(1)
        return {"status": "Step 2 complete", "progress": 100, "intermediate_result": "Partial data available"}

    async def _generate_final_output():
        await asyncio.sleep(1)
        return {"final_data": "All steps finished successfully.", "summary": tool_input.get("summary_needed", False)}

    # Step 1
    update1 = await _process_step_1()
    yield f"event: tool_update\ndata: {json.dumps(update1)}\n\n"
    print("Update 1 sent")

    # Step 2
    update2 = await _process_step_2()
    yield f"event: tool_update\ndata: {json.dumps(update2)}\n\n"
    print("Update 2 sent")

    # Step 3
    update3 = await _process_step_3()
    yield f"event: tool_update\ndata: {json.dumps(update3)}\n\n"
    print("Update 3 sent")

    # Final output (can also be sent as a dedicated event)
    final_output = await _generate_final_output()
    yield f"event: tool_result\ndata: {json.dumps(final_output)}\n\n"
    print("Final result sent")

    # Optional: end-of-stream signal, if the client needs it or FastMCP expects it
    # yield "event: stream_end\ndata: {}\n\n"
    print("Tool execution finished.")
def run_server():
    """Run the MCP server using FastMCP's built-in event loop handling."""
    # Add startup message
    print("🚀 Test MCP Server starting... Check researcher_mcp_server.log for details")

    # Let FastMCP handle the event loop
    try:
        mcp.run("sse")
    except Exception as e:
        print(f"❌ MCP Server error: {str(e)}")
        return

    print("✅ MCP Server stopped")


if __name__ == "__main__":
    # Use the non-async approach to avoid asyncio nesting issues
    run_server()
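
For context, here is a minimal client-side sketch of how an assistant or script might call the server added in this commit. It assumes the standard mcp Python SDK client helpers (sse_client and ClientSession), the default FastMCP SSE mount path /sse on the host/port configured above, and uses the summary_needed key from the tool code as an example input; it is not part of the commit.

    import asyncio

    from mcp import ClientSession
    from mcp.client.sse import sse_client


    async def main():
        # Connect to the SSE endpoint exposed by FastMCP (default mount path is /sse).
        async with sse_client("http://localhost:8000/sse") as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()

                # Discover the tools registered on the server ("updates" in this commit).
                tools = await session.list_tools()
                print("Available tools:", [tool.name for tool in tools.tools])

                # Invoke the updates tool with an example input.
                result = await session.call_tool("updates", {"tool_input": {"summary_needed": True}})
                for content in result.content:
                    print(content)


    if __name__ == "__main__":
        asyncio.run(main())

How the intermediate tool_update frames surface on the client depends on how FastMCP handles generator-based tools; the sketch above only prints the final call_tool result.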