fix: Multiple fixes for server shutdown (fix lifespan handling; fix handling CancelledError when raised by provider; let uvicorn handle signals) (#1495)

# What does this PR do?

If implementation raises CancelledError (e.g. when it runs its own async
loop for jobs), the main server shutdown handler gets confused and
doesn't attempt to shut down the main loop tasks.

While at it, also fixing the following failure when this happens:

```
UnboundLocalError: cannot access local variable 'loop' where it is not
associated with a value
```

Shutdown handlers were not running because lifespan logic was broken
since ~Oct 2024. Fixed that too and enforcing `lifespan` now (making
sure server will crash when it fails to interact with app through
middleware).

[//]: # (If resolving an issue, uncomment and update the line below)
[//]: # (Closes #[issue-number])

## Test Plan

Spotted while working on
https://github.com/meta-llama/llama-stack/pull/1437

One way to trigger it without the PR above is to add `raise
CancelledError` in
any of the running providers' `shutdown` methods; then `kill -INT <pid>`
the
server process.

Validated this with the following test patch:

```
diff --git a/llama_stack/distribution/server/server.py b/llama_stack/distribution/server/server.py
index b85c463a..10dad83e 100644
--- a/llama_stack/distribution/server/server.py
+++ b/llama_stack/distribution/server/server.py
@@ -174,6 +174,7 @@ def handle_signal(app, signum, _) -> None:
         except asyncio.CancelledError:
             pass
         finally:
+            logger.info("Stopping event loop")
             loop.stop()
 
     loop = asyncio.get_running_loop()
diff --git a/llama_stack/providers/inline/post_training/torchtune/post_training.py b/llama_stack/providers/inline/post_training/torchtune/post_training.py
index b837362d..163f43d8 100644
--- a/llama_stack/providers/inline/post_training/torchtune/post_training.py
+++ b/llama_stack/providers/inline/post_training/torchtune/post_training.py
@@ -3,6 +3,7 @@
 #
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
+import asyncio
 from datetime import datetime
 from typing import Any, Dict, Optional
 
@@ -43,6 +44,9 @@ class TorchtunePostTrainingImpl:
         self.jobs = {}
         self.checkpoints_dict = {}
 
+    async def shutdown(self) -> None:
+        raise asyncio.CancelledError("Shutdown")
+
     async def supervised_fine_tune(
         self,
         job_uuid: str,
```

Without the fix:

```
INFO:     Uvicorn running on http://['::', '0.0.0.0']:8321 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Finished server process [52099]
INFO     2025-03-07 23:25:33,548 __main__:143 server: Received signal SIGINT (2). Exiting gracefully...
INFO     2025-03-07 23:25:33,550 __main__:150 server: Shutting down DatasetsRoutingTable
INFO     2025-03-07 23:25:33,551 __main__:177 server: Stopping event loop
ERROR    2025-03-07 23:25:33,552 asyncio:1785 uncategorized: unhandled exception during asyncio.run() shutdown
         task: <Task finished name='Task-12' coro=<handle_signal.<locals>.shutdown() done, defined at
         /home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/server/server.py:145>
         exception=UnboundLocalError("cannot access local variable 'loop' where it is not associated with a value")>
         ╭───────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────╮
         │ /home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/server/server.py:178 in shutdown           │
         │                                                                                                             │
         │   175 │   │   │   pass                                                                                      │
         │   176 │   │   finally:                                                                                      │
         │   177 │   │   │   logger.info("Stopping event loop")                                                        │
         │ ❱ 178 │   │   │   loop.stop()                                                                               │
         │   179 │                                                                                                     │
         │   180 │   loop = asyncio.get_running_loop()                                                                 │
         │   181 │   loop.create_task(shutdown())                                                                      │
         ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
         UnboundLocalError: cannot access local variable 'loop' where it is not associated with a value

```

With the fix, now seeing the following messages when the server is
killed:

```
INFO:     Uvicorn running on http://['::', '0.0.0.0']:8321 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Finished server process [50836]
INFO     2025-03-07 23:20:35,182 __main__:143 server: Received signal SIGINT (2). Exiting gracefully...
INFO     2025-03-07 23:20:35,184 __main__:149 server: Shutting down DatasetsRoutingTable
ERROR    2025-03-07 23:20:35,185 __main__:158 server: Failed to shutdown DatasetsRoutingTable: {CancelledError()}
         ╭───────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────╮
         │ /usr/lib64/python3.11/asyncio/tasks.py:476 in wait_for                                                      │
         │                                                                                                             │
         │   473 │   try:                                                                                              │
         │   474 │   │   # wait until the future completes or the timeout                                              │
         │   475 │   │   try:                                                                                          │
         │ ❱ 476 │   │   │   await waiter                                                                              │
         │   477 │   │   except exceptions.CancelledError:                                                             │
         │   478 │   │   │   if fut.done():                                                                            │
         │   479 │   │   │   │   return fut.result()                                                                   │
         ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
         CancelledError

         During handling of the above exception, another exception occurred:

         ╭───────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────╮
         │ /home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/server/server.py:152 in shutdown           │
         │                                                                                                             │
         │   149 │   │   │   logger.info("Shutting down %s", impl_name)                                                │
         │   150 │   │   │   try:                                                                                      │
         │   151 │   │   │   │   if hasattr(impl, "shutdown"):                                                         │
         │ ❱ 152 │   │   │   │   │   await asyncio.wait_for(impl.shutdown(), timeout=5)                                │
         │   153 │   │   │   │   else:                                                                                 │
         │   154 │   │   │   │   │   logger.warning("No shutdown method for %s", impl_name)                            │
         │   155 │   │   │   except asyncio.TimeoutError:                                                              │
         │                                                                                                             │
         │ /usr/lib64/python3.11/asyncio/tasks.py:479 in wait_for                                                      │
         │                                                                                                             │
         │   476 │   │   │   await waiter                                                                              │
         │   477 │   │   except exceptions.CancelledError:                                                             │
         │   478 │   │   │   if fut.done():                                                                            │
         │ ❱ 479 │   │   │   │   return fut.result()                                                                   │
         │   480 │   │   │   else:                                                                                     │
         │   481 │   │   │   │   fut.remove_done_callback(cb)                                                          │
         │   482 │   │   │   │   # We must ensure that the task is not running                                         │
         │                                                                                                             │
         │ /home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/routers/routing_tables.py:131 in shutdown  │
         │                                                                                                             │
         │   128 │   │   │   elif api == Api.tool_runtime:                                                             │
         │   129 │   │   │   │   p.tool_store = self                                                                   │
         │   130 │                                                                                                     │
         │ ❱ 131 │   async def shutdown(self) -> None:                                                                 │
         │   132 │   │   for p in self.impls_by_provider_id.values():                                                  │
         │   133 │   │   │   await p.shutdown()                                                                        │
         │   134                                                                                                       │
         ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
         CancelledError
INFO     2025-03-07 23:20:35,295 __main__:149 server: Shutting down DatasetIORouter
INFO     2025-03-07 23:20:35,296 __main__:149 server: Shutting down ScoringFunctionsRoutingTable
INFO     2025-03-07 23:20:35,297 __main__:149 server: Shutting down ScoringRouter
INFO     2025-03-07 23:20:35,298 __main__:149 server: Shutting down ModelsRoutingTable
INFO     2025-03-07 23:20:35,299 __main__:149 server: Shutting down InferenceRouter
INFO     2025-03-07 23:20:35,300 __main__:149 server: Shutting down ShieldsRoutingTable
INFO     2025-03-07 23:20:35,300 __main__:149 server: Shutting down SafetyRouter
INFO     2025-03-07 23:20:35,301 __main__:149 server: Shutting down VectorDBsRoutingTable
INFO     2025-03-07 23:20:35,302 __main__:149 server: Shutting down VectorIORouter
INFO     2025-03-07 23:20:35,303 __main__:149 server: Shutting down ToolGroupsRoutingTable
INFO     2025-03-07 23:20:35,304 __main__:149 server: Shutting down ToolRuntimeRouter
INFO     2025-03-07 23:20:35,304 __main__:149 server: Shutting down MetaReferenceAgentsImpl
INFO     2025-03-07 23:20:35,305 __main__:149 server: Shutting down TelemetryAdapter
INFO     2025-03-07 23:20:35,306 __main__:149 server: Shutting down TorchtunePostTrainingImpl
ERROR    2025-03-07 23:20:35,307 __main__:158 server: Failed to shutdown TorchtunePostTrainingImpl:
         {CancelledError('Shutdown')}
         ╭───────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────╮
         │ /home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/server/server.py:152 in shutdown           │
         │                                                                                                             │
         │   149 │   │   │   logger.info("Shutting down %s", impl_name)                                                │
         │   150 │   │   │   try:                                                                                      │
         │   151 │   │   │   │   if hasattr(impl, "shutdown"):                                                         │
         │ ❱ 152 │   │   │   │   │   await asyncio.wait_for(impl.shutdown(), timeout=5)                                │
         │   153 │   │   │   │   else:                                                                                 │
         │   154 │   │   │   │   │   logger.warning("No shutdown method for %s", impl_name)                            │
         │   155 │   │   │   except asyncio.TimeoutError:                                                              │
         │                                                                                                             │
         │ /usr/lib64/python3.11/asyncio/tasks.py:489 in wait_for                                                      │
         │                                                                                                             │
         │   486 │   │   │   │   raise                                                                                 │
         │   487 │   │                                                                                                 │
         │   488 │   │   if fut.done():                                                                                │
         │ ❱ 489 │   │   │   return fut.result()                                                                       │
         │   490 │   │   else:                                                                                         │
         │   491 │   │   │   fut.remove_done_callback(cb)                                                              │
         │   492 │   │   │   # We must ensure that the task is not running                                             │
         │                                                                                                             │
         │ /home/ec2-user/src/llama-stack/schedule/llama_stack/providers/inline/post_training/torchtune/post_training. │
         │ py:48 in shutdown                                                                                           │
         │                                                                                                             │
         │    45 │   │   self.checkpoints_dict = {}                                                                    │
         │    46 │                                                                                                     │
         │    47 │   async def shutdown(self) -> None:                                                                 │
         │ ❱  48 │   │   raise asyncio.CancelledError("Shutdown")                                                      │
         │    49 │                                                                                                     │
         │    50 │   async def supervised_fine_tune(                                                                   │
         │    51 │   │   self,                                                                                         │
         ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
         CancelledError: Shutdown
INFO     2025-03-07 23:20:35,352 __main__:149 server: Shutting down BenchmarksRoutingTable
INFO     2025-03-07 23:20:35,353 __main__:149 server: Shutting down EvalRouter
INFO     2025-03-07 23:20:35,354 __main__:149 server: Shutting down DistributionInspectImpl
INFO     2025-03-07 23:20:35,355 __main__:177 server: Stopping event loop
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/server/server.py", line 488, in <module>
    main()
  File "/home/ec2-user/src/llama-stack/schedule/llama_stack/distribution/server/server.py", line 476, in main
    uvicorn.run(**uvicorn_config)
  File "/home/ec2-user/src/llama-stack/schedule/venv/lib64/python3.11/site-packages/uvicorn/main.py", line 579, in run
    server.run()
  File "/home/ec2-user/src/llama-stack/schedule/venv/lib64/python3.11/site-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.11/asyncio/runners.py", line 189, in run
    with Runner(debug=debug) as runner:
  File "/usr/lib64/python3.11/asyncio/runners.py", line 63, in __exit__
    self.close()
  File "/usr/lib64/python3.11/asyncio/runners.py", line 71, in close
    _cancel_all_tasks(loop)
  File "/usr/lib64/python3.11/asyncio/runners.py", line 201, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  File "/usr/lib64/python3.11/asyncio/base_events.py", line 652, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.
++ error_handler 104
++ echo 'Error occurred in script at line: 104'
Error occurred in script at line: 104
++ exit 1
```

With all patches included, the shutdown now looks as follows:

```
$ kill -INT $(ps ax | grep  llama_stack.distribution.server.server | grep -v nvim | awk -e '{print $1}' | sort | head -n 1)
```

```
20:56:09.308 [START]
INFO:     Uvicorn running on http://['::', '0.0.0.0']:8321 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO     2025-03-10 20:56:43,961 __main__:140 server: Shutting down
INFO     2025-03-10 20:56:43,962 __main__:124 server: Shutting down DatasetsRoutingTable
INFO     2025-03-10 20:56:43,964 __main__:124 server: Shutting down DatasetIORouter
INFO     2025-03-10 20:56:43,965 __main__:124 server: Shutting down ScoringFunctionsRoutingTable
INFO     2025-03-10 20:56:43,966 __main__:124 server: Shutting down ScoringRouter
INFO     2025-03-10 20:56:43,967 __main__:124 server: Shutting down ModelsRoutingTable
INFO     2025-03-10 20:56:43,968 __main__:124 server: Shutting down InferenceRouter
INFO     2025-03-10 20:56:43,969 __main__:124 server: Shutting down ShieldsRoutingTable
INFO     2025-03-10 20:56:43,971 __main__:124 server: Shutting down SafetyRouter
INFO     2025-03-10 20:56:43,972 __main__:124 server: Shutting down VectorDBsRoutingTable
INFO     2025-03-10 20:56:43,973 __main__:124 server: Shutting down VectorIORouter
INFO     2025-03-10 20:56:43,974 __main__:124 server: Shutting down ToolGroupsRoutingTable
INFO     2025-03-10 20:56:43,975 __main__:124 server: Shutting down ToolRuntimeRouter
INFO     2025-03-10 20:56:43,976 __main__:124 server: Shutting down MetaReferenceAgentsImpl
INFO     2025-03-10 20:56:43,977 __main__:124 server: Shutting down TelemetryAdapter
INFO     2025-03-10 20:56:43,978 __main__:124 server: Shutting down TorchtunePostTrainingImpl
WARNING  2025-03-10 20:56:43,979 __main__:129 server: No shutdown method for TorchtunePostTrainingImpl
INFO     2025-03-10 20:56:43,979 __main__:124 server: Shutting down BenchmarksRoutingTable
INFO     2025-03-10 20:56:43,980 __main__:124 server: Shutting down EvalRouter
INFO     2025-03-10 20:56:43,981 __main__:124 server: Shutting down DistributionInspectImpl
INFO:     Application shutdown complete.
INFO:     Finished server process [33862]
```

[//]: # (## Documentation)

---------

Signed-off-by: Ihar Hrachyshka <ihar.hrachyshka@gmail.com>
This commit is contained in:
Ihar Hrachyshka 2025-03-11 13:30:55 -04:00 committed by GitHub
parent d33b8ea3dc
commit aca82df7ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -6,11 +6,9 @@
import argparse
import asyncio
import functools
import inspect
import json
import os
import signal
import sys
import traceback
import warnings
@ -118,36 +116,12 @@ def translate_exception(exc: Exception) -> Union[HTTPException, RequestValidatio
)
def handle_signal(app, signum, _) -> None:
async def shutdown(app):
"""Initiate a graceful shutdown of the application.
Handled by the lifespan context manager. The shutdown process involves
shutting down all implementations registered in the application.
"""
Handle incoming signals and initiate a graceful shutdown of the application.
This function is intended to be used as a signal handler for various signals
(e.g., SIGINT, SIGTERM). Upon receiving a signal, it will print a message
indicating the received signal and initiate a shutdown process.
Args:
app: The application instance containing implementations to be shut down.
signum (int): The signal number received.
frame: The current stack frame (not used in this function).
The shutdown process involves:
- Shutting down all implementations registered in the application.
- Gathering all running asyncio tasks.
- Cancelling all gathered tasks.
- Waiting for all tasks to finish.
- Stopping the event loop.
Note:
This function schedules the shutdown process as an asyncio task and does
not block the current execution.
"""
signame = signal.Signals(signum).name
logger.info(f"Received signal {signame} ({signum}). Exiting gracefully...")
async def shutdown():
try:
# Gracefully shut down implementations
for impl in app.__llama_stack_impls__.values():
impl_name = impl.__class__.__name__
logger.info("Shutting down %s", impl_name)
@ -158,38 +132,16 @@ def handle_signal(app, signum, _) -> None:
logger.warning("No shutdown method for %s", impl_name)
except asyncio.TimeoutError:
logger.exception("Shutdown timeout for %s ", impl_name, exc_info=True)
except Exception as e:
except (Exception, asyncio.CancelledError) as e:
logger.exception("Failed to shutdown %s: %s", impl_name, {e})
# Gather all running tasks
loop = asyncio.get_running_loop()
tasks = [task for task in asyncio.all_tasks(loop) if task is not asyncio.current_task()]
# Cancel all tasks
for task in tasks:
task.cancel()
# Wait for all tasks to finish
try:
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=10)
except asyncio.TimeoutError:
logger.exception("Timeout while waiting for tasks to finish")
except asyncio.CancelledError:
pass
finally:
loop.stop()
loop = asyncio.get_running_loop()
loop.create_task(shutdown())
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Starting up")
yield
logger.info("Shutting down")
for impl in app.__llama_stack_impls__.values():
await impl.shutdown()
await shutdown(app)
def is_streaming_request(func_name: str, request: Request, **kwargs):
@ -266,7 +218,7 @@ class TracingMiddleware:
self.app = app
async def __call__(self, scope, receive, send):
path = scope["path"]
path = scope.get("path", "")
await start_trace(path, {"__location__": "server"})
try:
return await self.app(scope, receive, send)
@ -439,8 +391,6 @@ def main():
app.exception_handler(RequestValidationError)(global_exception_handler)
app.exception_handler(Exception)(global_exception_handler)
signal.signal(signal.SIGINT, functools.partial(handle_signal, app))
signal.signal(signal.SIGTERM, functools.partial(handle_signal, app))
app.__llama_stack_impls__ = impls
@ -471,6 +421,7 @@ def main():
"app": app,
"host": listen_host,
"port": port,
"lifespan": "on",
}
if ssl_config:
uvicorn_config.update(ssl_config)