mirror of
				https://github.com/meta-llama/llama-stack.git
				synced 2025-10-25 09:05:37 +00:00 
			
		
		
		
	
		
			Some checks failed
		
		
	
	Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 1s
				
			SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 0s
				
			Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
				
			Python Package Build Test / build (3.13) (push) Failing after 1s
				
			SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 4s
				
			Integration Tests (Replay) / Integration Tests (, , , client=, ) (push) Failing after 4s
				
			Vector IO Integration Tests / test-matrix (push) Failing after 4s
				
			Python Package Build Test / build (3.12) (push) Failing after 3s
				
			Test External API and Providers / test-external (venv) (push) Failing after 4s
				
			Unit Tests / unit-tests (3.12) (push) Failing after 4s
				
			Unit Tests / unit-tests (3.13) (push) Failing after 3s
				
			UI Tests / ui-tests (22) (push) Successful in 33s
				
			Pre-commit / pre-commit (push) Successful in 1m15s
				
			Our integration tests need to be 'grouped' because each group often needs a specific set of models it works with. We separated vision tests due to this, and we have a separate set of tests which test "Responses" API. This PR makes this system a bit more official so it is very easy to target these groups and apply all testing infrastructure towards all the groups (for example, record-replay) uniformly. There are three suites declared: - base - vision - responses Note that our CI currently runs the "base" and "vision" suites. You can use the `--suite` option when running pytest (or any of the testing scripts or workflows.) For example: ``` OLLAMA_URL=http://localhost:11434 \ pytest -s -v tests/integration/ --stack-config starter --suite vision ```
		
			
				
	
	
		
			145 lines
		
	
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			145 lines
		
	
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) Meta Platforms, Inc. and affiliates.
 | |
| # All rights reserved.
 | |
| #
 | |
| # This source code is licensed under the terms described in the LICENSE file in
 | |
| # the root directory of this source tree.
 | |
| 
 | |
| from typing import Any
 | |
| 
 | |
| 
 | |
| class StreamingValidator:
 | |
|     """Helper class for validating streaming response events."""
 | |
| 
 | |
|     def __init__(self, chunks: list[Any]):
 | |
|         self.chunks = chunks
 | |
|         self.event_types = [chunk.type for chunk in chunks]
 | |
| 
 | |
|     def assert_basic_event_sequence(self):
 | |
|         """Verify basic created -> completed event sequence."""
 | |
|         assert len(self.chunks) >= 2, f"Expected at least 2 chunks (created + completed), got {len(self.chunks)}"
 | |
|         assert self.chunks[0].type == "response.created", (
 | |
|             f"First chunk should be response.created, got {self.chunks[0].type}"
 | |
|         )
 | |
|         assert self.chunks[-1].type == "response.completed", (
 | |
|             f"Last chunk should be response.completed, got {self.chunks[-1].type}"
 | |
|         )
 | |
| 
 | |
|         # Verify event order
 | |
|         created_index = self.event_types.index("response.created")
 | |
|         completed_index = self.event_types.index("response.completed")
 | |
|         assert created_index < completed_index, "response.created should come before response.completed"
 | |
| 
 | |
|     def assert_response_consistency(self):
 | |
|         """Verify response ID consistency across events."""
 | |
|         response_ids = set()
 | |
|         for chunk in self.chunks:
 | |
|             if hasattr(chunk, "response_id"):
 | |
|                 response_ids.add(chunk.response_id)
 | |
|             elif hasattr(chunk, "response") and hasattr(chunk.response, "id"):
 | |
|                 response_ids.add(chunk.response.id)
 | |
| 
 | |
|         assert len(response_ids) == 1, f"All events should reference the same response_id, found: {response_ids}"
 | |
| 
 | |
|     def assert_has_incremental_content(self):
 | |
|         """Verify that content is delivered incrementally via delta events."""
 | |
|         delta_events = [
 | |
|             i for i, event_type in enumerate(self.event_types) if event_type == "response.output_text.delta"
 | |
|         ]
 | |
|         assert len(delta_events) > 0, "Expected delta events for true incremental streaming, but found none"
 | |
| 
 | |
|         # Verify delta events have content
 | |
|         non_empty_deltas = 0
 | |
|         delta_content_total = ""
 | |
| 
 | |
|         for delta_idx in delta_events:
 | |
|             chunk = self.chunks[delta_idx]
 | |
|             if hasattr(chunk, "delta") and chunk.delta:
 | |
|                 delta_content_total += chunk.delta
 | |
|                 non_empty_deltas += 1
 | |
| 
 | |
|         assert non_empty_deltas > 0, "Delta events found but none contain content"
 | |
|         assert len(delta_content_total) > 0, "Delta events found but total delta content is empty"
 | |
