Refactor log streaming and report generation logic
Simplified the logic for log streaming by consolidating it into a single async generator (`log_stream`). Removed redundant tasks and streamlined report generation to improve code readability and maintainability.
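For reference, the consolidated endpoint follows roughly this shape (a minimal, self-contained sketch: `StubReportGenerator` is a hypothetical stand-in for `phoenix_technologies.ReportGenerator`, and the request model and API-key dependency are omitted):

# Minimal sketch of the consolidated pattern; not the exact project code.
import asyncio
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


class StubReportGenerator:
    """Hypothetical stand-in: appends a few log lines, then flags completion."""

    def __init__(self) -> None:
        self.logs: list[str] = []
        self.complete = False

    def generate_report(self) -> None:
        # Blocking work; stands in for ReportGenerator.generate_report
        for step in range(3):
            self.logs.append(f"step {step} done\n")
            time.sleep(0.5)
        self.complete = True


@app.post("/get_report")
async def get_report_endpoint():
    generator = StubReportGenerator()

    async def generate_report_thread():
        # Off-load the blocking call to a worker thread
        await asyncio.to_thread(generator.generate_report)

    async def log_stream():
        index = 0
        while not generator.complete:
            if index < len(generator.logs):
                # Emit any new log lines as they appear
                yield generator.logs[index]
                index += 1
            else:
                # Wait briefly to avoid aggressive looping
                await asyncio.sleep(0.1)
        yield "\nReport generation completed successfully!\n"

    # Start report generation in the background and stream logs as they arrive
    asyncio.create_task(generate_report_thread())
    return StreamingResponse(log_stream(), media_type="text/plain")

Run under uvicorn, this should stream the stub's log lines incrementally and finish with the completion message.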
parent 1472e277bc
commit e2b4ba5a7d
1 changed file with 17 additions and 23 deletions

src/main.py
@@ -1,4 +1,4 @@
 from fastapi import FastAPI, HTTPException, Request, Depends
 from pydantic import BaseModel
 from phoenix_technologies import ReportGenerator, CustomLogsHandler
 from fastapi.responses import StreamingResponse
@@ -8,17 +8,21 @@ import asyncio
 # FastAPI app instance
 app = FastAPI()


 # Define a request body structure using Pydantic
 class ReportRequest(BaseModel):
     query: str
     report_type: str


 # Define a dependency to validate the API Key
 def verify_api_key(request: Request):
     # Define the API key from the environment variables
     expected_api_key = os.getenv("API_KEY", None)
     if not expected_api_key:
-        raise HTTPException(status_code=500, detail="API key is not configured on the server.")
+        raise HTTPException(
+            status_code=500, detail="API key is not configured on the server."
+        )

     # Get the API key from the request headers
     provided_api_key = request.headers.get("X-API-KEY", None)
@@ -27,6 +31,7 @@ def verify_api_key(request: Request):
     if not provided_api_key or provided_api_key != expected_api_key:
         raise HTTPException(status_code=403, detail="Invalid or missing API key.")


 @app.post("/get_report", dependencies=[Depends(verify_api_key)])
 async def get_report_endpoint(request: ReportRequest):
     """
@@ -40,12 +45,13 @@ async def get_report_endpoint(request: ReportRequest):
     # Define a coroutine to run `generate_report` in a separate thread
     async def generate_report_thread(generator: ReportGenerator):
         try:
+            # Run blocking code in a thread pool
             await asyncio.to_thread(generator.generate_report)
         except Exception as e:
             print(f"Error during report generation: {str(e)}")

-    # Define a coroutine for streaming logs
-    async def get_logs_thread(generator: ReportGenerator, custom_logs_handler: CustomLogsHandler):
+    # Define an asynchronous generator for streaming logs
+    async def log_stream():
         try:
             index = 0
             while not generator.complete:
@@ -57,27 +63,15 @@ async def get_report_endpoint(request: ReportRequest):
                 else:
                     # Wait briefly to avoid aggressive looping
                     await asyncio.sleep(0.1)

+            # After completion, include a success message
+            yield "\nReport generation completed successfully!\n"
         except Exception as e:
             print(f"Error while fetching logs: {str(e)}")
             yield f"Error: {str(e)}"

-    # Define an asynchronous generator to stream output
-    async def combined_stream():
-        try:
-            # Run both tasks concurrently
-            task1 = asyncio.create_task(generate_report_thread(generator))
-            task2 = asyncio.create_task(get_logs_thread(generator, custom_logs_handler))
+    # Run the report generation task concurrently with the log streaming
+    asyncio.create_task(generate_report_thread(generator))

-            # Wait for logs and stream output
-            async for log_entry in get_logs_thread(generator, custom_logs_handler):
-                yield log_entry
-
-            # Wait for both tasks to finish
-            await asyncio.gather(task1, task2)
-
-            yield "\nReport generation completed successfully!\n"
-        except Exception as e:
-            yield f"Error: {str(e)}"
-
-    # Return the combined async generator as a streaming response
-    return StreamingResponse(combined_stream(), media_type="text/plain")
+    # Return the `log_stream` async generator as a streaming response
+    return StreamingResponse(log_stream(), media_type="text/plain")