mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-07-02 20:40:36 +00:00
using a property for Chunk.chunk_id
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
This commit is contained in:
parent
f90fce218e
commit
fa36b672f1
10 changed files with 163 additions and 86 deletions
|
@ -8,6 +8,7 @@
|
|||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
import uuid
|
||||
from typing import Annotated, Any, Literal, Protocol, runtime_checkable
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
@ -15,6 +16,7 @@ from pydantic import BaseModel, Field
|
|||
from llama_stack.apis.inference import InterleavedContent
|
||||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
from llama_stack.providers.utils.telemetry.trace_protocol import trace_protocol
|
||||
from llama_stack.providers.utils.vector_io.chunk_utils import generate_chunk_id
|
||||
from llama_stack.schema_utils import json_schema_type, webmethod
|
||||
from llama_stack.strong_typing.schema import register_schema
|
||||
|
||||
|
@ -23,10 +25,12 @@ from llama_stack.strong_typing.schema import register_schema
|
|||
class ChunkMetadata(BaseModel):
|
||||
"""
|
||||
`ChunkMetadata` is backend metadata for a `Chunk` that is used to store additional information about the chunk that
|
||||
will NOT be inserted into the context during inference, but is required for backend functionality.
|
||||
Use `metadata` in `Chunk` for metadata that will be used during inference.
|
||||
will not be used in the context during inference, but is required for backend functionality. The `ChunkMetadata`
|
||||
is set during chunk creation in `MemoryToolRuntimeImpl().insert()`and is not expected to change after.
|
||||
Use `Chunk.metadata` for metadata that will be used in the context during inference.
|
||||
:param chunk_id: The ID of the chunk. If not set, it will be generated based on the document ID and content.
|
||||
:param document_id: The ID of the document this chunk belongs to.
|
||||
:param source: The source of the content, such as a URL or file path.
|
||||
:param source: The source of the content, such as a URL, file path, or other identifier.
|
||||
:param created_timestamp: An optional timestamp indicating when the chunk was created.
|
||||
:param updated_timestamp: An optional timestamp indicating when the chunk was last updated.
|
||||
:param chunk_window: The window of the chunk, which can be used to group related chunks together.
|
||||
|
@ -37,8 +41,8 @@ class ChunkMetadata(BaseModel):
|
|||
:param metadata_token_count: The number of tokens in the metadata of the chunk.
|
||||
"""
|
||||
|
||||
chunk_id: str = None
|
||||
document_id: str | None = None
|
||||
chunk_id: str | None = None
|
||||
source: str | None = None
|
||||
created_timestamp: int | None = None
|
||||
updated_timestamp: int | None = None
|
||||
|
@ -56,16 +60,37 @@ class Chunk(BaseModel):
|
|||
A chunk of content that can be inserted into a vector database.
|
||||
:param content: The content of the chunk, which can be interleaved text, images, or other types.
|
||||
:param embedding: Optional embedding for the chunk. If not provided, it will be computed later.
|
||||
:param metadata: Metadata associated with the chunk that will be used during inference.
|
||||
:param chunk_metadata: Metadata for the chunk that will NOT be inserted into the context during inference
|
||||
that is required backend functionality.
|
||||
:param metadata: Metadata associated with the chunk that will be used in the model context during inference.
|
||||
:param stored_chunk_id: The chunk ID that is stored in the vector database. Used for backend functionality.
|
||||
:param chunk_metadata: Metadata for the chunk that will NOT be used in the context during inference.
|
||||
The `chunk_metadata` is required backend functionality.
|
||||
"""
|
||||
|
||||
content: InterleavedContent
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
embedding: list[float] | None = None
|
||||
# The alias parameter serializes the field as "chunk_id" in JSON but keeps the internal name as "stored_chunk_id"
|
||||
stored_chunk_id: str | None = Field(default=None, alias="chunk_id")
|
||||
chunk_metadata: ChunkMetadata | None = None
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
def model_post_init(self, __context):
|
||||
# Extract chunk_id from metadata if present
|
||||
if self.metadata and "chunk_id" in self.metadata:
|
||||
self.stored_chunk_id = self.metadata.pop("chunk_id")
|
||||
|
||||
@property
|
||||
def chunk_id(self) -> str:
|
||||
"""Returns the chunk ID, which is either an input `chunk_id` or a generated one if not set."""
|
||||
if self.stored_chunk_id:
|
||||
return self.stored_chunk_id
|
||||
|
||||
if "document_id" in self.metadata:
|
||||
return generate_chunk_id(self.metadata["document_id"], str(self.content))
|
||||
|
||||
return generate_chunk_id(str(uuid.uuid4()), str(self.content))
|
||||
|
||||
|
||||
@json_schema_type
|
||||
class QueryChunksResponse(BaseModel):
|
||||
|
|
|
@ -81,6 +81,7 @@ class MemoryToolRuntimeImpl(ToolGroupsProtocolPrivate, ToolRuntime, RAGToolRunti
|
|||
chunks = []
|
||||
for doc in documents:
|
||||
content = await content_from_doc(doc)
|
||||
# TODO: we should add enrichment here as URLs won't be added to the metadata by default
|
||||
chunks.extend(
|
||||
make_overlapped_chunks(
|
||||
doc.document_id,
|
||||
|
@ -161,18 +162,19 @@ class MemoryToolRuntimeImpl(ToolGroupsProtocolPrivate, ToolRuntime, RAGToolRunti
|
|||
break
|
||||
|
||||
metadata_fields_to_exclude_from_context = [
|
||||
"chunk_tokenizer",
|
||||
"chunk_window",
|
||||
"token_count",
|
||||
"metadata_token_count",
|
||||
"chunk_tokenizer",
|
||||
"chunk_embedding_model",
|
||||
"created_timestamp",
|
||||
"updated_timestamp",
|
||||
"chunk_window",
|
||||
"chunk_tokenizer",
|
||||
"chunk_embedding_model",
|
||||
"chunk_embedding_dimension",
|
||||
"token_count",
|
||||
"content_token_count",
|
||||
"metadata_token_count",
|
||||
]
|
||||
metadata_subset = {k: v for k, v in metadata.items() if k not in metadata_fields_to_exclude_from_context}
|
||||
metadata_subset = {
|
||||
k: v for k, v in metadata.items() if k not in metadata_fields_to_exclude_from_context and v
|
||||
}
|
||||
text_content = query_config.chunk_template.format(index=i + 1, chunk=chunk, metadata=metadata_subset)
|
||||
picked.append(TextContentItem(text=text_content))
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ from llama_stack.providers.utils.memory.vector_store import (
|
|||
EmbeddingIndex,
|
||||
VectorDBWithIndex,
|
||||
)
|
||||
from llama_stack.providers.utils.vector_io.chunk_utils import extract_or_generate_chunk_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -200,9 +199,7 @@ class SQLiteVecIndex(EmbeddingIndex):
|
|||
batch_embeddings = embeddings[i : i + batch_size]
|
||||
|
||||
# Insert metadata
|
||||
metadata_data = [
|
||||
(extract_or_generate_chunk_id(chunk), chunk.model_dump_json()) for chunk in batch_chunks
|
||||
]
|
||||
metadata_data = [(chunk.chunk_id, chunk.model_dump_json()) for chunk in batch_chunks]
|
||||
cur.executemany(
|
||||
f"""
|
||||
INSERT INTO {self.metadata_table} (id, chunk)
|
||||
|
@ -216,7 +213,7 @@ class SQLiteVecIndex(EmbeddingIndex):
|
|||
embedding_data = [
|
||||
(
|
||||
(
|
||||
extract_or_generate_chunk_id(chunk),
|
||||
chunk.chunk_id,
|
||||
serialize_vector(emb.tolist()),
|
||||
)
|
||||
)
|
||||
|
@ -228,7 +225,7 @@ class SQLiteVecIndex(EmbeddingIndex):
|
|||
)
|
||||
|
||||
# Insert FTS content
|
||||
fts_data = [(extract_or_generate_chunk_id(chunk), chunk.content) for chunk in batch_chunks]
|
||||
fts_data = [(chunk.chunk_id, chunk.content) for chunk in batch_chunks]
|
||||
# DELETE existing entries with same IDs (FTS5 doesn't support ON CONFLICT)
|
||||
cur.executemany(
|
||||
f"DELETE FROM {self.fts_table} WHERE id = ?;",
|
||||
|
@ -376,13 +373,12 @@ class SQLiteVecIndex(EmbeddingIndex):
|
|||
vector_response = await self.query_vector(embedding, k, score_threshold)
|
||||
keyword_response = await self.query_keyword(query_string, k, score_threshold)
|
||||
|
||||
# Convert responses to score dictionaries using generate_chunk_id
|
||||
# Convert responses to score dictionaries using chunk_id
|
||||
vector_scores = {
|
||||
extract_or_generate_chunk_id(chunk): score
|
||||
for chunk, score in zip(vector_response.chunks, vector_response.scores, strict=False)
|
||||
chunk.chunk_id: score for chunk, score in zip(vector_response.chunks, vector_response.scores, strict=False)
|
||||
}
|
||||
keyword_scores = {
|
||||
extract_or_generate_chunk_id(chunk): score
|
||||
chunk.chunk_id: score
|
||||
for chunk, score in zip(keyword_response.chunks, keyword_response.scores, strict=False)
|
||||
}
|
||||
|
||||
|
@ -405,10 +401,10 @@ class SQLiteVecIndex(EmbeddingIndex):
|
|||
# Create a map of chunk_id to chunk for both responses
|
||||
chunk_map = {}
|
||||
for c in vector_response.chunks:
|
||||
chunk_id = extract_or_generate_chunk_id(c)
|
||||
chunk_id = c.chunk_id
|
||||
chunk_map[chunk_id] = c
|
||||
for c in keyword_response.chunks:
|
||||
chunk_id = extract_or_generate_chunk_id(c)
|
||||
chunk_id = c.chunk_id
|
||||
chunk_map[chunk_id] = c
|
||||
|
||||
# Use the map to look up chunks by their IDs
|
||||
|
|
|
@ -151,9 +151,6 @@ def make_overlapped_chunks(
|
|||
document_id: str, text: str, window_len: int, overlap_len: int, metadata: dict[str, Any]
|
||||
) -> list[Chunk]:
|
||||
default_tokenizer = "DEFAULT_TIKTOKEN_TOKENIZER"
|
||||
default_embedding_model = (
|
||||
"DEFAULT_EMBEDDING_MODEL" # This will be correctly updated in `VectorDBWithIndex.insert_chunks`
|
||||
)
|
||||
tokenizer = Tokenizer.get_instance()
|
||||
tokens = tokenizer.encode(text, bos=False, eos=False)
|
||||
try:
|
||||
|
@ -167,20 +164,22 @@ def make_overlapped_chunks(
|
|||
for i in range(0, len(tokens), window_len - overlap_len):
|
||||
toks = tokens[i : i + window_len]
|
||||
chunk = tokenizer.decode(toks)
|
||||
chunk_id = generate_chunk_id(chunk, text)
|
||||
chunk_metadata = metadata.copy()
|
||||
chunk_metadata["chunk_id"] = chunk_id
|
||||
chunk_metadata["document_id"] = document_id
|
||||
chunk_metadata["token_count"] = len(toks)
|
||||
chunk_metadata["metadata_token_count"] = len(metadata_tokens)
|
||||
|
||||
backend_chunk_metadata = ChunkMetadata(
|
||||
chunk_id=chunk_id,
|
||||
document_id=document_id,
|
||||
chunk_id=generate_chunk_id(chunk, text),
|
||||
source=metadata.get("source", None),
|
||||
created_timestamp=metadata.get("created_timestamp", int(time.time())),
|
||||
updated_timestamp=int(time.time()),
|
||||
chunk_window=f"{i}-{i + len(toks)}",
|
||||
chunk_tokenizer=default_tokenizer,
|
||||
chunk_embedding_model=default_embedding_model,
|
||||
chunk_embedding_model=None, # This will be set in `VectorDBWithIndex.insert_chunks`
|
||||
content_token_count=len(toks),
|
||||
metadata_token_count=len(metadata_tokens),
|
||||
)
|
||||
|
@ -255,13 +254,12 @@ class VectorDBWithIndex:
|
|||
) -> None:
|
||||
chunks_to_embed = []
|
||||
for i, c in enumerate(chunks):
|
||||
# this should be done in `make_overlapped_chunks` but we do it here for convenience
|
||||
if c.embedding is None:
|
||||
chunks_to_embed.append(c)
|
||||
else:
|
||||
if c.chunk_metadata:
|
||||
c.chunk_metadata.chunk_embedding_model = self.vector_db.embedding_model
|
||||
c.chunk_metadata.chunk_embedding_dimension = self.vector_db.embedding_dimension
|
||||
else:
|
||||
_validate_embedding(c.embedding, i, self.vector_db.embedding_dimension)
|
||||
|
||||
if chunks_to_embed:
|
||||
|
|
|
@ -5,38 +5,10 @@
|
|||
# the root directory of this source tree.
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from llama_stack.apis.vector_io import Chunk
|
||||
|
||||
|
||||
def generate_chunk_id(document_id: str, chunk_text: str) -> str:
|
||||
"""Generate a unique chunk ID using a hash of document ID and chunk text."""
|
||||
hash_input = f"{document_id}:{chunk_text}".encode()
|
||||
return str(uuid.UUID(hashlib.md5(hash_input).hexdigest()))
|
||||
|
||||
|
||||
def extract_chunk_id_from_metadata(chunk: Chunk) -> str | None:
|
||||
"""Extract existing chunk ID from metadata. This is for compatibility with older Chunks
|
||||
that stored the document_id in the metadata and not in the ChunkMetadata."""
|
||||
if chunk.chunk_metadata is not None and hasattr(chunk.chunk_metadata, "chunk_id"):
|
||||
return chunk.chunk_metadata.chunk_id
|
||||
|
||||
if "chunk_id" in chunk.metadata:
|
||||
return str(chunk.metadata["chunk_id"])
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def extract_or_generate_chunk_id(chunk: Chunk) -> str:
|
||||
"""Extract existing chunk ID or generate a new one if not present. This is for compatibility with older Chunks
|
||||
that stored the document_id in the metadata."""
|
||||
stored_chunk_id = extract_chunk_id_from_metadata(chunk)
|
||||
if stored_chunk_id:
|
||||
return stored_chunk_id
|
||||
elif "document_id" in chunk.metadata:
|
||||
return generate_chunk_id(chunk.metadata["document_id"], str(chunk.content))
|
||||
else:
|
||||
logging.warning("Chunk has no ID or document_id in metadata. Generating random ID.")
|
||||
return str(uuid.uuid4())
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue