Merge f93461de5e into sapling-pr-archive-ehhuang

This commit is contained in:
ehhuang 2025-10-07 09:52:01 -07:00 committed by GitHub
commit d103bf6c03
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
104 changed files with 15263 additions and 5592 deletions

View file

@ -444,12 +444,24 @@ def _run_stack_build_command_from_build_config(
cprint("Build Successful!", color="green", file=sys.stderr)
cprint(f"You can find the newly-built distribution here: {run_config_file}", color="blue", file=sys.stderr)
cprint(
"You can run the new Llama Stack distro via: "
+ colored(f"llama stack run {run_config_file} --image-type {build_config.image_type}", "blue"),
color="green",
file=sys.stderr,
)
if build_config.image_type == LlamaStackImageType.VENV:
cprint(
"You can run the new Llama Stack distro (after activating "
+ colored(image_name, "cyan")
+ ") via: "
+ colored(f"llama stack run {run_config_file}", "blue"),
color="green",
file=sys.stderr,
)
elif build_config.image_type == LlamaStackImageType.CONTAINER:
cprint(
"You can run the container with: "
+ colored(
f"docker run -p 8321:8321 -v ~/.llama:/root/.llama localhost/{image_name} --port 8321", "blue"
),
color="green",
file=sys.stderr,
)
return distro_path
else:
return _generate_run_config(build_config, build_dir, image_name)

View file

@ -16,7 +16,7 @@ import yaml
from llama_stack.cli.stack.utils import ImageType
from llama_stack.cli.subcommand import Subcommand
from llama_stack.core.datatypes import LoggingConfig, StackRunConfig
from llama_stack.core.stack import cast_image_name_to_string, replace_env_vars, validate_env_pair
from llama_stack.core.stack import cast_image_name_to_string, replace_env_vars
from llama_stack.core.utils.config_resolution import Mode, resolve_config_or_distro
from llama_stack.log import get_logger
@ -55,18 +55,12 @@ class StackRun(Subcommand):
"--image-name",
type=str,
default=None,
help="Name of the image to run. Defaults to the current environment",
)
self.parser.add_argument(
"--env",
action="append",
help="Environment variables to pass to the server in KEY=VALUE format. Can be specified multiple times.",
metavar="KEY=VALUE",
help="[DEPRECATED] This flag is no longer supported. Please activate your virtual environment before running.",
)
self.parser.add_argument(
"--image-type",
type=str,
help="Image Type used during the build. This can be only venv.",
help="[DEPRECATED] This flag is no longer supported. Please activate your virtual environment before running.",
choices=[e.value for e in ImageType if e.value != ImageType.CONTAINER.value],
)
self.parser.add_argument(
@ -75,48 +69,22 @@ class StackRun(Subcommand):
help="Start the UI server",
)
def _resolve_config_and_distro(self, args: argparse.Namespace) -> tuple[Path | None, str | None]:
"""Resolve config file path and distribution name from args.config"""
from llama_stack.core.utils.config_dirs import DISTRIBS_BASE_DIR
if not args.config:
return None, None
config_file = Path(args.config)
has_yaml_suffix = args.config.endswith(".yaml")
distro_name = None
if not config_file.exists() and not has_yaml_suffix:
# check if this is a distribution
config_file = Path(REPO_ROOT) / "llama_stack" / "distributions" / args.config / "run.yaml"
if config_file.exists():
distro_name = args.config
if not config_file.exists() and not has_yaml_suffix:
# check if it's a build config saved to ~/.llama dir
config_file = Path(DISTRIBS_BASE_DIR / f"llamastack-{args.config}" / f"{args.config}-run.yaml")
if not config_file.exists():
self.parser.error(
f"File {str(config_file)} does not exist.\n\nPlease run `llama stack build` to generate (and optionally edit) a run.yaml file"
)
if not config_file.is_file():
self.parser.error(
f"Config file must be a valid file path, '{config_file}' is not a file: type={type(config_file)}"
)
return config_file, distro_name
def _run_stack_run_cmd(self, args: argparse.Namespace) -> None:
import yaml
from llama_stack.core.configure import parse_and_maybe_upgrade_config
from llama_stack.core.utils.exec import formulate_run_args, run_command
if args.image_type or args.image_name:
self.parser.error(
"The --image-type and --image-name flags are no longer supported.\n\n"
"Please activate your virtual environment manually before running `llama stack run`.\n\n"
"For example:\n"
" source /path/to/venv/bin/activate\n"
" llama stack run <config>\n"
)
if args.enable_ui:
self._start_ui_development_server(args.port)
image_type, image_name = args.image_type, args.image_name
if args.config:
try:
@ -128,10 +96,6 @@ class StackRun(Subcommand):
else:
config_file = None
# Check if config is required based on image type
if image_type == ImageType.VENV.value and not config_file:
self.parser.error("Config file is required for venv environment")
if config_file:
logger.info(f"Using run configuration: {config_file}")
@ -146,50 +110,13 @@ class StackRun(Subcommand):
os.makedirs(str(config.external_providers_dir), exist_ok=True)
except AttributeError as e:
self.parser.error(f"failed to parse config file '{config_file}':\n {e}")
else:
config = None
# If neither image type nor image name is provided, assume the server should be run directly
# using the current environment packages.
if not image_type and not image_name:
logger.info("No image type or image name provided. Assuming environment packages.")
self._uvicorn_run(config_file, args)
else:
run_args = formulate_run_args(image_type, image_name)
run_args.extend([str(args.port)])
if config_file:
run_args.extend(["--config", str(config_file)])
if args.env:
for env_var in args.env:
if "=" not in env_var:
self.parser.error(f"Environment variable '{env_var}' must be in KEY=VALUE format")
return
key, value = env_var.split("=", 1) # split on first = only
if not key:
self.parser.error(f"Environment variable '{env_var}' has empty key")
return
run_args.extend(["--env", f"{key}={value}"])
run_command(run_args)
self._uvicorn_run(config_file, args)
def _uvicorn_run(self, config_file: Path | None, args: argparse.Namespace) -> None:
if not config_file:
self.parser.error("Config file is required")
# Set environment variables if provided
if args.env:
for env_pair in args.env:
try:
key, value = validate_env_pair(env_pair)
logger.info(f"Setting environment variable {key} => {value}")
os.environ[key] = value
except ValueError as e:
logger.error(f"Error: {str(e)}")
self.parser.error(f"Invalid environment variable format: {env_pair}")
config_file = resolve_config_or_distro(str(config_file), Mode.RUN)
with open(config_file) as fp:
config_contents = yaml.safe_load(fp)

View file

@ -33,7 +33,7 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models):
try:
models = await provider.list_models()
except Exception as e:
logger.warning(f"Model refresh failed for provider {provider_id}: {e}")
logger.debug(f"Model refresh failed for provider {provider_id}: {e}")
continue
self.listed_providers.add(provider_id)

View file

@ -245,3 +245,65 @@ class VectorDBsRoutingTable(CommonRoutingTableImpl, VectorDBs):
vector_store_id=vector_store_id,
file_id=file_id,
)
async def openai_create_vector_store_file_batch(
self,
vector_store_id: str,
file_ids: list[str],
attributes: dict[str, Any] | None = None,
chunking_strategy: Any | None = None,
):
await self.assert_action_allowed("update", "vector_db", vector_store_id)
provider = await self.get_provider_impl(vector_store_id)
return await provider.openai_create_vector_store_file_batch(
vector_store_id=vector_store_id,
file_ids=file_ids,
attributes=attributes,
chunking_strategy=chunking_strategy,
)
async def openai_retrieve_vector_store_file_batch(
self,
batch_id: str,
vector_store_id: str,
):
await self.assert_action_allowed("read", "vector_db", vector_store_id)
provider = await self.get_provider_impl(vector_store_id)
return await provider.openai_retrieve_vector_store_file_batch(
batch_id=batch_id,
vector_store_id=vector_store_id,
)
async def openai_list_files_in_vector_store_file_batch(
self,
batch_id: str,
vector_store_id: str,
after: str | None = None,
before: str | None = None,
filter: str | None = None,
limit: int | None = 20,
order: str | None = "desc",
):
await self.assert_action_allowed("read", "vector_db", vector_store_id)
provider = await self.get_provider_impl(vector_store_id)
return await provider.openai_list_files_in_vector_store_file_batch(
batch_id=batch_id,
vector_store_id=vector_store_id,
after=after,
before=before,
filter=filter,
limit=limit,
order=order,
)
async def openai_cancel_vector_store_file_batch(
self,
batch_id: str,
vector_store_id: str,
):
await self.assert_action_allowed("update", "vector_db", vector_store_id)
provider = await self.get_provider_impl(vector_store_id)
return await provider.openai_cancel_vector_store_file_batch(
batch_id=batch_id,
vector_store_id=vector_store_id,
)

View file

