From 8fa2fb4f33f0815d810a3f64fd9e8550a82af8df Mon Sep 17 00:00:00 2001 From: Ashwin Bharambe Date: Mon, 17 Mar 2025 21:50:24 -0700 Subject: [PATCH] test: use a more robust mechanism for detect loop blocking --- .../providers/inference/test_remote_vllm.py | 55 +++++++++++++------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/tests/unit/providers/inference/test_remote_vllm.py b/tests/unit/providers/inference/test_remote_vllm.py index cb0997e1a..881cf6f1f 100644 --- a/tests/unit/providers/inference/test_remote_vllm.py +++ b/tests/unit/providers/inference/test_remote_vllm.py @@ -5,6 +5,7 @@ # the root directory of this source tree. import asyncio +import contextlib import json import logging import threading @@ -182,13 +183,41 @@ async def test_process_vllm_chat_completion_stream_response_no_choices(): assert len(chunks) == 0 -def test_chat_completion_doesnt_block_event_loop(caplog): - loop = asyncio.new_event_loop() - loop.set_debug(True) - caplog.set_level(logging.WARNING) +@contextlib.asynccontextmanager +async def detect_blocking(threshold=0.1): + """Context manager to detect blocking in an event loop.""" + block_detected = False + watchdog_active = True - # Log when event loop is blocked for more than 200ms - loop.slow_callback_duration = 0.2 + last_check = time.monotonic() + + async def watchdog(): + nonlocal block_detected, last_check + while watchdog_active: + now = time.monotonic() + # If our check is significantly delayed, we might be blocked + if now - last_check > threshold: + block_detected = True + last_check = now + await asyncio.sleep(threshold / 3) + + watchdog_task = asyncio.create_task(watchdog()) + + try: + yield + finally: + watchdog_active = False + watchdog_task.cancel() + try: + await watchdog_task + except asyncio.CancelledError: + pass + + assert not block_detected, f"Event loop was blocked for more than {threshold}s" + + +@pytest.mark.asyncio +async def test_chat_completion_doesnt_block_event_loop(): # Sleep for 500ms in our delayed http response sleep_time = 0.5 @@ -220,15 +249,7 @@ def test_chat_completion_doesnt_block_event_loop(caplog): with MockInferenceAdapterWithSleep(sleep_time, mock_response) as inference_adapter: inference_adapter.model_store = AsyncMock() inference_adapter.model_store.get_model.return_value = mock_model - loop.run_until_complete(inference_adapter.initialize()) + await inference_adapter.initialize() - # Clear the logs so far and run the actual chat completion we care about - caplog.clear() - loop.run_until_complete(do_chat_completion()) - - # Ensure we don't have any asyncio warnings in the captured log - # records from our chat completion call. A message gets logged - # here any time we exceed the slow_callback_duration configured - # above. - asyncio_warnings = [record.message for record in caplog.records if record.name == "asyncio"] - assert not asyncio_warnings + async with detect_blocking(0.1): + await do_chat_completion()