Update the meta reference safety implementation to match new API

This commit is contained in:
Ashwin Bharambe 2024-09-20 14:17:44 -07:00
parent 93e4ef3829
commit 51245a417b
11 changed files with 115 additions and 130 deletions

View file

@ -4,51 +4,46 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
from typing import List
from llama_models.llama3.api.datatypes import Message, Role, UserMessage
from llama_models.llama3.api.datatypes import Message
from termcolor import cprint
from llama_stack.apis.safety import (
OnViolationAction,
Safety,
ShieldDefinition,
ShieldResponse,
)
from llama_stack.apis.safety import * # noqa: F403
class SafetyException(Exception): # noqa: N818
def __init__(self, response: ShieldResponse):
self.response = response
super().__init__(response.violation_return_message)
def __init__(self, violation: SafetyViolation):
self.violation = violation
super().__init__(violation.user_message)
class ShieldRunnerMixin:
def __init__(
self,
safety_api: Safety,
input_shields: List[ShieldDefinition] = None,
output_shields: List[ShieldDefinition] = None,
input_shields: List[str] = None,
output_shields: List[str] = None,
):
self.safety_api = safety_api
self.input_shields = input_shields
self.output_shields = output_shields
async def run_shields(
self, messages: List[Message], shields: List[ShieldDefinition]
) -> List[ShieldResponse]:
messages = messages.copy()
# some shields like llama-guard require the first message to be a user message
# since this might be a tool call, first role might not be user
if len(messages) > 0 and messages[0].role != Role.user.value:
messages[0] = UserMessage(content=messages[0].content)
results = await self.safety_api.run_shields(
messages=messages,
shields=shields,
async def run_shields(self, messages: List[Message], shields: List[str]) -> None:
responses = await asyncio.gather(
*[
self.safety_api.run_shield(
shield_type=shield_type,
messages=messages,
)
for shield_type in shields
]
)
for shield, r in zip(shields, results):
if r.is_violation:
for shield, r in zip(shields, responses):
if r.violation:
if shield.on_violation_action == OnViolationAction.RAISE:
raise SafetyException(r)
elif shield.on_violation_action == OnViolationAction.WARN:
@ -56,5 +51,3 @@ class ShieldRunnerMixin:
f"[Warn]{shield.__class__.__name__} raised a warning",
color="red",
)
return results