| 
 | |
|         return delta_content_total
 | |
| 
 | |
|     def assert_content_quality(self, expected_content: str):
 | |
|         """Verify the final response contains expected content."""
 | |
|         final_chunk = self.chunks[-1]
 | |
|         if hasattr(final_chunk, "response"):
 | |
|             output_text = final_chunk.response.output_text.lower().strip()
 | |
|             assert len(output_text) > 0, "Response should have content"
 | |
|             assert expected_content.lower() in output_text, f"Expected '{expected_content}' in response"
 | |
| 
 | |
|     def assert_has_tool_calls(self):
 | |
|         """Verify tool call streaming events are present."""
 | |
|         # Check for tool call events
 | |
|         delta_events = [
 | |
|             chunk
 | |
|             for chunk in self.chunks
 | |
|             if chunk.type in ["response.function_call_arguments.delta", "response.mcp_call.arguments.delta"]
 | |
|         ]
 | |
|         done_events = [
 | |
|             chunk
 | |
|             for chunk in self.chunks
 | |
|             if chunk.type in ["response.function_call_arguments.done", "response.mcp_call.arguments.done"]
 | |
|         ]
 | |
| 
 | |
|         assert len(delta_events) > 0, f"Expected tool call delta events, got chunk types: {self.event_types}"
 | |
|         assert len(done_events) > 0, f"Expected tool call done events, got chunk types: {self.event_types}"
 | |
| 
 | |
|         # Verify output item events
 | |
|         item_added_events = [chunk for chunk in self.chunks if chunk.type == "response.output_item.added"]
 | |
|         item_done_events = [chunk for chunk in self.chunks if chunk.type == "response.output_item.done"]
 | |
| 
 | |
|         assert len(item_added_events) > 0, (
 | |
|             f"Expected response.output_item.added events, got chunk types: {self.event_types}"
 | |
|         )
 | |
|         assert len(item_done_events) > 0, (
 | |
|             f"Expected response.output_item.done events, got chunk types: {self.event_types}"
 | |
|         )
 | |
| 
 | |
|     def assert_has_mcp_events(self):
 | |
|         """Verify MCP-specific streaming events are present."""
 | |
|         # Tool execution progress events
 | |
|         mcp_in_progress_events = [chunk for chunk in self.chunks if chunk.type == "response.mcp_call.in_progress"]
 | |
|         mcp_completed_events = [chunk for chunk in self.chunks if chunk.type == "response.mcp_call.completed"]
 | |
| 
 | |
|         assert len(mcp_in_progress_events) > 0, (
 | |
|             f"Expected response.mcp_call.in_progress events, got chunk types: {self.event_types}"
 | |
|         )
 | |
|         assert len(mcp_completed_events) > 0, (
 | |
|             f"Expected response.mcp_call.completed events, got chunk types: {self.event_types}"
 | |
|         )
 | |
| 
 | |
|         # MCP list tools events
 | |
|         mcp_list_tools_in_progress_events = [
 | |
|             chunk for chunk in self.chunks if chunk.type == "response.mcp_list_tools.in_progress"
 | |
|         ]
 | |
|         mcp_list_tools_completed_events = [
 | |
|             chunk for chunk in self.chunks if chunk.type == "response.mcp_list_tools.completed"
 | |
|         ]
 | |
| 
 | |
|         assert len(mcp_list_tools_in_progress_events) > 0, (
 | |
|             f"Expected response.mcp_list_tools.in_progress events, got chunk types: {self.event_types}"
 | |
|         )
 | |
|         assert len(mcp_list_tools_completed_events) > 0, (
 | |
|             f"Expected response.mcp_list_tools.completed events, got chunk types: {self.event_types}"
 | |
|         )
 | |
| 
 | |
|     def assert_rich_streaming(self, min_chunks: int = 10):
 | |
|         """Verify we have substantial streaming activity."""
 | |
|         assert len(self.chunks) > min_chunks, (
 | |
|             f"Expected rich streaming with many events, got only {len(self.chunks)} chunks"
 | |
|         )
 | |
| 
 | |
|     def validate_event_structure(self):
 | |
|         """Validate the structure of various event types."""
 | |
|         for chunk in self.chunks:
 | |
|             if chunk.type == "response.created":
 | |
|                 assert chunk.response.status == "in_progress"
 | |
|             elif chunk.type == "response.completed":
 | |
|                 assert chunk.response.status == "completed"
 | |
|             elif hasattr(chunk, "item_id"):
 | |
|                 assert chunk.item_id, "Events with item_id should have non-empty item_id"
 | |
|             elif hasattr(chunk, "sequence_number"):
 | |
|                 assert isinstance(chunk.sequence_number, int), "sequence_number should be an integer"
 |