forked from phoenix-oss/llama-stack-mirror
# What does this PR do? `--run` runs the stack that was just built, using the same arguments that were passed to the build process (image-name, type, etc.). This simplifies the workflow considerably and improves the UX for most local users trying to get started, who no longer have to match the flags of the two commands (build and then run). Also moved `ImageType` to distribution.utils, since its old location caused circular import errors. ## Test Plan Tested locally using the following command: `llama stack build --run --template ollama --image-type venv` Signed-off-by: Charlie Doern <cdoern@redhat.com>
233 lines
7.2 KiB
Python
233 lines
7.2 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
import errno
|
|
import logging
|
|
import os
|
|
import select
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
|
|
from termcolor import cprint
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
import importlib
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from llama_stack.distribution.utils.image_types import ImageType
|
|
|
|
|
|
def _get_conda_prefix(env_name):
    """Return the filesystem prefix of the conda environment ``env_name``.

    Returns ``None`` when ``conda info --envs --json`` lists no matching
    environment. Raises whatever ``subprocess.check_output`` raises if the
    ``conda`` executable cannot be run.
    """
    # Conda "base" environment does not end with "base" in the
    # prefix, so should be handled separately.
    if env_name == "base":
        return os.environ.get("CONDA_PREFIX")

    # Get conda environments info
    conda_env_info = json.loads(subprocess.check_output(["conda", "info", "--envs", "--json"]).decode())
    for env_path in conda_env_info["envs"]:
        # Compare the final path component (or the full prefix) instead of a
        # raw string suffix: with a suffix test, "env" would also match
        # ".../envs/other-env" and pick the wrong environment.
        if env_path == env_name or Path(env_path).name == env_name:
            return env_path
    return None


def formulate_run_args(image_type, image_name, config, template_name) -> list:
    """Build the ``[script, name]`` argument list that starts a built stack.

    Args:
        image_type: Image type string. It is compared against
            ``ImageType.container.value`` and ``ImageType.conda.value``;
            any other value is treated as a venv.
        image_name: Conda environment or venv name. When empty, the conda
            branch falls back to ``CONDA_DEFAULT_ENV`` and the venv branch to
            ``VIRTUAL_ENV``. The container branch overwrites it.
        config: Object whose ``container_image`` attribute is read; a truthy
            value forces the container branch.
        template_name: When truthy, the container image is named
            ``distribution-<template_name>``; otherwise
            ``config.container_image`` is used.

    Returns:
        A two-element list ``[start_script, image_or_env_name]``, or ``None``
        (despite the ``list`` annotation) after a red error message has been
        printed because the requested environment could not be resolved.
    """
    # `import importlib` alone does not guarantee that the `resources`
    # submodule has been loaded, so import it explicitly before use.
    import importlib.resources

    package_root = importlib.resources.files("llama_stack")

    if image_type == ImageType.container.value or config.container_image:
        script = package_root / "distribution/start_container.sh"
        image_name = f"distribution-{template_name}" if template_name else config.container_image
        return [script, image_name]

    if image_type == ImageType.conda.value:
        image_name = image_name or os.environ.get("CONDA_DEFAULT_ENV")
        if not image_name:
            cprint(
                "No current conda environment detected, please specify a conda environment name with --image-name",
                color="red",
            )
            return None

        print(f"Using conda environment: {image_name}")
        conda_prefix = _get_conda_prefix(image_name)
        if not conda_prefix:
            cprint(
                f"Conda environment {image_name} does not exist.",
                color="red",
            )
            return None

        # The build step is expected to have left this file in the env prefix.
        build_file = Path(conda_prefix) / "llamastack-build.yaml"
        if not build_file.exists():
            cprint(
                f"Build file {build_file} does not exist.\n\nPlease run `llama stack build` or specify the correct conda environment name with --image-name",
                color="red",
            )
            return None

        script = package_root / "distribution/start_conda_env.sh"
        return [script, image_name]

    # else must be venv since that is the only valid option left.
    venv = image_name or os.environ.get("VIRTUAL_ENV")
    if not venv:
        # Mirror the conda branch: report the problem instead of handing a
        # `None` argument to the start script.
        cprint(
            "No current virtual environment detected, please specify a virtual environment name with --image-name",
            color="red",
        )
        return None

    script = package_root / "distribution/start_venv.sh"
    return [script, venv]
|
|
|
|
|
|
def run_with_pty(command):
    """Run ``command`` interactively using the implementation for this OS.

    Dispatches to ``_run_with_pty_win`` when ``sys.platform`` starts with
    ``"win"`` and to ``_run_with_pty_unix`` otherwise, returning whatever the
    chosen implementation returns.
    """
    runner = _run_with_pty_win if sys.platform.startswith("win") else _run_with_pty_unix
    return runner(command)
|
|
|
|
|
|
def in_notebook():
    """Report whether the code is running under an IPython kernel.

    Returns ``True`` only when IPython is importable, ``get_ipython()`` yields
    a shell object, and that shell's config contains ``"IPKernelApp"``.
    A missing IPython install or a missing shell both yield ``False``.
    """
    try:
        from IPython import get_ipython

        # get_ipython() returns None outside IPython, so `.config` raises
        # AttributeError, which is handled below.
        return "IPKernelApp" in get_ipython().config  # pragma: no cover
    except (ImportError, AttributeError):
        return False