@ -274,22 +274,6 @@ def cast_image_name_to_string(config_dict: dict[str, Any]) -> dict[str, Any]:
return config_dict
def validate_env_pair(env_pair: str) -> tuple[str, str]:
"""Validate and split an environment variable key-value pair."""
try:
key, value = env_pair.split("=", 1)
key = key.strip()
if not key:
raise ValueError(f"Empty key in environment variable pair: {env_pair}")
if not all(c.isalnum() or c == "_" for c in key):
raise ValueError(f"Key must contain only alphanumeric characters and underscores: {key}")
return key, value
except ValueError as e:
raise ValueError(
f"Invalid environment variable format '{env_pair}': {str(e)}. Expected format: KEY=value"
) from e
def add_internal_implementations(impls: dict[Api, Any], run_config: StackRunConfig) -> None:
"""Add internal implementations (inspect and providers) to the implementations dictionary.

View file

@ -25,7 +25,7 @@ error_handler() {
trap 'error_handler ${LINENO}' ERR
if [ $# -lt 3 ]; then
echo "Usage: $0 <env_type> <env_path_or_name> <port> [--config <yaml_config>] [--env KEY=VALUE]..."
echo "Usage: $0 <env_type> <env_path_or_name> <port> [--config <yaml_config>]"
exit 1
fi
@ -43,7 +43,6 @@ SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
# Initialize variables
yaml_config=""
env_vars=""
other_args=""
# Process remaining arguments
@ -58,15 +57,6 @@ while [[ $# -gt 0 ]]; do
exit 1
fi
;;
--env)
if [[ -n "$2" ]]; then
env_vars="$env_vars --env $2"
shift 2
else
echo -e "${RED}Error: --env requires a KEY=VALUE argument${NC}" >&2
exit 1
fi
;;
*)
other_args="$other_args $1"
shift
@ -119,7 +109,6 @@ if [[ "$env_type" == "venv" ]]; then
llama stack run \
$yaml_config_arg \
--port "$port" \
$env_vars \
$other_args
elif [[ "$env_type" == "container" ]]; then
echo -e "${RED}Warning: Llama Stack no longer supports running Containers via the 'llama stack run' command.${NC}"

View file

@ -117,11 +117,11 @@ docker run -it \
# NOTE: mount the llama-stack directory if testing local changes else not needed
-v $HOME/git/llama-stack:/app/llama-stack-source \
# localhost/distribution-dell:dev if building / testing locally
-e INFERENCE_MODEL=$INFERENCE_MODEL \
-e DEH_URL=$DEH_URL \
-e CHROMA_URL=$CHROMA_URL \
llamastack/distribution-{{ name }}\
--port $LLAMA_STACK_PORT \
--env INFERENCE_MODEL=$INFERENCE_MODEL \
--env DEH_URL=$DEH_URL \
--env CHROMA_URL=$CHROMA_URL
--port $LLAMA_STACK_PORT
```
@ -142,14 +142,14 @@ docker run \
-p $LLAMA_STACK_PORT:$LLAMA_STACK_PORT \
-v $HOME/.llama:/root/.llama \
-v ./llama_stack/distributions/tgi/run-with-safety.yaml:/root/my-run.yaml \
-e INFERENCE_MODEL=$INFERENCE_MODEL \
-e DEH_URL=$DEH_URL \
-e SAFETY_MODEL=$SAFETY_MODEL \
-e DEH_SAFETY_URL=$DEH_SAFETY_URL \
-e CHROMA_URL=$CHROMA_URL \
llamastack/distribution-{{ name }} \
--config /root/my-run.yaml \
--port $LLAMA_STACK_PORT \
--env INFERENCE_MODEL=$INFERENCE_MODEL \
--env DEH_URL=$DEH_URL \
--env SAFETY_MODEL=$SAFETY_MODEL \
--env DEH_SAFETY_URL=$DEH_SAFETY_URL \
--env CHROMA_URL=$CHROMA_URL
--port $LLAMA_STACK_PORT
```
### Via Conda
@ -158,21 +158,21 @@ Make sure you have done `pip install llama-stack` and have the Llama Stack CLI a
```bash
llama stack build --distro {{ name }} --image-type conda
llama stack run {{ name }}
--port $LLAMA_STACK_PORT \
--env INFERENCE_MODEL=$INFERENCE_MODEL \
--env DEH_URL=$DEH_URL \
--env CHROMA_URL=$CHROMA_URL
INFERENCE_MODEL=$INFERENCE_MODEL \
DEH_URL=$DEH_URL \
CHROMA_URL=$CHROMA_URL \
llama stack run {{ name }} \
--port $LLAMA_STACK_PORT
```
If you are using Llama Stack Safety / Shield APIs, use:
```bash
INFERENCE_MODEL=$INFERENCE_MODEL \
DEH_URL=$DEH_URL \
SAFETY_MODEL=$SAFETY_MODEL \
DEH_SAFETY_URL=$DEH_SAFETY_URL \
CHROMA_URL=$CHROMA_URL \
llama stack run ./run-with-safety.yaml \
--port $LLAMA_STACK_PORT \
--env INFERENCE_MODEL=$INFERENCE_MODEL \
--env DEH_URL=$DEH_URL \
--env SAFETY_MODEL=$SAFETY_MODEL \
--env DEH_SAFETY_URL=$DEH_SAFETY_URL \
--env CHROMA_URL=$CHROMA_URL
--port $LLAMA_STACK_PORT
```

View file

@ -72,9 +72,9 @@ docker run \
--gpu all \
-p $LLAMA_STACK_PORT:$LLAMA_STACK_PORT \
-v ~/.llama:/root/.llama \
-e INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct \
llamastack/distribution-{{ name }} \
--port $LLAMA_STACK_PORT \
--env INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct
--port $LLAMA_STACK_PORT
```
If you are using Llama Stack Safety / Shield APIs, use:
@ -86,10 +86,10 @@ docker run \
--gpu all \
-p $LLAMA_STACK_PORT:$LLAMA_STACK_PORT \
-v ~/.llama:/root/.llama \
-e INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct \
-e SAFETY_MODEL=meta-llama/Llama-Guard-3-1B \
llamastack/distribution-{{ name }} \
--port $LLAMA_STACK_PORT \
--env INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct \
--env SAFETY_MODEL=meta-llama/Llama-Guard-3-1B
--port $LLAMA_STACK_PORT
```
### Via venv
@ -98,16 +98,16 @@ Make sure you have done `uv pip install llama-stack` and have the Llama Stack CL
```bash
llama stack build --distro {{ name }} --image-type venv
INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct \
llama stack run distributions/{{ name }}/run.yaml \
--port 8321 \
--env INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct
--port 8321
```
If you are using Llama Stack Safety / Shield APIs, use:
```bash
INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct \
SAFETY_MODEL=meta-llama/Llama-Guard-3-1B \
llama stack run distributions/{{ name }}/run-with-safety.yaml \
--port 8321 \
--env INFERENCE_MODEL=meta-llama/Llama-3.2-3B-Instruct \
--env SAFETY_MODEL=meta-llama/Llama-Guard-3-1B
--port 8321
```

View file

@ -118,10 +118,10 @@ docker run \
--pull always \
-p $LLAMA_STACK_PORT:$LLAMA_STACK_PORT \
-v ./run.yaml:/root/my-run.yaml \
-e NVIDIA_API_KEY=$NVIDIA_API_KEY \
llamastack/distribution-{{ name }} \
--config /root/my-run.yaml \
--port $LLAMA_STACK_PORT \
--env NVIDIA_API_KEY=$NVIDIA_API_KEY
--port $LLAMA_STACK_PORT
```
### Via venv
@ -131,10 +131,10 @@ If you've set up your local development environment, you can also build the imag
```bash
INFERENCE_MODEL=meta-llama/Llama-3.1-8B-Instruct
llama stack build --distro nvidia --image-type venv
NVIDIA_API_KEY=$NVIDIA_API_KEY \
INFERENCE_MODEL=$INFERENCE_MODEL \
llama stack run ./run.yaml \
--port 8321 \
--env NVIDIA_API_KEY=$NVIDIA_API_KEY \
--env INFERENCE_MODEL=$INFERENCE_MODEL
--port 8321
```
## Example Notebooks

View file

@ -22,6 +22,7 @@ async def get_provider_impl(config: MetaReferenceAgentsImplConfig, deps: dict[Ap
deps[Api.tool_runtime],
deps[Api.tool_groups],
policy,
Api.telemetry in deps,
)
await impl.initialize()
return impl

View file

@ -110,6 +110,7 @@ class ChatAgent(ShieldRunnerMixin):
persistence_store: KVStore,
created_at: str,
policy: list[AccessRule],
telemetry_enabled: bool = False,
):
self.agent_id = agent_id
self.agent_config = agent_config
@ -120,6 +121,7 @@ class ChatAgent(ShieldRunnerMixin):
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
self.created_at = created_at
self.telemetry_enabled = telemetry_enabled
ShieldRunnerMixin.__init__(
self,
@ -188,28 +190,30 @@ class ChatAgent(ShieldRunnerMixin):
async def create_and_execute_turn(self, request: AgentTurnCreateRequest) -> AsyncGenerator:
turn_id = str(uuid.uuid4())
span = tracing.get_current_span()
if span:
span.set_attribute("session_id", request.session_id)
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
if self.telemetry_enabled:
span = tracing.get_current_span()
if span is not None:
span.set_attribute("session_id", request.session_id)
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
await self._initialize_tools(request.toolgroups)
async for chunk in self._run_turn(request, turn_id):
yield chunk
async def resume_turn(self, request: AgentTurnResumeRequest) -> AsyncGenerator:
span = tracing.get_current_span()
if span:
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("session_id", request.session_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", request.turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
if self.telemetry_enabled:
span = tracing.get_current_span()
if span is not None:
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("session_id", request.session_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", request.turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
await self._initialize_tools()
async for chunk in self._run_turn(request):
@ -395,9 +399,12 @@ class ChatAgent(ShieldRunnerMixin):
touchpoint: str,
) -> AsyncGenerator:
async with tracing.span("run_shields") as span:
span.set_attribute("input", [m.model_dump_json() for m in messages])
if self.telemetry_enabled and span is not None:
span.set_attribute("input", [m.model_dump_json() for m in messages])
if len(shields) == 0:
span.set_attribute("output", "no shields")
if len(shields) == 0:
span.set_attribute("output", "no shields")
return
step_id = str(uuid.uuid4())
@ -430,7 +437,8 @@ class ChatAgent(ShieldRunnerMixin):
)
)
)
span.set_attribute("output", e.violation.model_dump_json())
if self.telemetry_enabled and span is not None:
span.set_attribute("output", e.violation.model_dump_json())
yield CompletionMessage(
content=str(e),
@ -453,7 +461,8 @@ class ChatAgent(ShieldRunnerMixin):
)
)
)
span.set_attribute("output", "no violations")
if self.telemetry_enabled and span is not None:
span.set_attribute("output", "no violations")
async def _run(
self,
@ -518,8 +527,9 @@ class ChatAgent(ShieldRunnerMixin):
stop_reason: StopReason | None = None
async with tracing.span("inference") as span:
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
if self.telemetry_enabled and span is not None:
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
def _serialize_nested(value):
"""Recursively serialize nested Pydantic models to dicts."""
@ -637,18 +647,19 @@ class ChatAgent(ShieldRunnerMixin):
else:
raise ValueError(f"Unexpected delta type {type(delta)}")
span.set_attribute("stop_reason", stop_reason or StopReason.end_of_turn)
span.set_attribute(
"input",
json.dumps([json.loads(m.model_dump_json()) for m in input_messages]),
)
output_attr = json.dumps(
{
"content": content,
"tool_calls": [json.loads(t.model_dump_json()) for t in tool_calls],
}
)
span.set_attribute("output", output_attr)
if self.telemetry_enabled and span is not None:
span.set_attribute("stop_reason", stop_reason or StopReason.end_of_turn)
span.set_attribute(
"input",
json.dumps([json.loads(m.model_dump_json()) for m in input_messages]),
)
output_attr = json.dumps(
{
"content": content,
"tool_calls": [json.loads(t.model_dump_json()) for t in tool_calls],
}
)
span.set_attribute("output", output_attr)
n_iter += 1
await self.storage.set_num_infer_iters_in_turn(session_id, turn_id, n_iter)
@ -756,7 +767,9 @@ class ChatAgent(ShieldRunnerMixin):
{
"tool_name": tool_call.tool_name,
"input": message.model_dump_json(),
},
}
if self.telemetry_enabled
else {},
) as span:
tool_execution_start_time = datetime.now(UTC).isoformat()
tool_result = await self.execute_tool_call_maybe(
@ -771,7 +784,8 @@ class ChatAgent(ShieldRunnerMixin):
call_id=tool_call.call_id,
content=tool_result.content,
)
span.set_attribute("output", result_message.model_dump_json())
if self.telemetry_enabled and span is not None:
span.set_attribute("output", result_message.model_dump_json())
# Store tool execution step
tool_execution_step = ToolExecutionStep(

View file

@ -64,6 +64,7 @@ class MetaReferenceAgentsImpl(Agents):
tool_runtime_api: ToolRuntime,
tool_groups_api: ToolGroups,
policy: list[AccessRule],
telemetry_enabled: bool = False,
):
self.config = config
self.inference_api = inference_api
@ -71,6 +72,7 @@ class MetaReferenceAgentsImpl(Agents):
self.safety_api = safety_api
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
self.telemetry_enabled = telemetry_enabled
self.in_memory_store = InmemoryKVStoreImpl()
self.openai_responses_impl: OpenAIResponsesImpl | None = None
@ -135,6 +137,7 @@ class MetaReferenceAgentsImpl(Agents):
),
created_at=agent_info.created_at,
policy=self.policy,
telemetry_enabled=self.telemetry_enabled,
)
async def create_agent_session(

View file

@ -200,12 +200,10 @@ class FaissIndex(EmbeddingIndex):
class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
def __init__(self, config: FaissVectorIOConfig, inference_api: Inference, files_api: Files | None) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.files_api = files_api
self.cache: dict[str, VectorDBWithIndex] = {}
self.kvstore: KVStore | None = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)

View file

@ -410,12 +410,10 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
"""
def __init__(self, config, inference_api: Inference, files_api: Files | None) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.files_api = files_api
self.cache: dict[str, VectorDBWithIndex] = {}
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.kvstore: KVStore | None = None
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)

