From cd79fe99be0e14346d3042bf80938553856a71f7 Mon Sep 17 00:00:00 2001 From: ThomasTaroni Date: Fri, 25 Apr 2025 18:20:51 +0200 Subject: [PATCH] Refactor `ReportGenerator` and integrate custom logs handler Introduced `CustomLogsHandler` to manage log handling and WebSocket integration in `ReportGenerator`. Simplified and restructured report generation logic for improved maintainability. Removed obsolete methods and enhanced overall readability with cleaner code structure. --- .../gptresearch/deepresearch.py | 114 +++++------------- 1 file changed, 27 insertions(+), 87 deletions(-) diff --git a/src/phoenix_technologies/gptresearch/deepresearch.py b/src/phoenix_technologies/gptresearch/deepresearch.py index b128433..e693f45 100644 --- a/src/phoenix_technologies/gptresearch/deepresearch.py +++ b/src/phoenix_technologies/gptresearch/deepresearch.py @@ -1,107 +1,47 @@ from gpt_researcher import GPTResearcher +from typing import Dict, Any, AsyncGenerator +class CustomLogsHandler: + """A custom Logs handler class to handle JSON data.""" + def __init__(self): + self.logs = [] # Initialize logs to store data + + async def send_json(self, data: Dict[str, Any]) -> AsyncGenerator[str, Any]: + """Send JSON data and log it.""" + yield f"My custom Log: {data}" + class ReportGenerator: - """ - A class to handle the generation of research-based reports. - - This class integrates with GPTResearcher to conduct research, retrieve results, - and format them into consumable chunks for asynchronous streaming. - """ - def __init__(self, query: str, report_type: str): """ - Initializes the ReportGenerator instance with a query and report type. - - :param query: The main topic or question for research. - :param report_type: The type of report to generate. + Initializes the ReportGenerator with a query and report type. """ self.query = query self.report_type = report_type - self.researcher = GPTResearcher(query, report_type) - self._chunks = None # Placeholder for report chunks - self._index = 0 # Iterator index for streaming + # Initialize researcher with a custom WebSocket + self.custom_logs_handler = CustomLogsHandler() - def __aiter__(self): + self.researcher = GPTResearcher(query, report_type, websocket=self.custom_logs_handler) + + async def generate_report(self): """ - Makes the ReportGenerator instance asynchronously iterable. - - :return: Self instance for iteration. + Conducts research and generates the report along with additional information. """ - return self + # Conduct research + research_result = await self.researcher.conduct_research() + report = await self.researcher.write_report() - async def __anext__(self): - """ - Defines the logic for asynchronous iteration over report chunks. + # Retrieve additional information + research_context = self.researcher.get_research_context() + research_costs = self.researcher.get_costs() + research_images = self.researcher.get_research_images() + research_sources = self.researcher.get_research_sources() - :return: The next chunk of the report. - :raises StopAsyncIteration: Raised when all chunks are yielded. - """ - if self._chunks is None: - # Generate report chunks on first access - self._chunks = await self._generate_report_chunks() - - if self._index >= len(self._chunks): - # End iteration once all chunks are exhausted - raise StopAsyncIteration - - chunk = self._chunks[self._index] - self._index += 1 - return chunk - - async def _generate_report_chunks(self): - """ - Conducts research using the researcher and generates the report in chunks. - - :return: A list of report chunks. - """ - try: - # Asynchronous research and report generation - research_result = await self.researcher.conduct_research() - report = await self.researcher.write_report() - - # Retrieve additional research details - research_context = self.researcher.get_research_context() or {} - research_costs = self.researcher.get_costs() or {} - research_images = self.researcher.get_research_images() or [] - research_sources = self.researcher.get_research_sources() or [] - - # Construct the complete research response - full_report = { - "report": report, - "context": research_context, - "costs": research_costs, - "images": research_images, - "sources": research_sources, - } - - # Generate chunks for streaming - return self._split_into_chunks(full_report) - - except Exception as e: - # Handle potential errors during research and report generation - raise RuntimeError(f"Error generating report chunks: {e}") - - def _split_into_chunks(self, report): - """ - Splits the report dictionary into smaller chunks for easier streaming. - - :param report: A dictionary containing the full report data. - :return: A list of formatted text chunks. - """ - if not report: - raise ValueError("Cannot split an empty or None report into chunks.") - - chunks = [] - for key, value in report.items(): - chunks.append(f"{key}: {value}") - return chunks + return self.custom_logs_handler def get_query_details(self): """ - Retrieves the details of the query and report type. - - :return: A dictionary containing the query and report type. + Returns details of the query and report type. """ return { "query": self.query,