Refactor deep_research to support SSE-based streaming.

Updated the `deep_research` function to use asynchronous generators and send Server-Sent Events (SSE) for real-time streaming updates. Added a new utility for formatting SSE events and improved research lifecycle visibility with intermediate progress steps and error handling.
This commit is contained in:
ThomasTaroni 2025-06-01 11:46:11 +02:00
parent 11bfff7ff7
commit cda44df1a0

View file

@ -6,10 +6,11 @@ to conduct web research and generate reports via the MCP protocol.
"""
import os
import sys
import json
import uuid
import logging
from typing import Dict, Any, Optional, List
from typing import Dict, Any, Optional, AsyncGenerator
import asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from gpt_researcher import GPTResearcher
@ -35,6 +36,11 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
def format_sse_event(event_name: str, data: Dict[str, Any]) -> str:
    """Serialize *data* as a single Server-Sent Events (SSE) frame.

    Produces an ``event:`` line naming the event, a ``data:`` line with the
    JSON-encoded payload, and the blank line that terminates an SSE frame.

    Args:
        event_name: The SSE event type (e.g. ``tool_update``).
        data: JSON-serializable payload to place on the ``data:`` line.

    Returns:
        The wire-format SSE string, ready to be yielded to the client.
    """
    payload = json.dumps(data)
    return "event: " + event_name + "\ndata: " + payload + "\n\n"
# Initialize FastMCP server
mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720)
@ -88,7 +94,7 @@ async def research_resource(topic: str) -> str:
@mcp.tool()
async def deep_research(query: str) -> Dict[str, Any]:
async def deep_research(query: str) -> AsyncGenerator[str, None]:
"""
Conduct a web deep research on a given query using GPT Researcher.
Use this tool when you need time-sensitive, real-time information like stock prices, news, people, specific knowledge, etc.
@ -100,39 +106,83 @@ async def deep_research(query: str) -> Dict[str, Any]:
SSE-formatted event strings (tool_update, tool_result, tool_error) carrying the
research status, ID, and the actual research context and sources for LLM context enrichment
"""
logger.info(f"Conducting research on query: {query}...")
# Generate a unique ID for this research session
logger.info(f"Starting streaming deep research on query: {query}...")
research_id = str(uuid.uuid4())
# Initialize GPT Researcher
researcher = GPTResearcher(query)
# Start research
try:
await researcher.conduct_research()
mcp.researchers[research_id] = researcher
logger.info(f"Research completed for ID: {research_id}")
yield format_sse_event(
"tool_update",
{"research_id": research_id, "query": query, "status": "Research initiated"}
)
# Initialize GPT Researcher
# In newer GPTResearcher versions the query is passed at initialization
researcher = GPTResearcher(query=query, report_type="research_report", config_path=None)
# Alternatively, if your version handles it differently: researcher = GPTResearcher(query)
mcp.researchers[research_id] = researcher # store early in case it is needed later
yield format_sse_event(
"tool_update",
{"research_id": research_id, "status": "GPT Researcher initialized. Starting information gathering."}
)
await asyncio.sleep(0.1) # brief pause so the event is reliably flushed
# Simulated update: research is starting
yield format_sse_event(
"tool_update",
{"research_id": research_id, "status": "Starting web crawling and source analysis...", "progress": 10}
)
# The actual long-running process
await researcher.conduct_research() # this call blocks for the duration of the research
# Simulated update: research finished, starting post-processing
yield format_sse_event(
"tool_update",
{"research_id": research_id, "status": "Web crawling finished. Processing and consolidating information...", "progress": 70}
)
await asyncio.sleep(0.1)
logger.info(f"Core research completed for ID: {research_id}. Fetching results...")
# Get the research context and sources
context = researcher.get_research_context()
sources = researcher.get_research_sources()
source_urls = researcher.get_source_urls()
sources = researcher.get_research_sources() # structured source objects
source_urls = researcher.get_source_urls() # plain URLs only
# Store in the research store for the resource API
store_research_results(query, context, sources, source_urls)
# Store in the research store for the resource API (optional, depending on the logic)
# Consider whether formatted_context is needed here if the client receives the raw data
store_research_results(query, context, sources, source_urls, context) # simplified for the store
return create_success_response({
# Prepare the final data payload
final_data_payload = {
"research_id": research_id,
"query": query,
"status": "Research completed successfully",
"source_count": len(sources),
"context": context,
"sources": format_sources_for_response(sources),
"sources": format_sources_for_response(sources), # use the shared formatting helper
"source_urls": source_urls
})
except Exception as e:
return handle_exception(e, "Research")
}
# Send the final result as a 'tool_result' event
yield format_sse_event("tool_result", final_data_payload)
logger.info(f"Sent final research result for ID: {research_id}")
except Exception as e:
logger.error(f"Error during deep_research for query '{query}': {e}", exc_info=True)
# Send an error event to the client
error_payload = {
"research_id": research_id, # always set: the ID is generated before the try block
"query": query,
"status": "Error occurred",
"error_message": str(e),
"error_details": "Check server logs for more information."
}
yield format_sse_event("tool_error", error_payload) # dedicated event type for errors
# handle_exception could be used here instead, if it returns an SSE event,
# or the exception could be re-raised if FastMCP handles that better.
@mcp.tool()
async def quick_search(query: str) -> Dict[str, Any]: