fix: cancel scheduler tasks on shutdown

Otherwise, currently running tasks will not exit until they actually
complete, which means the process can't be shut down properly (only
with SIGKILL).

Ideally, we would let tasks know that they are about to be shut down and
give them some time to finish; but in the absence of such a mechanism,
it's better to cancel than to linger forever.

Signed-off-by: Ihar Hrachyshka <ihar.hrachyshka@gmail.com>
This commit is contained in:
Ihar Hrachyshka 2025-05-09 13:34:04 -04:00
parent 473a07f624
commit 1ceebdc813
2 changed files with 20 additions and 6 deletions

View file

@ -157,10 +157,14 @@ class _NaiveSchedulerBackend(_SchedulerBackend):
asyncio.set_event_loop(self._loop) asyncio.set_event_loop(self._loop)
self._loop.run_forever() self._loop.run_forever()
# When stopping the loop, give tasks a chance to finish # TODO: When stopping the loop, give tasks a chance to finish
# TODO: should we explicitly inform jobs of pending stoppage? # TODO: should we explicitly inform jobs of pending stoppage?
# cancel all tasks
for task in asyncio.all_tasks(self._loop): for task in asyncio.all_tasks(self._loop):
self._loop.run_until_complete(task) if not task.done():
task.cancel()
self._loop.close() self._loop.close()
async def shutdown(self) -> None: async def shutdown(self) -> None:

View file

@ -17,6 +17,15 @@ async def test_scheduler_unknown_backend():
Scheduler(backend="unknown") Scheduler(backend="unknown")
async def wait_for_job_completed(
    sched: Scheduler,
    job_id: str,
    *,
    attempts: int = 10,
    interval: float = 0.1,
) -> None:
    """Poll the scheduler until the given job reports a completion time.

    Args:
        sched: Scheduler to query through ``get_job``.
        job_id: Identifier of the job to wait for.
        attempts: Maximum number of times the job is checked.
        interval: Seconds to sleep after each unsuccessful check.

    Raises:
        TimeoutError: If ``completed_at`` is still ``None`` after
            ``attempts`` checks.
    """
    for _ in range(attempts):
        # Re-fetch on every iteration instead of caching the job object,
        # so the check always reflects what the scheduler currently returns.
        job = sched.get_job(job_id)
        if job.completed_at is not None:
            return
        # Yield to the event loop between checks.
        await asyncio.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not complete in time.")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scheduler_naive(): async def test_scheduler_naive():
sched = Scheduler() sched = Scheduler()
@ -52,6 +61,9 @@ async def test_scheduler_naive():
assert sched.get_jobs("unknown") == [] assert sched.get_jobs("unknown") == []
assert sched.get_jobs(job_type) == [sched.get_job(job_id)] assert sched.get_jobs(job_type) == [sched.get_job(job_id)]
# give the job handler a chance to run
await wait_for_job_completed(sched, job_id)
# now shut the scheduler down and make sure the job ran # now shut the scheduler down and make sure the job ran
await sched.shutdown() await sched.shutdown()
@ -92,10 +104,7 @@ async def test_scheduler_naive_handler_raises():
# confirm the exception made the job transition to failed state, even # confirm the exception made the job transition to failed state, even
# though it was set to `running` before the error # though it was set to `running` before the error
for _ in range(10): await wait_for_job_completed(sched, job_id)
if job.status == JobStatus.failed:
break
await asyncio.sleep(0.1)
assert job.status == JobStatus.failed assert job.status == JobStatus.failed
# confirm that the raised error got registered in log # confirm that the raised error got registered in log
@ -111,6 +120,7 @@ async def test_scheduler_naive_handler_raises():
job_id = "test_job_id2" job_id = "test_job_id2"
sched.schedule(job_type, job_id, successful_job_handler) sched.schedule(job_type, job_id, successful_job_handler)
await wait_for_job_completed(sched, job_id)
await sched.shutdown() await sched.shutdown()