From cda44df1a096426003582a89451ccf1bfe13319a Mon Sep 17 00:00:00 2001 From: ThomasTaroni Date: Sun, 1 Jun 2025 11:46:11 +0200 Subject: [PATCH] Refactor deep_research to support SSE-based streaming. Updated the `deep_research` function to use asynchronous generators and send Server-Sent Events (SSE) for real-time streaming updates. Added a new utility for formatting SSE events and improved research lifecycle visibility with intermediate progress steps and error handling. --- .../gpt_researcher/server.py | 94 ++++++++++++++----- 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/src/phoenix_technologies/gpt_researcher/server.py b/src/phoenix_technologies/gpt_researcher/server.py index 0627c87..59d08ab 100644 --- a/src/phoenix_technologies/gpt_researcher/server.py +++ b/src/phoenix_technologies/gpt_researcher/server.py @@ -6,10 +6,11 @@ to conduct web research and generate reports via the MCP protocol. """ import os -import sys +import json import uuid import logging -from typing import Dict, Any, Optional, List +from typing import Dict, Any, Optional, AsyncGenerator +import asyncio from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP from gpt_researcher import GPTResearcher @@ -35,6 +36,11 @@ logging.basicConfig( logger = logging.getLogger(__name__) +def format_sse_event(event_name: str, data: Dict[str, Any]) -> str: + """Formatiert Daten als SSE Event String.""" + json_data = json.dumps(data) + return f"event: {event_name}\ndata: {json_data}\n\n" + # Initialize FastMCP server mcp = FastMCP("GPT Researcher", host="0.0.0.0", port=8000, timeout_keep_alive=720) @@ -88,7 +94,7 @@ async def research_resource(topic: str) -> str: @mcp.tool() -async def deep_research(query: str) -> Dict[str, Any]: +async def deep_research(query: str) -> AsyncGenerator[str, None]: """ Conduct a web deep research on a given query using GPT Researcher. Use this tool when you need time-sensitive, real-time information like stock prices, news, people, specific knowledge, etc. @@ -100,39 +106,83 @@ async def deep_research(query: str) -> Dict[str, Any]: Dict containing research status, ID, and the actual research context and sources that can be used directly by LLMs for context enrichment """ - logger.info(f"Conducting research on query: {query}...") - - # Generate a unique ID for this research session + logger.info(f"Starting streaming deep research on query: {query}...") research_id = str(uuid.uuid4()) - # Initialize GPT Researcher - researcher = GPTResearcher(query) - - # Start research try: - await researcher.conduct_research() - mcp.researchers[research_id] = researcher - logger.info(f"Research completed for ID: {research_id}") + yield format_sse_event( + "tool_update", + {"research_id": research_id, "query": query, "status": "Research initiated"} + ) + + # Initialize GPT Researcher + # In neueren GPTResearcher Versionen wird der query beim Initialisieren übergeben + researcher = GPTResearcher(query=query, report_type="research_report", config_path=None) + # Alternativ, falls deine Version es anders handhabt: researcher = GPTResearcher(query) + + mcp.researchers[research_id] = researcher # Speichere früh, falls benötigt + + yield format_sse_event( + "tool_update", + {"research_id": research_id, "status": "GPT Researcher initialized. Starting information gathering."} + ) + await asyncio.sleep(0.1) # Kurze Pause, damit das Event sicher gesendet wird + + # Simuliertes Update: Beginn der Recherche + yield format_sse_event( + "tool_update", + {"research_id": research_id, "status": "Starting web crawling and source analysis...", "progress": 10} + ) + + # Der eigentliche langlaufende Prozess + await researcher.conduct_research() # Dieser Aufruf blockiert für die Dauer der Recherche + + # Simuliertes Update: Recherche abgeschlossen, beginne mit der Verarbeitung + yield format_sse_event( + "tool_update", + {"research_id": research_id, "status": "Web crawling finished. Processing and consolidating information...", "progress": 70} + ) + await asyncio.sleep(0.1) + + logger.info(f"Core research completed for ID: {research_id}. Fetching results...") # Get the research context and sources context = researcher.get_research_context() - sources = researcher.get_research_sources() - source_urls = researcher.get_source_urls() + sources = researcher.get_research_sources() # Dies sind strukturierte Source-Objekte + source_urls = researcher.get_source_urls() # Dies sind nur die URLs - # Store in the research store for the resource API - store_research_results(query, context, sources, source_urls) + # Store in the research store for the resource API (optional, je nach Logik) + # Überlege, ob formatted_context hier gebraucht wird, wenn der Client die Rohdaten bekommt + store_research_results(query, context, sources, source_urls, context) # Vereinfacht für den Store - return create_success_response({ + # Finale Daten vorbereiten + final_data_payload = { "research_id": research_id, "query": query, + "status": "Research completed successfully", "source_count": len(sources), "context": context, - "sources": format_sources_for_response(sources), + "sources": format_sources_for_response(sources), # Nutze deine Formatierungsfunktion "source_urls": source_urls - }) - except Exception as e: - return handle_exception(e, "Research") + } + # Sende das finale Ergebnis als 'tool_result' Event + yield format_sse_event("tool_result", final_data_payload) + logger.info(f"Sent final research result for ID: {research_id}") + + except Exception as e: + logger.error(f"Error during deep_research for query '{query}': {e}", exc_info=True) + # Sende ein Fehler-Event an den Client + error_payload = { + "research_id": research_id, # Kann None sein, wenn Fehler sehr früh auftritt + "query": query, + "status": "Error occurred", + "error_message": str(e), + "error_details": "Check server logs for more information." + } + yield format_sse_event("tool_error", error_payload) # Eigener Event-Typ für Fehler + # Du könntest hier auch handle_exception verwenden, wenn es ein SSE-Event zurückgibt + # oder die Exception weiter werfen, wenn FastMCP das besser handhabt. @mcp.tool() async def quick_search(query: str) -> Dict[str, Any]: