Changed `send_json` to return `None` instead of yielding and updated log retrieval to use `yield` in place of `return`. These modifications align function behavior with intended asynchronous use cases and improve consistency.
68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
from gpt_researcher import GPTResearcher
|
|
from typing import Dict, Any, AsyncGenerator, Coroutine
|
|
|
|
|
|
class CustomLogsHandler:
|
|
"""A custom Logs handler class to handle JSON data."""
|
|
def __init__(self, logs=None):
|
|
if logs is None:
|
|
logs = []
|
|
self.logs = logs # Initialize logs to store data
|
|
|
|
async def send_json(self, data: Dict[str, Any]) -> None:
|
|
"""Send JSON data and log it."""
|
|
self.logs.append(data) # Append data to logs
|
|
print(f"My custom Log: {data}")
|
|
|
|
class ReportGenerator:
|
|
def __init__(self, query: str, report_type: str):
|
|
"""
|
|
Initializes the ReportGenerator with a query and report type.
|
|
"""
|
|
self.query = query
|
|
self.report_type = report_type
|
|
# Initialize researcher with a custom WebSocket
|
|
self.logs = []
|
|
self.custom_logs_handler = CustomLogsHandler(self.logs)
|
|
|
|
self.researcher = GPTResearcher(query, report_type, websocket=self.custom_logs_handler)
|
|
|
|
async def generate_report(self) -> CustomLogsHandler:
|
|
"""
|
|
Conducts research and generates the report along with additional information.
|
|
"""
|
|
# Conduct research
|
|
research_result = await self.researcher.conduct_research()
|
|
report = await self.researcher.write_report()
|
|
|
|
# Retrieve additional information
|
|
research_context = self.researcher.get_research_context()
|
|
research_costs = self.researcher.get_costs()
|
|
research_images = self.researcher.get_research_images()
|
|
research_sources = self.researcher.get_research_sources()
|
|
|
|
return self.custom_logs_handler
|
|
|
|
def get_query_details(self):
|
|
"""
|
|
Returns details of the query and report type.
|
|
"""
|
|
return {
|
|
"query": self.query,
|
|
"report_type": self.report_type
|
|
}
|
|
|
|
# Implementing the asynchronous iterator protocol
|
|
def __aiter__(self):
|
|
"""Initialize and return the asynchronous iterator."""
|
|
self._log_index = 0 # Iterator index
|
|
return self
|
|
|
|
async def __anext__(self):
|
|
"""Return the next log asynchronously."""
|
|
if self._log_index < len(self.logs):
|
|
log = self.logs[self._log_index]
|
|
self._log_index += 1
|
|
yield log # Return the next log
|
|
else:
|
|
raise StopAsyncIteration # Stop when logs are exhausted
|