Refactor ReportGenerator and integrate custom logs handler

Introduced `CustomLogsHandler` to manage log handling and WebSocket integration in `ReportGenerator`. Simplified and restructured report generation logic for improved maintainability. Removed obsolete methods and enhanced overall readability with cleaner code structure.
This commit is contained in:
ThomasTaroni 2025-04-25 18:20:51 +02:00
parent 6993a52d47
commit cd79fe99be

View file

@ -1,107 +1,47 @@
from gpt_researcher import GPTResearcher from gpt_researcher import GPTResearcher
from typing import Dict, Any, AsyncGenerator
class CustomLogsHandler:
    """A custom logs handler that records the JSON payloads it is sent.

    An instance is passed to ``GPTResearcher`` as its ``websocket`` argument,
    so it only needs to provide an awaitable ``send_json`` method.
    """

    def __init__(self):
        # Every payload handed to send_json, in arrival order.
        self.logs = []

    async def send_json(self, data: Dict[str, Any]) -> None:
        """Record *data* in ``self.logs`` and print it.

        This must be a plain coroutine rather than an async generator: a
        ``yield`` in the body would make the call return an async-generator
        object, which cannot be awaited and whose body would never run.

        :param data: The JSON-serialisable payload emitted by the researcher.
        """
        self.logs.append(data)
        print(f"My custom Log: {data}")
class ReportGenerator: class ReportGenerator:
"""
A class to handle the generation of research-based reports.
This class integrates with GPTResearcher to conduct research, retrieve results,
and format them into consumable chunks for asynchronous streaming.
"""
def __init__(self, query: str, report_type: str): def __init__(self, query: str, report_type: str):
""" """
Initializes the ReportGenerator instance with a query and report type. Initializes the ReportGenerator with a query and report type.
:param query: The main topic or question for research.
:param report_type: The type of report to generate.
""" """
self.query = query self.query = query
self.report_type = report_type self.report_type = report_type
self.researcher = GPTResearcher(query, report_type) # Initialize researcher with a custom WebSocket
self._chunks = None # Placeholder for report chunks self.custom_logs_handler = CustomLogsHandler()
self._index = 0 # Iterator index for streaming
def __aiter__(self): self.researcher = GPTResearcher(query, report_type, websocket=self.custom_logs_handler)
async def generate_report(self):
""" """
Makes the ReportGenerator instance asynchronously iterable. Conducts research and generates the report along with additional information.
:return: Self instance for iteration.
""" """
return self # Conduct research
research_result = await self.researcher.conduct_research()
report = await self.researcher.write_report()
async def __anext__(self): # Retrieve additional information
""" research_context = self.researcher.get_research_context()
Defines the logic for asynchronous iteration over report chunks. research_costs = self.researcher.get_costs()
research_images = self.researcher.get_research_images()
research_sources = self.researcher.get_research_sources()
:return: The next chunk of the report. return self.custom_logs_handler
:raises StopAsyncIteration: Raised when all chunks are yielded.
"""
if self._chunks is None:
# Generate report chunks on first access
self._chunks = await self._generate_report_chunks()
if self._index >= len(self._chunks):
# End iteration once all chunks are exhausted
raise StopAsyncIteration
chunk = self._chunks[self._index]
self._index += 1
return chunk
async def _generate_report_chunks(self):
"""
Conducts research using the researcher and generates the report in chunks.
:return: A list of report chunks.
"""
try:
# Asynchronous research and report generation
research_result = await self.researcher.conduct_research()
report = await self.researcher.write_report()
# Retrieve additional research details
research_context = self.researcher.get_research_context() or {}
research_costs = self.researcher.get_costs() or {}
research_images = self.researcher.get_research_images() or []
research_sources = self.researcher.get_research_sources() or []
# Construct the complete research response
full_report = {
"report": report,
"context": research_context,
"costs": research_costs,
"images": research_images,
"sources": research_sources,
}
# Generate chunks for streaming
return self._split_into_chunks(full_report)
except Exception as e:
# Handle potential errors during research and report generation
raise RuntimeError(f"Error generating report chunks: {e}")
def _split_into_chunks(self, report):
"""
Splits the report dictionary into smaller chunks for easier streaming.
:param report: A dictionary containing the full report data.
:return: A list of formatted text chunks.
"""
if not report:
raise ValueError("Cannot split an empty or None report into chunks.")
chunks = []
for key, value in report.items():
chunks.append(f"{key}: {value}")
return chunks
def get_query_details(self): def get_query_details(self):
""" """
Retrieves the details of the query and report type. Returns details of the query and report type.
:return: A dictionary containing the query and report type.
""" """
return { return {
"query": self.query, "query": self.query,