|
|
|
|
|
|
# run a command in a pseudo-terminal, with interrupt handling,
|
|
# useful when you want to run interactive things
|
|
def _run_with_pty_unix(command):
    """Run ``command`` attached to a pseudo-terminal and relay its I/O.

    The child gets the slave side of a freshly opened pty as stdin, stdout and
    stderr. This process copies bytes from its own stdin to the pty master and
    from the master to its own stdout until the child exits, either side
    reaches end-of-file, or Ctrl-C is pressed. While running, echo and
    canonical mode are disabled on the controlling terminal; the previous
    terminal settings and SIGINT handler are restored on exit.

    Args:
        command: Passed unchanged to ``subprocess.Popen``.

    Returns:
        The child's return code. If the loop ended before the child exited,
        the child is terminated first, so the code reflects that termination.
        ``1`` is returned if the child was never started.

    Raises:
        OSError: Any OS error other than ``EIO`` from starting or talking to
            the child. ``termios.tcgetattr`` failures (e.g. stdin is not a
            terminal) propagate as well.
    """
    import pty
    import termios

    # Snapshot terminal state before allocating the pty: if stdin is not a
    # terminal this raises while there is nothing to clean up yet.
    old_settings = termios.tcgetattr(sys.stdin)
    original_sigint = signal.getsignal(signal.SIGINT)

    master, slave = pty.openpty()

    ctrl_c_pressed = False
    process = None

    def sigint_handler(signum, frame):
        nonlocal ctrl_c_pressed
        ctrl_c_pressed = True
        log.info("\nCtrl-C detected. Aborting...")

    try:
        # Set up the signal handler
        signal.signal(signal.SIGINT, sigint_handler)

        new_settings = termios.tcgetattr(sys.stdin)
        new_settings[3] = new_settings[3] & ~termios.ECHO  # Disable echo
        new_settings[3] = new_settings[3] & ~termios.ICANON  # Disable canonical mode
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)

        process = subprocess.Popen(
            command,
            stdin=slave,
            stdout=slave,
            stderr=slave,
            universal_newlines=True,
            # Same effect as preexec_fn=os.setsid (child runs setsid()),
            # without the thread-safety caveats attached to preexec_fn.
            start_new_session=True,
        )

        # Close the slave file descriptor as it's now owned by the subprocess
        os.close(slave)
        slave = None

        def handle_io():
            while not ctrl_c_pressed:
                try:
                    # Short timeout so the Ctrl-C flag and child exit are
                    # re-checked even when no data is flowing.
                    rlist, _, _ = select.select([sys.stdin, master], [], [], 0.1)

                    if sys.stdin in rlist:
                        data = os.read(sys.stdin.fileno(), 1024)
                        if not data:
                            break
                        os.write(master, data)

                    if master in rlist:
                        data = os.read(master, 1024)
                        if not data:
                            break
                        sys.stdout.buffer.write(data)
                        sys.stdout.flush()

                except KeyboardInterrupt:
                    # This will be raised when Ctrl+C is pressed
                    break

                if process.poll() is not None:
                    break

        handle_io()
    except (EOFError, KeyboardInterrupt):
        pass
    except OSError as e:
        # EIO is tolerated; presumably it signals that the child side of the
        # pty has gone away (platform-dependent, confirm). Anything else is a
        # real failure.
        if e.errno != errno.EIO:
            raise
    finally:
        # Clean up
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
        signal.signal(signal.SIGINT, original_sigint)

        if slave is not None:
            # Popen never took ownership (it raised, or was never reached).
            os.close(slave)
        os.close(master)
        if process and process.poll() is None:
            process.terminate()
            process.wait()

    # `process` is still None when the child was never started; report a
    # generic failure code instead of dereferencing None.
    return process.returncode if process is not None else 1
|
|
|
|
|
|
# run a command in a pseudo-terminal in windows, with interrupt handling,
|
|
def _run_with_pty_win(command):
|
|
"""
|
|
Runs a command with interactive support using subprocess directly.
|
|
"""
|
|
try:
|
|
# For shell scripts on Windows, use appropriate shell
|
|
if isinstance(command, (list, tuple)):
|
|
if command[0].endswith(".sh"):
|
|
if os.path.exists("/usr/bin/bash"): # WSL
|
|
command = ["bash"] + command
|
|
else:
|
|
# Use cmd.exe with bash while preserving all arguments
|
|
command = ["cmd.exe", "/c", "bash"] + command
|
|
|
|
process = subprocess.Popen(
|
|
command,
|
|
shell=True,
|
|
universal_newlines=True,
|
|
)
|
|
|
|
process.wait()
|
|
|
|
except Exception as e:
|
|
print(f"Error: {str(e)}")
|
|
return 1
|
|
finally:
|
|
if process and process.poll() is None:
|
|
process.terminate()
|
|
process.wait()
|
|
return process.returncode
|
|
|
|
|
|
def run_command(command):
    """Run ``command`` to completion with captured output and report it.

    On success the captured stdout is printed and the return code is
    returned. If the command exits non-zero, the error and its captured
    stderr are printed and the failing return code is returned.
    """
    try:
        completed = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as err:
        # check=True turns a non-zero exit status into this exception.
        print("Error running script:", err)
        print("Error output:", err.stderr)
        return err.returncode

    print("Script Output\n", completed.stdout)
    return completed.returncode
|