mcp-gpt-researcher/src/phoenix_technologies/gptresearch/deepresearch.py
ThomasTaroni ade4511a87 Add report generation step to the main async process
The `generate_report` method is now explicitly called within the main process, ensuring the researcher's report generation is initiated properly. Updated its return type annotation for clarity in `deepresearch.py`.
2025-04-25 18:39:33 +02:00

68 lines
2.4 KiB
Python

from gpt_researcher import GPTResearcher
from typing import Dict, Any, AsyncGenerator, Coroutine
class CustomLogsHandler:
    """Collects JSON payloads into a list and echoes each one to stdout."""

    def __init__(self, logs=None):
        # Reuse the caller's list when one is supplied (so both sides see the
        # same entries); otherwise start from a fresh, private list.
        self.logs = [] if logs is None else logs

    async def send_json(self, data: Dict[str, Any]) -> None:
        """Record ``data`` in ``self.logs`` and print it.

        Args:
            data: The JSON-like payload to store.
        """
        self.logs.append(data)
        # Demonstration output only; the stored entry is the payload itself.
        print(f"My custom Log: {data}")
class ReportGenerator:
    """Runs a ``GPTResearcher`` query and exposes the log entries collected for it.

    A ``CustomLogsHandler`` is passed to the researcher as its ``websocket``
    argument; that handler appends every payload it receives to ``self.logs``.
    The instance is its own asynchronous iterator over ``self.logs``: iteration
    yields the entries present in the list and stops as soon as the cursor
    reaches the end (it does not wait for entries that may be appended later).
    Because the cursor lives on the instance, concurrent iterations over the
    same object share it.
    """

    # Class-level default so ``__anext__`` is safe to await even when
    # ``__aiter__`` has not been called first (e.g. ``await anext(generator)``).
    _log_index = 0

    def __init__(self, query: str, report_type: str):
        """
        Initializes the ReportGenerator with a query and report type.

        Args:
            query: The research query handed to ``GPTResearcher``.
            report_type: The report type handed to ``GPTResearcher``.
        """
        self.query = query
        self.report_type = report_type
        # The handler and this object share one list, so everything the handler
        # records is immediately visible through ``self.logs``.
        self.logs = []
        self.custom_logs_handler = CustomLogsHandler(self.logs)
        self.researcher = GPTResearcher(query, report_type, websocket=self.custom_logs_handler)
        # Results of the most recent ``generate_report`` call; ``None`` until it has run.
        self.report = None
        self.research_context = None
        self.research_costs = None
        self.research_images = None
        self.research_sources = None

    async def generate_report(self) -> "CustomLogsHandler":
        """
        Conducts research, writes the report and keeps the results on the instance.

        The report and the researcher's context, costs, images and sources are
        stored as ``self.report``, ``self.research_context``,
        ``self.research_costs``, ``self.research_images`` and
        ``self.research_sources`` instead of being discarded.

        Returns:
            The logs handler that was passed to the researcher as its websocket.
        """
        # Research must complete before the report is written.
        await self.researcher.conduct_research()
        self.report = await self.researcher.write_report()
        # Retrieve additional information
        self.research_context = self.researcher.get_research_context()
        self.research_costs = self.researcher.get_costs()
        self.research_images = self.researcher.get_research_images()
        self.research_sources = self.researcher.get_research_sources()
        return self.custom_logs_handler

    def get_query_details(self) -> Dict[str, Any]:
        """
        Returns details of the query and report type.
        """
        return {
            "query": self.query,
            "report_type": self.report_type
        }

    # Implementing the asynchronous iterator protocol
    def __aiter__(self):
        """Reset the cursor to the first log entry and return this object as the iterator."""
        self._log_index = 0
        return self

    async def __anext__(self):
        """Return the next log entry, or raise ``StopAsyncIteration`` when none remain."""
        if self._log_index >= len(self.logs):
            raise StopAsyncIteration  # Stop when logs are exhausted
        entry = self.logs[self._log_index]
        self._log_index += 1
        return entry