mirror of
				https://github.com/meta-llama/llama-stack.git
				synced 2025-10-26 01:12:59 +00:00 
			
		
		
		
	# What does this PR do? <!-- Provide a short summary of what this PR does and why. Link to relevant issues if applicable. --> This PR adds static type coverage to `llama-stack`. Part of https://github.com/meta-llama/llama-stack/issues/2647 <!-- If resolving an issue, uncomment and update the line below --> <!-- Closes #[issue-number] --> ## Test Plan <!-- Describe the tests you ran to verify your changes with result summaries. *Provide clear instructions so the plan can be easily re-executed.* --> Signed-off-by: Mustafa Elbehery <melbeher@redhat.com>
		
			
				
	
	
		
			52 lines
		
	
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			52 lines
		
	
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) Meta Platforms, Inc. and affiliates.
 | |
| # All rights reserved.
 | |
| #
 | |
| # This source code is licensed under the terms described in the LICENSE file in
 | |
| # the root directory of this source tree.
 | |
| 
 | |
| import asyncio
 | |
| import logging
 | |
| 
 | |
| from llama_stack.apis.inference import Message
 | |
| from llama_stack.apis.safety import Safety, SafetyViolation, ViolationLevel
 | |
| from llama_stack.providers.utils.telemetry import tracing
 | |
| 
 | |
| log = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
class SafetyException(Exception):  # noqa: N818
    """Exception that carries the safety violation which triggered it.

    The exception message is the violation's ``user_message``; the violation
    object itself is kept on ``self.violation`` for callers that need it.
    """

    def __init__(self, violation: SafetyViolation):
        # Initialise the base Exception with the user-facing text, then keep
        # a reference to the full violation object.
        super().__init__(violation.user_message)
        self.violation = violation
 | |
| 
 | |
| 
 | |
| class ShieldRunnerMixin:
 | |
|     def __init__(
 | |
|         self,
 | |
|         safety_api: Safety,
 | |
|         input_shields: list[str] | None = None,
 | |
|         output_shields: list[str] | None = None,
 | |
|     ):
 | |
|         self.safety_api = safety_api
 | |
|         self.input_shields = input_shields
 | |
|         self.output_shields = output_shields
 | |
| 
 | |
|     async def run_multiple_shields(self, messages: list[Message], identifiers: list[str]) -> None:
 | |
|         async def run_shield_with_span(identifier: str):
 | |
|             async with tracing.span(f"run_shield_{identifier}"):
 | |
|                 return await self.safety_api.run_shield(
 | |
|                     shield_id=identifier,
 | |
|                     messages=messages,
 | |
|                     params={},
 | |
|                 )
 | |
| 
 | |
|         responses = await asyncio.gather(*[run_shield_with_span(identifier) for identifier in identifiers])
 | |
|         for identifier, response in zip(identifiers, responses, strict=False):
 | |
|             if not response.violation:
 | |
|                 continue
 | |
| 
 | |
|             violation = response.violation
 | |
|             if violation.violation_level == ViolationLevel.ERROR:
 | |
|                 raise SafetyException(violation)
 | |
|             elif violation.violation_level == ViolationLevel.WARN:
 | |
|                 log.warning(f"[Warn]{identifier} raised a warning")
 |