From 1472e277bc7afef8278faf2af2837b58a9c48fba Mon Sep 17 00:00:00 2001
From: ThomasTaroni
Date: Fri, 25 Apr 2025 22:07:25 +0200
Subject: [PATCH] Refactor report generation with async streaming and logging

Enhanced report generation by integrating `CustomLogsHandler` and separating
tasks for generating reports and streaming logs. Replaced the previous
implementation with a unified async generator to handle concurrent execution
and improve error handling.

Updated module exports to include the new `CustomLogsHandler` component.
---
 src/main.py                          | 48 ++++++++++++++++++++--------
 src/phoenix_technologies/__init__.py |  4 +--
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/src/main.py b/src/main.py
index aa7b992..fa46b69 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,6 +1,6 @@
 from fastapi import FastAPI, HTTPException, Request, Depends
 from pydantic import BaseModel
-from phoenix_technologies import ReportGenerator
+from phoenix_technologies import ReportGenerator, CustomLogsHandler
 from fastapi.responses import StreamingResponse
 import os
 import asyncio
@@ -33,14 +33,20 @@ async def get_report_endpoint(request: ReportRequest):
     Expose the `get_report` function as a POST API endpoint, with a streaming
     response.
     """
-    async def generate_report():
+    # Initialize the ReportGenerator and CustomLogsHandler
+    generator = ReportGenerator(request.query, request.report_type)
+    custom_logs_handler = generator.init()
+
+    # Define a coroutine to run `generate_report` in a separate thread
+    async def generate_report_thread(generator: ReportGenerator):
+        try:
+            await asyncio.to_thread(generator.generate_report)
+        except Exception as e:
+            print(f"Error during report generation: {str(e)}")
+
+    # Define a coroutine for streaming logs
+    async def get_logs_thread(generator: ReportGenerator, custom_logs_handler: CustomLogsHandler):
         try:
-            # Call the asynchronous get_report function
-            yield "Report generation started...\n"
-            generator = ReportGenerator(request.query, request.report_type)
-            custom_logs_handler = generator.init()
-            generator.generate_report()
-            yield "Report generation completed successfully!\n"
             index = 0
             while not generator.complete:
                 # If there are more logs to send, yield them
@@ -51,13 +57,27 @@ async def get_report_endpoint(request: ReportRequest):
             else:
                 # Wait briefly to avoid aggressive looping
                 await asyncio.sleep(0.1)
-            # Stop if processing is complete and no more logs remain
-            if generator.complete:
-                break
+        except Exception as e:
+            print(f"Error while fetching logs: {str(e)}")
+            yield f"Error: {str(e)}"
+    # Define an asynchronous generator to stream output
+    async def combined_stream():
+        try:
+            # Run both tasks concurrently
+            task1 = asyncio.create_task(generate_report_thread(generator))
+            task2 = asyncio.create_task(get_logs_thread(generator, custom_logs_handler))
+
+            # Wait for logs and stream output
+            async for log_entry in get_logs_thread(generator, custom_logs_handler):
+                yield log_entry
+
+            # Wait for both tasks to finish
+            await asyncio.gather(task1, task2)
+
+            yield "\nReport generation completed successfully!\n"
         except Exception as e:
             yield f"Error: {str(e)}"
-    # Return streaming response
-    return StreamingResponse(generate_report(), media_type="text/plain")
-
+    # Return the combined async generator as a streaming response
+    return StreamingResponse(combined_stream(), media_type="text/plain")
 
 
diff --git a/src/phoenix_technologies/__init__.py b/src/phoenix_technologies/__init__.py
index 142e2fc..d7e2062 100644
--- a/src/phoenix_technologies/__init__.py
+++ b/src/phoenix_technologies/__init__.py
@@ -1,4 +1,4 @@
 # phoenix-technologies/__init__.py
-from .gptresearch.deepresearch import ReportGenerator
+from .gptresearch.deepresearch import ReportGenerator, CustomLogsHandler
 
-__all__ = ["ReportGenerator"]
\ No newline at end of file
+__all__ = ["ReportGenerator", "CustomLogsHandler"]
\ No newline at end of file
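
A minimal sketch of the streaming pattern this patch introduces, for reference: the blocking generate_report() call runs on a worker thread via asyncio.to_thread() while an async generator polls the `CustomLogsHandler` and streams new entries to the client. Because asyncio.create_task() accepts coroutines rather than async generators, the log stream is consumed directly with `async for` in this sketch instead of being wrapped in a task. The sketch reuses the names visible in the diff (ReportGenerator, CustomLogsHandler, init(), generate_report(), the complete flag, query, report_type); the handler's `logs` list and the /get_report route are assumptions made for illustration, not part of the patch.

    # Illustrative sketch only (not part of the patch).
    import asyncio

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    from pydantic import BaseModel

    from phoenix_technologies import CustomLogsHandler, ReportGenerator

    app = FastAPI()


    class ReportRequest(BaseModel):
        # Mirrors the request model referenced in src/main.py.
        query: str
        report_type: str


    async def stream_report(generator: ReportGenerator, handler: CustomLogsHandler):
        # Run the blocking generate_report() call in a worker thread so the
        # event loop stays free to stream log entries while the report runs.
        report_task = asyncio.create_task(asyncio.to_thread(generator.generate_report))
        index = 0
        try:
            while not generator.complete:
                if index < len(handler.logs):  # `logs` list is assumed, not shown in the diff
                    # Yield any log entries that arrived since the last pass.
                    yield f"{handler.logs[index]}\n"
                    index += 1
                elif report_task.done():
                    # Worker finished (or failed) without flagging completion.
                    break
                else:
                    # Back off briefly to avoid a busy loop.
                    await asyncio.sleep(0.1)
            # Drain entries that arrived after completion was flagged.
            while index < len(handler.logs):
                yield f"{handler.logs[index]}\n"
                index += 1
            # Surface any exception raised inside the worker thread.
            await report_task
            yield "\nReport generation completed successfully!\n"
        except Exception as e:
            yield f"Error: {str(e)}"


    @app.post("/get_report")  # hypothetical route; the actual path lives in src/main.py
    async def get_report_endpoint(request: ReportRequest):
        generator = ReportGenerator(request.query, request.report_type)
        handler = generator.init()
        return StreamingResponse(stream_report(generator, handler), media_type="text/plain")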