Refactor report generation with async streaming and logging
Enhanced report generation by integrating `CustomLogsHandler` and separating tasks for generating reports and streaming logs. Replaced the previous implementation with a unified async generator to handle concurrent execution and improve error handling. Updated module exports to include the new `CustomLogsHandler` component.
This commit is contained in:
parent
6ee47f02c0
commit
1472e277bc
2 changed files with 36 additions and 16 deletions
46
src/main.py
46
src/main.py
|
@ -1,6 +1,6 @@
|
||||||
from fastapi import FastAPI, HTTPException, Request, Depends
|
from fastapi import FastAPI, HTTPException, Request, Depends
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from phoenix_technologies import ReportGenerator
|
from phoenix_technologies import ReportGenerator, CustomLogsHandler
|
||||||
from fastapi.responses import StreamingResponse
|
from fastapi.responses import StreamingResponse
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
|
@ -33,14 +33,20 @@ async def get_report_endpoint(request: ReportRequest):
|
||||||
Expose the `get_report` function as a POST API endpoint, with a streaming response.
|
Expose the `get_report` function as a POST API endpoint, with a streaming response.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def generate_report():
|
# Initialize the ReportGenerator and CustomLogsHandler
|
||||||
try:
|
|
||||||
# Call the asynchronous get_report function
|
|
||||||
yield "Report generation started...\n"
|
|
||||||
generator = ReportGenerator(request.query, request.report_type)
|
generator = ReportGenerator(request.query, request.report_type)
|
||||||
custom_logs_handler = generator.init()
|
custom_logs_handler = generator.init()
|
||||||
generator.generate_report()
|
|
||||||
yield "Report generation completed successfully!\n"
|
# Define a coroutine to run `generate_report` in a separate thread
|
||||||
|
async def generate_report_thread(generator: ReportGenerator):
|
||||||
|
try:
|
||||||
|
await asyncio.to_thread(generator.generate_report)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error during report generation: {str(e)}")
|
||||||
|
|
||||||
|
# Define a coroutine for streaming logs
|
||||||
|
async def get_logs_thread(generator: ReportGenerator, custom_logs_handler: CustomLogsHandler):
|
||||||
|
try:
|
||||||
index = 0
|
index = 0
|
||||||
while not generator.complete:
|
while not generator.complete:
|
||||||
# If there are more logs to send, yield them
|
# If there are more logs to send, yield them
|
||||||
|
@ -51,13 +57,27 @@ async def get_report_endpoint(request: ReportRequest):
|
||||||
else:
|
else:
|
||||||
# Wait briefly to avoid aggressive looping
|
# Wait briefly to avoid aggressive looping
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
# Stop if processing is complete and no more logs remain
|
except Exception as e:
|
||||||
if generator.complete:
|
print(f"Error while fetching logs: {str(e)}")
|
||||||
break
|
yield f"Error: {str(e)}"
|
||||||
|
|
||||||
|
# Define an asynchronous generator to stream output
|
||||||
|
async def combined_stream():
|
||||||
|
try:
|
||||||
|
# Run both tasks concurrently
|
||||||
|
task1 = asyncio.create_task(generate_report_thread(generator))
|
||||||
|
task2 = asyncio.create_task(get_logs_thread(generator, custom_logs_handler))
|
||||||
|
|
||||||
|
# Wait for logs and stream output
|
||||||
|
async for log_entry in get_logs_thread(generator, custom_logs_handler):
|
||||||
|
yield log_entry
|
||||||
|
|
||||||
|
# Wait for both tasks to finish
|
||||||
|
await asyncio.gather(task1, task2)
|
||||||
|
|
||||||
|
yield "\nReport generation completed successfully!\n"
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
yield f"Error: {str(e)}"
|
yield f"Error: {str(e)}"
|
||||||
|
|
||||||
# Return streaming response
|
# Return the combined async generator as a streaming response
|
||||||
return StreamingResponse(generate_report(), media_type="text/plain")
|
return StreamingResponse(combined_stream(), media_type="text/plain")
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# phoenix-technologies/__init__.py
|
# phoenix-technologies/__init__.py
|
||||||
from .gptresearch.deepresearch import ReportGenerator
|
from .gptresearch.deepresearch import ReportGenerator, CustomLogsHandler
|
||||||
|
|
||||||
__all__ = ["ReportGenerator"]
|
__all__ = ["ReportGenerator", "CustomLogsHandler"]
|
Loading…
Add table
Add a link
Reference in a new issue