View file

@ -36,6 +36,9 @@ def available_providers() -> list[ProviderSpec]:
Api.tool_runtime,
Api.tool_groups,
],
optional_api_dependencies=[
Api.telemetry,
],
description="Meta's reference implementation of an agent system that can use tools, access vector databases, and perform complex reasoning tasks.",
),
]

View file

@ -11,6 +11,7 @@ from llama_stack.providers.datatypes import (
ProviderSpec,
RemoteProviderSpec,
)
from llama_stack.providers.registry.vector_io import DEFAULT_VECTOR_IO_DEPS
def available_providers() -> list[ProviderSpec]:
@ -18,9 +19,8 @@ def available_providers() -> list[ProviderSpec]:
InlineProviderSpec(
api=Api.tool_runtime,
provider_type="inline::rag-runtime",
pip_packages=[
"chardet",
"pypdf",
pip_packages=DEFAULT_VECTOR_IO_DEPS
+ [
"tqdm",
"numpy",
"scikit-learn",

View file

@ -12,13 +12,16 @@ from llama_stack.providers.datatypes import (
RemoteProviderSpec,
)
# Common dependencies for all vector IO providers that support document processing
DEFAULT_VECTOR_IO_DEPS = ["chardet", "pypdf"]
def available_providers() -> list[ProviderSpec]:
return [
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::meta-reference",
pip_packages=["faiss-cpu"],
pip_packages=["faiss-cpu"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.faiss",
config_class="llama_stack.providers.inline.vector_io.faiss.FaissVectorIOConfig",
deprecation_warning="Please use the `inline::faiss` provider instead.",
@ -29,7 +32,7 @@ def available_providers() -> list[ProviderSpec]:
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::faiss",
pip_packages=["faiss-cpu"],
pip_packages=["faiss-cpu"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.faiss",
config_class="llama_stack.providers.inline.vector_io.faiss.FaissVectorIOConfig",
api_dependencies=[Api.inference],
@ -82,7 +85,7 @@ more details about Faiss in general.
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::sqlite-vec",
pip_packages=["sqlite-vec"],
pip_packages=["sqlite-vec"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.sqlite_vec",
config_class="llama_stack.providers.inline.vector_io.sqlite_vec.SQLiteVectorIOConfig",
api_dependencies=[Api.inference],
@ -289,7 +292,7 @@ See [sqlite-vec's GitHub repo](https://github.com/asg017/sqlite-vec/tree/main) f
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::sqlite_vec",
pip_packages=["sqlite-vec"],
pip_packages=["sqlite-vec"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.sqlite_vec",
config_class="llama_stack.providers.inline.vector_io.sqlite_vec.SQLiteVectorIOConfig",
deprecation_warning="Please use the `inline::sqlite-vec` provider (notice the hyphen instead of underscore) instead.",
@ -303,7 +306,7 @@ Please refer to the sqlite-vec provider documentation.
api=Api.vector_io,
adapter_type="chromadb",
provider_type="remote::chromadb",
pip_packages=["chromadb-client"],
pip_packages=["chromadb-client"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.remote.vector_io.chroma",
config_class="llama_stack.providers.remote.vector_io.chroma.ChromaVectorIOConfig",
api_dependencies=[Api.inference],
@ -345,7 +348,7 @@ See [Chroma's documentation](https://docs.trychroma.com/docs/overview/introducti
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::chromadb",
pip_packages=["chromadb"],
pip_packages=["chromadb"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.chroma",
config_class="llama_stack.providers.inline.vector_io.chroma.ChromaVectorIOConfig",
api_dependencies=[Api.inference],
@ -389,7 +392,7 @@ See [Chroma's documentation](https://docs.trychroma.com/docs/overview/introducti
api=Api.vector_io,
adapter_type="pgvector",
provider_type="remote::pgvector",
pip_packages=["psycopg2-binary"],
pip_packages=["psycopg2-binary"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.remote.vector_io.pgvector",
config_class="llama_stack.providers.remote.vector_io.pgvector.PGVectorVectorIOConfig",
api_dependencies=[Api.inference],
@ -500,7 +503,7 @@ See [PGVector's documentation](https://github.com/pgvector/pgvector) for more de
api=Api.vector_io,
adapter_type="weaviate",
provider_type="remote::weaviate",
pip_packages=["weaviate-client>=4.16.5"],
pip_packages=["weaviate-client>=4.16.5"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.remote.vector_io.weaviate",
config_class="llama_stack.providers.remote.vector_io.weaviate.WeaviateVectorIOConfig",
provider_data_validator="llama_stack.providers.remote.vector_io.weaviate.WeaviateRequestProviderData",
@ -541,7 +544,7 @@ See [Weaviate's documentation](https://weaviate.io/developers/weaviate) for more
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::qdrant",
pip_packages=["qdrant-client"],
pip_packages=["qdrant-client"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.qdrant",
config_class="llama_stack.providers.inline.vector_io.qdrant.QdrantVectorIOConfig",
api_dependencies=[Api.inference],
@ -594,7 +597,7 @@ See the [Qdrant documentation](https://qdrant.tech/documentation/) for more deta
api=Api.vector_io,
adapter_type="qdrant",
provider_type="remote::qdrant",
pip_packages=["qdrant-client"],
pip_packages=["qdrant-client"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.remote.vector_io.qdrant",
config_class="llama_stack.providers.remote.vector_io.qdrant.QdrantVectorIOConfig",
api_dependencies=[Api.inference],
@ -607,7 +610,7 @@ Please refer to the inline provider documentation.
api=Api.vector_io,
adapter_type="milvus",
provider_type="remote::milvus",
pip_packages=["pymilvus>=2.4.10"],
pip_packages=["pymilvus>=2.4.10"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.remote.vector_io.milvus",
config_class="llama_stack.providers.remote.vector_io.milvus.MilvusVectorIOConfig",
api_dependencies=[Api.inference],
@ -813,7 +816,7 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi
InlineProviderSpec(
api=Api.vector_io,
provider_type="inline::milvus",
pip_packages=["pymilvus[milvus-lite]>=2.4.10"],
pip_packages=["pymilvus[milvus-lite]>=2.4.10"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.inline.vector_io.milvus",
config_class="llama_stack.providers.inline.vector_io.milvus.MilvusVectorIOConfig",
api_dependencies=[Api.inference],

View file

@ -41,9 +41,6 @@ class DatabricksInferenceAdapter(OpenAIMixin):
).serving_endpoints.list() # TODO: this is not async
]
async def should_refresh_models(self) -> bool:
return False
async def openai_completion(
self,
model: str,

View file

@ -6,8 +6,6 @@
from typing import Any
from pydantic import Field
from llama_stack.providers.utils.inference.model_registry import RemoteInferenceProviderConfig
DEFAULT_OLLAMA_URL = "http://localhost:11434"
@ -15,10 +13,6 @@ DEFAULT_OLLAMA_URL = "http://localhost:11434"
class OllamaImplConfig(RemoteInferenceProviderConfig):
url: str = DEFAULT_OLLAMA_URL
refresh_models: bool = Field(
default=False,
description="Whether to refresh models periodically",
)
@classmethod
def sample_run_config(cls, url: str = "${env.OLLAMA_URL:=http://localhost:11434}", **kwargs) -> dict[str, Any]:

View file

@ -72,9 +72,6 @@ class OllamaInferenceAdapter(OpenAIMixin):
f"Ollama Server is not running (message: {r['message']}). Make sure to start it using `ollama serve` in a separate terminal"
)
async def should_refresh_models(self) -> bool:
return self.config.refresh_models
async def health(self) -> HealthResponse:
"""
Performs a health check by verifying connectivity to the Ollama server.

View file

@ -11,6 +11,6 @@ async def get_adapter_impl(config: RunpodImplConfig, _deps):
from .runpod import RunpodInferenceAdapter
assert isinstance(config, RunpodImplConfig), f"Unexpected config type: {type(config)}"
impl = RunpodInferenceAdapter(config)
impl = RunpodInferenceAdapter(config=config)
await impl.initialize()
return impl

View file

@ -4,69 +4,86 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any
from llama_stack.apis.inference import * # noqa: F403
from llama_stack.apis.inference import OpenAIEmbeddingsResponse
# from llama_stack.providers.datatypes import ModelsProtocolPrivate
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper, build_hf_repo_model_entry
from llama_stack.providers.utils.inference.openai_compat import (
get_sampling_options,
)
from llama_stack.providers.utils.inference.prompt_adapter import (
chat_completion_request_to_prompt,
from llama_stack.apis.inference import (
OpenAIMessageParam,
OpenAIResponseFormatParam,
)
from llama_stack.providers.utils.inference.openai_mixin import OpenAIMixin
from .config import RunpodImplConfig
# https://docs.runpod.io/serverless/vllm/overview#compatible-models
# https://github.com/runpod-workers/worker-vllm/blob/main/README.md#compatible-model-architectures
RUNPOD_SUPPORTED_MODELS = {
"Llama3.1-8B": "meta-llama/Llama-3.1-8B",
"Llama3.1-70B": "meta-llama/Llama-3.1-70B",
"Llama3.1-405B:bf16-mp8": "meta-llama/Llama-3.1-405B",
"Llama3.1-405B": "meta-llama/Llama-3.1-405B-FP8",
"Llama3.1-405B:bf16-mp16": "meta-llama/Llama-3.1-405B",
"Llama3.1-8B-Instruct": "meta-llama/Llama-3.1-8B-Instruct",
"Llama3.1-70B-Instruct": "meta-llama/Llama-3.1-70B-Instruct",
"Llama3.1-405B-Instruct:bf16-mp8": "meta-llama/Llama-3.1-405B-Instruct",
"Llama3.1-405B-Instruct": "meta-llama/Llama-3.1-405B-Instruct-FP8",
"Llama3.1-405B-Instruct:bf16-mp16": "meta-llama/Llama-3.1-405B-Instruct",
"Llama3.2-1B": "meta-llama/Llama-3.2-1B",
"Llama3.2-3B": "meta-llama/Llama-3.2-3B",
}
SAFETY_MODELS_ENTRIES = []
class RunpodInferenceAdapter(OpenAIMixin):
"""
Adapter for RunPod's OpenAI-compatible API endpoints.
Supports VLLM for serverless endpoint self-hosted or public endpoints.
Can work with any runpod endpoints that support OpenAI-compatible API
"""
# Create MODEL_ENTRIES from RUNPOD_SUPPORTED_MODELS for compatibility with starter template
MODEL_ENTRIES = [
build_hf_repo_model_entry(provider_model_id, model_descriptor)
for provider_model_id, model_descriptor in RUNPOD_SUPPORTED_MODELS.items()
] + SAFETY_MODELS_ENTRIES
config: RunpodImplConfig
def get_api_key(self) -> str:
"""Get API key for OpenAI client."""
return self.config.api_token
class RunpodInferenceAdapter(
ModelRegistryHelper,
Inference,
):
def __init__(self, config: RunpodImplConfig) -> None:
ModelRegistryHelper.__init__(self, stack_to_provider_models_map=RUNPOD_SUPPORTED_MODELS)
self.config = config
def get_base_url(self) -> str:
"""Get base URL for OpenAI client."""
return self.config.url
def _get_params(self, request: ChatCompletionRequest) -> dict:
return {
"model": self.map_to_provider_model(request.model),
"prompt": chat_completion_request_to_prompt(request),
"stream": request.stream,
**get_sampling_options(request.sampling_params),
}
async def openai_embeddings(
async def openai_chat_completion(
self,
model: str,
input: str | list[str],
encoding_format: str | None = "float",
dimensions: int | None = None,
messages: list[OpenAIMessageParam],
frequency_penalty: float | None = None,
function_call: str | dict[str, Any] | None = None,
functions: list[dict[str, Any]] | None = None,
logit_bias: dict[str, float] | None = None,
logprobs: bool | None = None,
max_completion_tokens: int | None = None,
max_tokens: int | None = None,
n: int | None = None,
parallel_tool_calls: bool | None = None,
presence_penalty: float | None = None,
response_format: OpenAIResponseFormatParam | None = None,
seed: int | None = None,
stop: str | list[str] | None = None,
stream: bool | None = None,
stream_options: dict[str, Any] | None = None,
temperature: float | None = None,
tool_choice: str | dict[str, Any] | None = None,
tools: list[dict[str, Any]] | None = None,
top_logprobs: int | None = None,
top_p: float | None = None,
user: str | None = None,
) -> OpenAIEmbeddingsResponse:
raise NotImplementedError()
):
"""Override to add RunPod-specific stream_options requirement."""
if stream and not stream_options:
stream_options = {"include_usage": True}
return await super().openai_chat_completion(
model=model,
messages=messages,
frequency_penalty=frequency_penalty,
function_call=function_call,
functions=functions,
logit_bias=logit_bias,
logprobs=logprobs,
max_completion_tokens=max_completion_tokens,
max_tokens=max_tokens,
n=n,
parallel_tool_calls=parallel_tool_calls,
presence_penalty=presence_penalty,
response_format=response_format,
seed=seed,
stop=stop,
stream=stream,
stream_options=stream_options,
temperature=temperature,
tool_choice=tool_choice,
tools=tools,
top_logprobs=top_logprobs,
top_p=top_p,
user=user,
)

View file

@ -63,9 +63,6 @@ class TogetherInferenceAdapter(OpenAIMixin, NeedsRequestProviderData):
# Together's /v1/models is not compatible with OpenAI's /v1/models. Together support ticket #13355 -> will not fix, use Together's own client
return [m.id for m in await self._get_client().models.list()]
async def should_refresh_models(self) -> bool:
return True
async def openai_embeddings(
self,
model: str,

View file

@ -30,10 +30,6 @@ class VLLMInferenceAdapterConfig(RemoteInferenceProviderConfig):
default=True,
description="Whether to verify TLS certificates. Can be a boolean or a path to a CA certificate file.",
)
refresh_models: bool = Field(
default=False,
description="Whether to refresh models periodically",
)
@field_validator("tls_verify")
@classmethod

View file

@ -53,10 +53,6 @@ class VLLMInferenceAdapter(OpenAIMixin):
"You must provide a URL in run.yaml (or via the VLLM_URL environment variable) to use vLLM."
)
async def should_refresh_models(self) -> bool:
# Strictly respecting the refresh_models directive
return self.config.refresh_models
async def health(self) -> HealthResponse:
"""
Performs a health check by verifying connectivity to the remote vLLM server.

View file

@ -140,14 +140,13 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
inference_api: Api.inference,
files_api: Files | None,
) -> None:
super().__init__(files_api=files_api, kvstore=None)
log.info(f"Initializing ChromaVectorIOAdapter with url: {config}")
self.config = config
self.inference_api = inference_api
self.client = None
self.cache = {}
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.files_api = files_api
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)

View file

@ -309,14 +309,12 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
inference_api: Inference,
files_api: Files | None,
) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.cache = {}
self.client = None
self.inference_api = inference_api
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.metadata_collection_name = "openai_vector_stores_metadata"
async def initialize(self) -> None:

View file

@ -345,14 +345,12 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
inference_api: Api.inference,
files_api: Files | None = None,
) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.conn = None
self.cache = {}
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.metadata_collection_name = "openai_vector_stores_metadata"
async def initialize(self) -> None:

View file

@ -27,7 +27,7 @@ from llama_stack.apis.vector_io import (
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import (
ChunkForDeletion,
@ -162,14 +162,12 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
inference_api: Api.inference,
files_api: Files | None = None,
) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.client: AsyncQdrantClient = None
self.cache = {}
self.inference_api = inference_api
self.files_api = files_api
self.vector_db_store = None
self.kvstore: KVStore | None = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self._qdrant_lock = asyncio.Lock()
async def initialize(self) -> None:

View file

@ -284,14 +284,12 @@ class WeaviateVectorIOAdapter(
inference_api: Api.inference,
files_api: Files | None,
) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.client_cache = {}
self.cache = {}
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.metadata_collection_name = "openai_vector_stores_metadata"
def _get_client(self) -> weaviate.WeaviateClient:

View file

@ -24,6 +24,10 @@ class RemoteInferenceProviderConfig(BaseModel):
default=None,
description="List of models that should be registered with the model registry. If None, all models are allowed.",
)
refresh_models: bool = Field(
default=False,
description="Whether to refresh models periodically from the provider",
)
# TODO: this class is more confusing than useful right now. We need to make it

View file

@ -484,7 +484,7 @@ class OpenAIMixin(NeedsRequestProviderData, ABC, BaseModel):
return model in self._model_cache
async def should_refresh_models(self) -> bool:
return False
return self.config.refresh_models
#
# The model_dump implementations are to avoid serializing the extra fields,

View file

@ -12,6 +12,8 @@ import uuid
from abc import ABC, abstractmethod
from typing import Any
from pydantic import TypeAdapter
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files, OpenAIFileObject
from llama_stack.apis.vector_dbs import VectorDB
@ -50,12 +52,16 @@ logger = get_logger(name=__name__, category="providers::utils")
# Constants for OpenAI vector stores
CHUNK_MULTIPLIER = 5
FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60 # 1 day in seconds
MAX_CONCURRENT_FILES_PER_BATCH = 3 # Maximum concurrent file processing within a batch
FILE_BATCH_CHUNK_SIZE = 10 # Process files in chunks of this size
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:{VERSION}::"
OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX = f"openai_vector_stores_file_batches:{VERSION}::"
class OpenAIVectorStoreMixin(ABC):
@ -65,11 +71,15 @@ class OpenAIVectorStoreMixin(ABC):
an openai_vector_stores in-memory cache.
"""
# These should be provided by the implementing class
openai_vector_stores: dict[str, dict[str, Any]]
files_api: Files | None
# KV store for persisting OpenAI vector store metadata
kvstore: KVStore | None
# Implementing classes should call super().__init__() in their __init__ method
# to properly initialize the mixin attributes.
def __init__(self, files_api: Files | None = None, kvstore: KVStore | None = None):
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.openai_file_batches: dict[str, dict[str, Any]] = {}
self.files_api = files_api
self.kvstore = kvstore
self._last_file_batch_cleanup_time = 0
self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Save vector store metadata to persistent storage."""
@ -159,9 +169,129 @@ class OpenAIVectorStoreMixin(ABC):
for idx in range(len(raw_items)):
await self.kvstore.delete(f"{contents_prefix}{idx}")
async def _save_openai_vector_store_file_batch(self, batch_id: str, batch_info: dict[str, Any]) -> None:
"""Save file batch metadata to persistent storage."""
assert self.kvstore
key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}"
await self.kvstore.set(key=key, value=json.dumps(batch_info))
# update in-memory cache
self.openai_file_batches[batch_id] = batch_info
async def _load_openai_vector_store_file_batches(self) -> dict[str, dict[str, Any]]:
"""Load all file batch metadata from persistent storage."""
assert self.kvstore
start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX
end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff"
stored_data = await self.kvstore.values_in_range(start_key, end_key)
batches: dict[str, dict[str, Any]] = {}
for item in stored_data:
info = json.loads(item)
batches[info["id"]] = info
return batches
async def _delete_openai_vector_store_file_batch(self, batch_id: str) -> None:
"""Delete file batch metadata from persistent storage and in-memory cache."""
assert self.kvstore
key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}"
await self.kvstore.delete(key)
# remove from in-memory cache
self.openai_file_batches.pop(batch_id, None)
async def _cleanup_expired_file_batches(self) -> None:
"""Clean up expired file batches from persistent storage."""
assert self.kvstore
start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX
end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff"
stored_data = await self.kvstore.values_in_range(start_key, end_key)
current_time = int(time.time())
expired_count = 0
for item in stored_data:
info = json.loads(item)
expires_at = info.get("expires_at")
if expires_at and current_time > expires_at:
logger.info(f"Cleaning up expired file batch: {info['id']}")
await self.kvstore.delete(f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{info['id']}")
# Remove from in-memory cache if present
self.openai_file_batches.pop(info["id"], None)
expired_count += 1
if expired_count > 0:
logger.info(f"Cleaned up {expired_count} expired file batches")
async def _get_completed_files_in_batch(self, vector_store_id: str, file_ids: list[str]) -> set[str]:
"""Determine which files in a batch are actually completed by checking vector store file_ids."""
if vector_store_id not in self.openai_vector_stores:
return set()
store_info = self.openai_vector_stores[vector_store_id]
completed_files = set(file_ids) & set(store_info["file_ids"])
return completed_files
async def _analyze_batch_completion_on_resume(self, batch_id: str, batch_info: dict[str, Any]) -> list[str]:
"""Analyze batch completion status and return remaining files to process.
Returns:
List of file IDs that still need processing. Empty list if batch is complete.
"""
vector_store_id = batch_info["vector_store_id"]
all_file_ids = batch_info["file_ids"]
# Find files that are actually completed
completed_files = await self._get_completed_files_in_batch(vector_store_id, all_file_ids)
remaining_files = [file_id for file_id in all_file_ids if file_id not in completed_files]
completed_count = len(completed_files)
total_count = len(all_file_ids)
remaining_count = len(remaining_files)
# Update file counts to reflect actual state
batch_info["file_counts"] = {
"completed": completed_count,
"failed": 0, # We don't track failed files during resume - they'll be retried
"in_progress": remaining_count,
"cancelled": 0,
"total": total_count,
}
# If all files are already completed, mark batch as completed
if remaining_count == 0:
batch_info["status"] = "completed"
logger.info(f"Batch {batch_id} is already fully completed, updating status")
# Save updated batch info
await self._save_openai_vector_store_file_batch(batch_id, batch_info)
return remaining_files
async def _resume_incomplete_batches(self) -> None:
"""Resume processing of incomplete file batches after server restart."""
for batch_id, batch_info in self.openai_file_batches.items():
if batch_info["status"] == "in_progress":
logger.info(f"Analyzing incomplete file batch: {batch_id}")
remaining_files = await self._analyze_batch_completion_on_resume(batch_id, batch_info)
# Check if batch is now completed after analysis
if batch_info["status"] == "completed":
continue
if remaining_files:
logger.info(f"Resuming batch {batch_id} with {len(remaining_files)} remaining files")
# Restart the background processing task with only remaining files
task = asyncio.create_task(self._process_file_batch_async(batch_id, batch_info, remaining_files))
self._file_batch_tasks[batch_id] = task
async def initialize_openai_vector_stores(self) -> None:
"""Load existing OpenAI vector stores into the in-memory cache."""
"""Load existing OpenAI vector stores and file batches into the in-memory cache."""
self.openai_vector_stores = await self._load_openai_vector_stores()
self.openai_file_batches = await self._load_openai_vector_store_file_batches()
self._file_batch_tasks = {}
# TODO: Resume only works for single worker deployment. Jobs with multiple workers will need to be handled differently.
await self._resume_incomplete_batches()
self._last_file_batch_cleanup_time = 0
@abstractmethod
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
@ -570,6 +700,14 @@ class OpenAIVectorStoreMixin(ABC):
if vector_store_id not in self.openai_vector_stores:
raise VectorStoreNotFoundError(vector_store_id)
# Check if file is already attached to this vector store
store_info = self.openai_vector_stores[vector_store_id]
if file_id in store_info["file_ids"]:
logger.warning(f"File {file_id} is already attached to vector store {vector_store_id}, skipping")
# Return existing file object
file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
return VectorStoreFileObject(**file_info)
attributes = attributes or {}
chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
created_at = int(time.time())
@ -615,7 +753,6 @@ class OpenAIVectorStoreMixin(ABC):
chunk_overlap_tokens,
attributes,
)
if not chunks:
vector_store_file_object.status = "failed"
vector_store_file_object.last_error = VectorStoreFileLastError(
@ -828,7 +965,230 @@ class OpenAIVectorStoreMixin(ABC):
chunking_strategy: VectorStoreChunkingStrategy | None = None,
) -> VectorStoreFileBatchObject:
"""Create a vector store file batch."""
raise NotImplementedError("openai_create_vector_store_file_batch is not implemented yet")
if vector_store_id not in self.openai_vector_stores:
raise VectorStoreNotFoundError(vector_store_id)
chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
created_at = int(time.time())
batch_id = f"batch_{uuid.uuid4()}"
# File batches expire after 7 days
expires_at = created_at + (7 * 24 * 60 * 60)
# Initialize batch file counts - all files start as in_progress
file_counts = VectorStoreFileCounts(
completed=0,
cancelled=0,
failed=0,
in_progress=len(file_ids),
total=len(file_ids),
)
# Create batch object immediately with in_progress status
batch_object = VectorStoreFileBatchObject(
id=batch_id,
created_at=created_at,
vector_store_id=vector_store_id,
status="in_progress",
file_counts=file_counts,
)
batch_info = {
**batch_object.model_dump(),
"file_ids": file_ids,
"attributes": attributes,
"chunking_strategy": chunking_strategy.model_dump(),
"expires_at": expires_at,
}
await self._save_openai_vector_store_file_batch(batch_id, batch_info)
# Start background processing of files
task = asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
self._file_batch_tasks[batch_id] = task
# Run cleanup if needed (throttled to once every 1 day)
current_time = int(time.time())
if current_time - self._last_file_batch_cleanup_time >= FILE_BATCH_CLEANUP_INTERVAL_SECONDS:
logger.info("Running throttled cleanup of expired file batches")
asyncio.create_task(self._cleanup_expired_file_batches())
self._last_file_batch_cleanup_time = current_time
return batch_object
async def _process_files_with_concurrency(
self,
file_ids: list[str],
vector_store_id: str,
attributes: dict[str, Any],
chunking_strategy_obj: Any,
batch_id: str,
batch_info: dict[str, Any],
) -> None:
"""Process files with controlled concurrency and chunking."""
semaphore = asyncio.Semaphore(MAX_CONCURRENT_FILES_PER_BATCH)
async def process_single_file(file_id: str) -> tuple[str, bool]:
"""Process a single file with concurrency control."""
async with semaphore:
try:
vector_store_file_object = await self.openai_attach_file_to_vector_store(
vector_store_id=vector_store_id,
file_id=file_id,
attributes=attributes,
chunking_strategy=chunking_strategy_obj,
)
return file_id, vector_store_file_object.status == "completed"
except Exception as e:
logger.error(f"Failed to process file {file_id} in batch {batch_id}: {e}")
return file_id, False
# Process files in chunks to avoid creating too many tasks at once
total_files = len(file_ids)
for chunk_start in range(0, total_files, FILE_BATCH_CHUNK_SIZE):
chunk_end = min(chunk_start + FILE_BATCH_CHUNK_SIZE, total_files)
chunk = file_ids[chunk_start:chunk_end]
chunk_num = chunk_start // FILE_BATCH_CHUNK_SIZE + 1
total_chunks = (total_files + FILE_BATCH_CHUNK_SIZE - 1) // FILE_BATCH_CHUNK_SIZE
logger.info(
f"Processing chunk {chunk_num} of {total_chunks} ({len(chunk)} files, {chunk_start + 1}-{chunk_end} of {total_files} total files)"
)
async with asyncio.TaskGroup() as tg:
chunk_tasks = [tg.create_task(process_single_file(file_id)) for file_id in chunk]
chunk_results = [task.result() for task in chunk_tasks]
# Update counts after each chunk for progressive feedback
for _, success in chunk_results:
self._update_file_counts(batch_info, success=success)
# Save progress after each chunk
await self._save_openai_vector_store_file_batch(batch_id, batch_info)
def _update_file_counts(self, batch_info: dict[str, Any], success: bool) -> None:
"""Update file counts based on processing result."""
if success:
batch_info["file_counts"]["completed"] += 1
else:
batch_info["file_counts"]["failed"] += 1
batch_info["file_counts"]["in_progress"] -= 1
def _update_batch_status(self, batch_info: dict[str, Any]) -> None:
"""Update final batch status based on file processing results."""
if batch_info["file_counts"]["failed"] == 0:
batch_info["status"] = "completed"
elif batch_info["file_counts"]["completed"] == 0:
batch_info["status"] = "failed"
else:
batch_info["status"] = "completed" # Partial success counts as completed
async def _process_file_batch_async(
self,
batch_id: str,
batch_info: dict[str, Any],
override_file_ids: list[str] | None = None,
) -> None:
"""Process files in a batch asynchronously in the background."""
file_ids = override_file_ids if override_file_ids is not None else batch_info["file_ids"]
attributes = batch_info["attributes"]
chunking_strategy = batch_info["chunking_strategy"]
vector_store_id = batch_info["vector_store_id"]
chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter(VectorStoreChunkingStrategy)
chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy)
try:
# Process all files with controlled concurrency
await self._process_files_with_concurrency(
file_ids=file_ids,
vector_store_id=vector_store_id,
attributes=attributes,
chunking_strategy_obj=chunking_strategy_obj,
batch_id=batch_id,
batch_info=batch_info,
)
# Update final batch status
self._update_batch_status(batch_info)
await self._save_openai_vector_store_file_batch(batch_id, batch_info)
logger.info(f"File batch {batch_id} processing completed with status: {batch_info['status']}")
except asyncio.CancelledError:
logger.info(f"File batch {batch_id} processing was cancelled")
# Clean up task reference if it still exists
self._file_batch_tasks.pop(batch_id, None)
raise # Re-raise to ensure proper cancellation propagation
finally:
# Always clean up task reference when processing ends
self._file_batch_tasks.pop(batch_id, None)
def _get_and_validate_batch(self, batch_id: str, vector_store_id: str) -> dict[str, Any]:
"""Get and validate batch exists and belongs to vector store."""
if vector_store_id not in self.openai_vector_stores:
raise VectorStoreNotFoundError(vector_store_id)
if batch_id not in self.openai_file_batches:
raise ValueError(f"File batch {batch_id} not found")
batch_info = self.openai_file_batches[batch_id]
# Check if batch has expired (read-only check)
expires_at = batch_info.get("expires_at")
if expires_at:
current_time = int(time.time())
if current_time > expires_at:
raise ValueError(f"File batch {batch_id} has expired after 7 days from creation")
if batch_info["vector_store_id"] != vector_store_id:
raise ValueError(f"File batch {batch_id} does not belong to vector store {vector_store_id}")
return batch_info
def _paginate_objects(
self,
objects: list[Any],
limit: int | None = 20,
after: str | None = None,
before: str | None = None,
) -> tuple[list[Any], bool, str | None, str | None]:
"""Apply pagination to a list of objects with id fields."""
limit = min(limit or 20, 100) # Cap at 100 as per OpenAI
# Find start index
start_idx = 0
if after:
for i, obj in enumerate(objects):
if obj.id == after:
start_idx = i + 1
break
# Find end index
end_idx = start_idx + limit
if before:
for i, obj in enumerate(objects[start_idx:], start_idx):
if obj.id == before:
end_idx = i
break
# Apply pagination
paginated_objects = objects[start_idx:end_idx]
# Determine pagination info
has_more = end_idx < len(objects)
first_id = paginated_objects[0].id if paginated_objects else None
last_id = paginated_objects[-1].id if paginated_objects else None
return paginated_objects, has_more, first_id, last_id
async def openai_retrieve_vector_store_file_batch(
self,
batch_id: str,
vector_store_id: str,
) -> VectorStoreFileBatchObject:
"""Retrieve a vector store file batch."""
batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
return VectorStoreFileBatchObject(**batch_info)
async def openai_list_files_in_vector_store_file_batch(
self,
@ -841,15 +1201,39 @@ class OpenAIVectorStoreMixin(ABC):
order: str | None = "desc",
) -> VectorStoreFilesListInBatchResponse:
"""Returns a list of vector store files in a batch."""
raise NotImplementedError("openai_list_files_in_vector_store_file_batch is not implemented yet")
batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
batch_file_ids = batch_info["file_ids"]
async def openai_retrieve_vector_store_file_batch(
self,
batch_id: str,
vector_store_id: str,
) -> VectorStoreFileBatchObject:
"""Retrieve a vector store file batch."""
raise NotImplementedError("openai_retrieve_vector_store_file_batch is not implemented yet")
# Load file objects for files in this batch
batch_file_objects = []
for file_id in batch_file_ids:
try:
file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
file_object = VectorStoreFileObject(**file_info)
# Apply status filter if provided
if filter and file_object.status != filter:
continue
batch_file_objects.append(file_object)
except Exception as e:
logger.warning(f"Could not load file {file_id} from batch {batch_id}: {e}")
continue
# Sort by created_at
reverse_order = order == "desc"
batch_file_objects.sort(key=lambda x: x.created_at, reverse=reverse_order)
# Apply pagination using helper
paginated_files, has_more, first_id, last_id = self._paginate_objects(batch_file_objects, limit, after, before)
return VectorStoreFilesListInBatchResponse(
data=paginated_files,
first_id=first_id,
last_id=last_id,
has_more=has_more,
)
async def openai_cancel_vector_store_file_batch(
self,
@ -857,4 +1241,24 @@ class OpenAIVectorStoreMixin(ABC):
vector_store_id: str,
) -> VectorStoreFileBatchObject:
"""Cancel a vector store file batch."""
raise NotImplementedError("openai_cancel_vector_store_file_batch is not implemented yet")
batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
if batch_info["status"] not in ["in_progress"]:
raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_info['status']}")
# Cancel the actual processing task if it exists
if batch_id in self._file_batch_tasks:
task = self._file_batch_tasks[batch_id]
if not task.done():
task.cancel()
logger.info(f"Cancelled processing task for file batch: {batch_id}")
# Remove from task tracking
del self._file_batch_tasks[batch_id]
batch_info["status"] = "cancelled"
await self._save_openai_vector_store_file_batch(batch_id, batch_info)
updated_batch = VectorStoreFileBatchObject(**batch_info)
return updated_batch