Refactor report generation with simplified log streaming

Replaced custom log handler and async report generation logic with a simplified fake data streamer for the StreamingResponse. Added uvicorn server startup code for direct script execution.
This commit is contained in:
ThomasTaroni 2025-04-26 08:38:02 +02:00
parent e2b4ba5a7d
commit 73e929ca00

View file

@ -1,19 +1,23 @@
import uvicorn
from fastapi import FastAPI, HTTPException, Request, Depends
from pydantic import BaseModel
from phoenix_technologies import ReportGenerator, CustomLogsHandler
from fastapi.responses import StreamingResponse
from typing import Dict, Any, AsyncGenerator, Coroutine, Generator
import os
import asyncio
import time
# FastAPI app instance
app = FastAPI()
# Define a request body structure using Pydantic
class ReportRequest(BaseModel):
query: str
report_type: str
# Shared log array using asyncio.Queue
log_queue = asyncio.Queue()
# Define a dependency to validate the API Key
def verify_api_key(request: Request):
@ -38,40 +42,14 @@ async def get_report_endpoint(request: ReportRequest):
Expose the `get_report` function as a POST API endpoint, with a streaming response.
"""
# Initialize the ReportGenerator and CustomLogsHandler
generator = ReportGenerator(request.query, request.report_type)
custom_logs_handler = generator.init()
def fake_data_streamer():
for i in range(5):
yield f"My custom Log: {i}"
time.sleep(5)
# Define a coroutine to run `generate_report` in a separate thread
async def generate_report_thread(generator: ReportGenerator):
try:
# Run blocking code in a thread pool
await asyncio.to_thread(generator.generate_report)
except Exception as e:
print(f"Error during report generation: {str(e)}")
# Return streaming response
return StreamingResponse(fake_data_streamer(), media_type="text/plain")
# Define an asynchronous generator for streaming logs
async def log_stream():
try:
index = 0
while not generator.complete:
# If there are more logs to send, yield them
if index < len(custom_logs_handler.logs):
log_entry = custom_logs_handler.logs[index]
index += 1
yield f"{log_entry}\n" # Convert logs to string for streaming
else:
# Wait briefly to avoid aggressive looping
await asyncio.sleep(0.1)
# After completion, include a success message
yield "\nReport generation completed successfully!\n"
except Exception as e:
print(f"Error while fetching logs: {str(e)}")
yield f"Error: {str(e)}"
# Run the report generation task concurrently with the log streaming
asyncio.create_task(generate_report_thread(generator))
# Return the `log_stream` async generator as a streaming response
return StreamingResponse(log_stream(), media_type="text/plain")
if __name__ == "__main__":
uvicorn.run(app='main:app', host="127.0.0.1", port=8000)