add tools to chat completion request

This commit is contained in:
Hardik Shah 2024-08-21 17:48:48 -07:00
parent 9777639a1c
commit 68855ed218
26 changed files with 558 additions and 226 deletions

View file

@ -110,35 +110,6 @@ class Session(BaseModel):
started_at: datetime
@json_schema_type
class ToolPromptFormat(Enum):
"""This Enum refers to the prompt format for calling zero shot tools
`json` --
Refers to the json format for calling tools.
The json format takes the form like
{
"type": "function",
"function" : {
"name": "function_name",
"description": "function_description",
"parameters": {...}
}
}
`function_tag` --
This is an example of how you could define
your own user defined format for making tool calls.
The function_tag format looks like this,
<function=function_name>(parameters)</function>
The detailed prompts for each of these formats are defined in `system_prompt.py`
"""
json = "json"
function_tag = "function_tag"
@json_schema_type
class AgenticSystemInstanceConfig(BaseModel):
instructions: str

View file

@ -56,10 +56,10 @@ from llama_toolchain.safety.api.datatypes import (
)
from llama_toolchain.agentic_system.api.endpoints import * # noqa
from llama_toolchain.tools.base import BaseTool
from llama_toolchain.tools.builtin import SingleMessageBuiltinTool
from .safety import SafetyException, ShieldRunnerMixin
from .system_prompt import get_agentic_prefix_messages
from .tools.base import BaseTool
from .tools.builtin import SingleMessageBuiltinTool
class AgentInstance(ShieldRunnerMixin):
@ -85,18 +85,6 @@ class AgentInstance(ShieldRunnerMixin):
self.inference_api = inference_api
self.safety_api = safety_api
if prefix_messages is not None and len(prefix_messages) > 0:
self.prefix_messages = prefix_messages
else:
self.prefix_messages = get_agentic_prefix_messages(
builtin_tools,
custom_tool_definitions,
tool_prompt_format,
)
for m in self.prefix_messages:
print(m.content)
self.max_infer_iters = max_infer_iters
self.tools_dict = {t.get_name(): t for t in builtin_tools}
@ -344,7 +332,7 @@ class AgentInstance(ShieldRunnerMixin):
stream: bool = False,
max_gen_len: Optional[int] = None,
) -> AsyncGenerator:
input_messages = preprocess_dialog(input_messages, self.prefix_messages)
input_messages = preprocess_dialog(input_messages)
attachments = []
@ -373,7 +361,8 @@ class AgentInstance(ShieldRunnerMixin):
req = ChatCompletionRequest(
model=self.model,
messages=input_messages,
available_tools=self.instance_config.available_tools,
tools=self.instance_config.available_tools,
tool_prompt_format=self.instance_config.tool_prompt_format,
stream=True,
sampling_params=SamplingParams(
temperature=temperature,
@ -601,14 +590,12 @@ def attachment_message(url: URL) -> ToolResponseMessage:
)
def preprocess_dialog(
messages: List[Message], prefix_messages: List[Message]
) -> List[Message]:
def preprocess_dialog(messages: List[Message]) -> List[Message]:
"""
Preprocesses the dialog by removing the system message and
adding the system message to the beginning of the dialog.
"""
ret = prefix_messages.copy()
ret = []
for m in messages:
if m.role == Role.system.value:

View file

@ -24,17 +24,17 @@ from llama_toolchain.agentic_system.api import (
AgenticSystemTurnCreateRequest,
)
from .agent_instance import AgentInstance
from .config import AgenticSystemConfig
from .tools.builtin import (
from llama_toolchain.tools.builtin import (
BraveSearchTool,
CodeInterpreterTool,
PhotogenTool,
WolframAlphaTool,
)
from .tools.safety import with_safety
from llama_toolchain.tools.safety import with_safety
from .agent_instance import AgentInstance
from .config import AgenticSystemConfig
logger = logging.getLogger()

View file

@ -1,180 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
import textwrap
from datetime import datetime
from typing import List
from llama_toolchain.agentic_system.api.datatypes import ToolPromptFormat
from llama_toolchain.inference.api import (
BuiltinTool,
Message,
SystemMessage,
ToolDefinition,
UserMessage,
)
from .tools.builtin import SingleMessageBuiltinTool
def get_agentic_prefix_messages(
builtin_tools: List[SingleMessageBuiltinTool],
custom_tools: List[ToolDefinition],
tool_prompt_format: ToolPromptFormat,
) -> List[Message]:
messages = []
content = ""
if builtin_tools:
content += "Environment: ipython\n"
tool_str = ", ".join(
[
t.get_name()
for t in builtin_tools
if t.get_name() != BuiltinTool.code_interpreter.value
]
)
if tool_str:
content += f"Tools: {tool_str}"
current_date = datetime.now()
formatted_date = current_date.strftime("%d %B %Y")
date_str = f"""
Cutting Knowledge Date: December 2023
Today Date: {formatted_date}\n"""
content += date_str
messages.append(SystemMessage(content=content))
if custom_tools:
if tool_prompt_format == ToolPromptFormat.function_tag:
text = prompt_for_function_tag(custom_tools)
messages.append(UserMessage(content=text))
elif tool_prompt_format == ToolPromptFormat.json:
text = prompt_for_json(custom_tools)
messages.append(UserMessage(content=text))
else:
raise NotImplementedError(
f"Tool prompt format {tool_prompt_format} is not supported"
)
else:
messages.append(SystemMessage(content=content))
return messages
def prompt_for_json(custom_tools: List[ToolDefinition]) -> str:
tool_defs = "\n".join(
translate_custom_tool_definition_to_json(t) for t in custom_tools
)
content = textwrap.dedent(
"""
Answer the user's question by making use of the following functions if needed.
If none of the function can be used, please say so.
Here is a list of functions in JSON format:
{tool_defs}
Return function calls in JSON format.
"""
)
content = content.lstrip("\n").format(tool_defs=tool_defs)
return content
def prompt_for_function_tag(custom_tools: List[ToolDefinition]) -> str:
custom_tool_params = ""
for t in custom_tools:
custom_tool_params += get_instruction_string(t) + "\n"
custom_tool_params += get_parameters_string(t) + "\n\n"
content = f"""
You have access to the following functions:
{custom_tool_params}
Think very carefully before calling functions.
If you choose to call a function ONLY reply in the following format with no prefix or suffix:
<function=example_function_name>{{"example_name": "example_value"}}</function>
Reminder:
- If looking for real time information use relevant functions before falling back to brave_search
- Function calls MUST follow the specified format, start with <function= and end with </function>
- Required parameters MUST be specified
- Only call one function at a time
- Put the entire function call reply on one line
"""
return content
def get_instruction_string(custom_tool_definition) -> str:
return f"Use the function '{custom_tool_definition.tool_name}' to '{custom_tool_definition.description}'"
def get_parameters_string(custom_tool_definition) -> str:
return json.dumps(
{
"name": custom_tool_definition.tool_name,
"description": custom_tool_definition.description,
"parameters": {
name: definition.__dict__
for name, definition in custom_tool_definition.parameters.items()
},
}
)
def translate_custom_tool_definition_to_json(tool_def):
"""Translates ToolDefinition to json as expected by model
eg. output for a function
{
"type": "function",
"function": {
"name": "conv_int",
"description": "Convert serialized fract24 integer into int value.",
"parameters": {
"type": "object",
"properties": [
{
"data": {
"type": "object",
"description": ""
}
}
],
"required": ["data"]
}
}
}
"""
assert isinstance(tool_def.tool_name, str)
func_def = {"type": "function", "function": {}}
func_def["function"]["name"] = tool_def.tool_name
func_def["function"]["description"] = tool_def.description or ""
if tool_def.parameters:
required = []
properties = []
for p_name, p_def in tool_def.parameters.items():
properties.append(
{
p_name: {
# TODO: see if this should not always be object
"type": "object",
"description": p_def.description or "",
}
}
)
if p_def.required:
required.append(p_name)
func_def["function"]["parameters"] = {
"type": "object",
"properties": properties,
"required": required,
}
else:
func_def["function"]["parameters"] = {}
return json.dumps(func_def, indent=4)

View file

@ -1,5 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -1,20 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from abc import ABC, abstractmethod
from typing import List
from llama_toolchain.inference.api import Message
class BaseTool(ABC):
@abstractmethod
def get_name(self) -> str:
raise NotImplementedError
@abstractmethod
async def run(self, messages: List[Message]) -> List[Message]:
raise NotImplementedError

View file

@ -1,322 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
import os
import re
from abc import abstractmethod
from typing import List, Optional
import requests
from termcolor import cprint
from .ipython_tool.code_execution import (
CodeExecutionContext,
CodeExecutionRequest,
CodeExecutor,
TOOLS_ATTACHMENT_KEY_REGEX,
)
from llama_toolchain.inference.api import * # noqa: F403
from .base import BaseTool
def interpret_content_as_attachment(content: str) -> Optional[Attachment]:
match = re.search(TOOLS_ATTACHMENT_KEY_REGEX, content)
if match:
snippet = match.group(1)
data = json.loads(snippet)
return Attachment(
url=URL(uri="file://" + data["filepath"]), mime_type=data["mimetype"]
)
return None
class SingleMessageBuiltinTool(BaseTool):
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, f"Expected single message, got {len(messages)}"
message = messages[0]
assert len(message.tool_calls) == 1, "Expected a single tool call"
tool_call = messages[0].tool_calls[0]
query = tool_call.arguments["query"]
response: str = await self.run_impl(query)
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response,
)
if attachment := interpret_content_as_attachment(response):
message.content = attachment
return [message]
@abstractmethod
async def run_impl(self, query: str) -> str:
raise NotImplementedError()
class PhotogenTool(SingleMessageBuiltinTool):
def __init__(self, dump_dir: str) -> None:
self.dump_dir = dump_dir
def get_name(self) -> str:
return BuiltinTool.photogen.value
async def run_impl(self, query: str) -> str:
"""
Implement this to give the model an ability to generate images.
Return:
info = {
"filepath": str(image_filepath),
"mimetype": "image/png",
}
"""
raise NotImplementedError()
class BraveSearchTool(SingleMessageBuiltinTool):
def __init__(self, api_key: str) -> None:
self.api_key = api_key
def get_name(self) -> str:
return BuiltinTool.brave_search.value
async def run_impl(self, query: str) -> str:
url = "https://api.search.brave.com/res/v1/web/search"
headers = {
"X-Subscription-Token": self.api_key,
"Accept-Encoding": "gzip",
"Accept": "application/json",
}
payload = {"q": query}
response = requests.get(url=url, params=payload, headers=headers)
return json.dumps(self._clean_brave_response(response.json()))
def _clean_brave_response(self, search_response, top_k=3):
query = None
clean_response = []
if "query" in search_response:
if "original" in search_response["query"]:
query = search_response["query"]["original"]
if "mixed" in search_response:
mixed_results = search_response["mixed"]
for m in mixed_results["main"][:top_k]:
r_type = m["type"]
results = search_response[r_type]["results"]
if r_type == "web":
# For web data - add a single output from the search
idx = m["index"]
selected_keys = [
"type",
"title",
"url",
"description",
"date",
"extra_snippets",
]
cleaned = {
k: v for k, v in results[idx].items() if k in selected_keys
}
elif r_type == "faq":
# For faw data - take a list of all the questions & answers
selected_keys = ["type", "question", "answer", "title", "url"]
cleaned = []
for q in results:
cleaned.append(
{k: v for k, v in q.items() if k in selected_keys}
)
elif r_type == "infobox":
idx = m["index"]
selected_keys = [
"type",
"title",
"url",
"description",
"long_desc",
]
cleaned = {
k: v for k, v in results[idx].items() if k in selected_keys
}
elif r_type == "videos":
selected_keys = [
"type",
"url",
"title",
"description",
"date",
]
cleaned = []
for q in results:
cleaned.append(
{k: v for k, v in q.items() if k in selected_keys}
)
elif r_type == "locations":
# For faw data - take a list of all the questions & answers
selected_keys = [
"type",
"title",
"url",
"description",
"coordinates",
"postal_address",
"contact",
"rating",
"distance",
"zoom_level",
]
cleaned = []
for q in results:
cleaned.append(
{k: v for k, v in q.items() if k in selected_keys}
)
elif r_type == "news":
# For faw data - take a list of all the questions & answers
selected_keys = [
"type",
"title",
"url",
"description",
]
cleaned = []
for q in results:
cleaned.append(
{k: v for k, v in q.items() if k in selected_keys}
)
else:
cleaned = []
clean_response.append(cleaned)
return {"query": query, "top_k": clean_response}
class WolframAlphaTool(SingleMessageBuiltinTool):
def __init__(self, api_key: str) -> None:
self.api_key = api_key
self.url = "https://api.wolframalpha.com/v2/query"
def get_name(self) -> str:
return BuiltinTool.wolfram_alpha.value
async def run_impl(self, query: str) -> str:
params = {
"input": query,
"appid": self.api_key,
"format": "plaintext",
"output": "json",
}
response = requests.get(
self.url,
params=params,
)
return json.dumps(self._clean_wolfram_alpha_response(response.json()))
def _clean_wolfram_alpha_response(self, wa_response):
remove = {
"queryresult": [
"datatypes",
"error",
"timedout",
"timedoutpods",
"numpods",
"timing",
"parsetiming",
"parsetimedout",
"recalculate",
"id",
"host",
"server",
"related",
"version",
{
"pods": [
"scanner",
"id",
"error",
"expressiontypes",
"states",
"infos",
"position",
"numsubpods",
]
},
"assumptions",
],
}
for main_key in remove:
for key_to_remove in remove[main_key]:
try:
if key_to_remove == "assumptions":
if "assumptions" in wa_response[main_key]:
del wa_response[main_key][key_to_remove]
if isinstance(key_to_remove, dict):
for sub_key in key_to_remove:
if sub_key == "pods":
for i in range(len(wa_response[main_key][sub_key])):
if (
wa_response[main_key][sub_key][i]["title"]
== "Result"
):
del wa_response[main_key][sub_key][i + 1 :]
break
sub_items = wa_response[main_key][sub_key]
for i in range(len(sub_items)):
for sub_key_to_remove in key_to_remove[sub_key]:
if sub_key_to_remove in sub_items[i]:
del sub_items[i][sub_key_to_remove]
elif key_to_remove in wa_response[main_key]:
del wa_response[main_key][key_to_remove]
except KeyError:
pass
return wa_response
class CodeInterpreterTool(BaseTool):
def __init__(self) -> None:
ctx = CodeExecutionContext(
matplotlib_dump_dir=f"/tmp/{os.environ['USER']}_matplotlib_dump",
)
self.code_executor = CodeExecutor(ctx)
def get_name(self) -> str:
return BuiltinTool.code_interpreter.value
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
message = messages[0]
assert len(message.tool_calls) == 1, "Expected a single tool call"
tool_call = messages[0].tool_calls[0]
script = tool_call.arguments["code"]
req = CodeExecutionRequest(scripts=[script])
res = self.code_executor.execute(req)
pieces = [res["process_status"]]
for out_type in ["stdout", "stderr"]:
res_out = res[out_type]
if res_out != "":
pieces.extend([f"[{out_type}]", res_out, f"[/{out_type}]"])
if out_type == "stderr":
cprint(f"ipython tool error: ↓\n{res_out}", color="red")
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content="\n".join(pieces),
)
if attachment := interpret_content_as_attachment(res["stdout"]):
message.content = attachment
return [message]

View file

@ -1,5 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -1,133 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import errno
# Disabling potentially dangerous functions
import os as _os
from functools import partial
os_funcs_to_disable = [
"kill",
"system",
"putenv",
"remove",
"removedirs",
"rmdir",
"fchdir",
"setuid",
"fork",
"forkpty",
"killpg",
"rename",
"renames",
"truncate",
"replace",
# "unlink", # Commenting as this was blocking matpltlib from rendering plots correctly
"fchmod",
"fchown",
"chmod",
"chown",
"chroot",
"fchdir",
"lchflags",
"lchmod",
"lchown",
"chdir",
]
def call_not_allowed(*args, **kwargs):
raise OSError(errno.EPERM, "Call are not permitted in this environment")
for func_name in os_funcs_to_disable:
if hasattr(_os, func_name):
setattr(_os, func_name, partial(call_not_allowed, _func_name=f"os.{func_name}"))
import shutil as _shutil
for func_name in ["rmtree", "move", "chown"]:
if hasattr(_shutil, func_name):
setattr(
_shutil,
func_name,
partial(call_not_allowed, _func_name=f"shutil.{func_name}"),
)
import subprocess as _subprocess
def popen_not_allowed(*args, **kwargs):
raise _subprocess.CalledProcessError(
-1,
args[0] if args else "unknown",
stderr="subprocess.Popen is not allowed in this environment",
)
_subprocess.Popen = popen_not_allowed
import atexit as _atexit
import builtins as _builtins
import io as _io
import json as _json
import sys as _sys
# NB! The following "unused" imports crucial, make sure not not to remove
# them with linters - they're used in code_execution.py
from contextlib import ( # noqa
contextmanager as _contextmanager,
redirect_stderr as _redirect_stderr,
redirect_stdout as _redirect_stdout,
)
from multiprocessing.connection import Connection as _Connection
# Mangle imports to avoid polluting model execution namespace.
_IO_SINK = _io.StringIO()
_NETWORK_TIMEOUT = 5
_NETWORK_CONNECTIONS = None
def _open_connections():
global _NETWORK_CONNECTIONS
if _NETWORK_CONNECTIONS is not None:
# Ensure connections only opened once.
return _NETWORK_CONNECTIONS
req_w_fd, resp_r_fd = _sys.argv[1], _sys.argv[2]
req_con = _Connection(int(req_w_fd), readable=False)
resp_con = _Connection(int(resp_r_fd), writable=False)
_NETWORK_CONNECTIONS = (req_con, resp_con)
return _NETWORK_CONNECTIONS
_builtins._open_connections = _open_connections
@_atexit.register
def _close_connections():
global _NETWORK_CONNECTIONS
if _NETWORK_CONNECTIONS is None:
return
for con in _NETWORK_CONNECTIONS:
con.close()
del _NETWORK_CONNECTIONS
def _network_call(request):
# NOTE: We communicate with the parent process in json, encoded
# in raw bytes. We do this because native send/recv methods use
# pickle which involves execution of arbitrary code.
_open_connections()
req_con, resp_con = _NETWORK_CONNECTIONS
req_con.send_bytes(_json.dumps(request).encode("utf-8"))
if resp_con.poll(timeout=_NETWORK_TIMEOUT) is None:
raise Exception(f"Network request timed out: {_json.dumps(request)}")
else:
return _json.loads(resp_con.recv_bytes().decode("utf-8"))

View file

@ -1,256 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import base64
import json
import multiprocessing
import os
import re
import subprocess
import sys
import tempfile
import textwrap
import time
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import List
from PIL import Image
from .utils import get_code_env_prefix
TOOLS_ATTACHMENT_KEY = "__tools_attachment__"
TOOLS_ATTACHMENT_KEY_REGEX = re.compile(r"__tools_attachment__=(\{.*?\})")
DIRNAME = Path(__file__).parent
CODE_EXEC_TIMEOUT = 20
CODE_ENV_PREFIX = get_code_env_prefix()
STDOUTERR_SINK_WRAPPER_TEMPLATE = """\
with _redirect_stdout(_IO_SINK), _redirect_stderr(_IO_SINK):
{code}\
"""
TRYEXCEPT_WRAPPER_TEMPLATE = """\
try:
{code}
except:
pass\
"""
def generate_bwrap_command(bind_dirs: List[str]) -> str:
"""
Generate the bwrap command string for binding all
directories in the current directory read-only.
"""
bwrap_args = ""
bwrap_args += "--ro-bind / / "
# Add the --dev flag to mount device files
bwrap_args += "--dev /dev "
for d in bind_dirs:
bwrap_args += f"--bind {d} {d} "
# Add the --unshare-all flag to isolate the sandbox from the rest of the system
bwrap_args += "--unshare-all "
# Add the --die-with-parent flag to ensure the child process dies when bwrap's parent dies
bwrap_args += "--die-with-parent "
return bwrap_args
@dataclass
class CodeExecutionContext:
matplotlib_dump_dir: str
use_proxy: bool = False
@dataclass
class CodeExecutionRequest:
scripts: List[str]
only_last_cell_stdouterr: bool = True
only_last_cell_fail: bool = True
seed: int = 0
strip_fpaths_in_stderr: bool = True
class CodeExecutor:
def __init__(self, context: CodeExecutionContext):
self.context = context
def execute(self, req: CodeExecutionRequest) -> dict:
scripts = req.scripts
for i in range(len(scripts) - 1):
if req.only_last_cell_stdouterr:
scripts[i] = STDOUTERR_SINK_WRAPPER_TEMPLATE.format(
code=textwrap.indent(scripts[i], " " * 4)
)
if req.only_last_cell_fail:
scripts[i] = TRYEXCEPT_WRAPPER_TEMPLATE.format(
code=textwrap.indent(scripts[i], " " * 4)
)
# Seeds prefix:
seed = req.seed
seeds_prefix = f"""\
def _set_seeds():
import random
random.seed({seed})
import numpy as np
np.random.seed({seed})
_set_seeds()\
"""
script = "\n\n".join([seeds_prefix] + [CODE_ENV_PREFIX] + scripts)
with tempfile.TemporaryDirectory() as dpath:
bwrap_prefix = "bwrap " + generate_bwrap_command(bind_dirs=[dpath])
cmd = [*bwrap_prefix.split(), sys.executable, "-c", script]
code_fpath = os.path.join(dpath, "code.py")
with open(code_fpath, "w") as f:
f.write(script)
try:
python_path = os.environ.get("PYTHONPATH", "")
env = dict(
os.environ,
PYTHONHASHSEED=str(seed),
MPLCONFIGDIR=dpath,
MPLBACKEND="module://matplotlib_custom_backend",
PYTHONPATH=f"{DIRNAME}:{python_path}",
)
stdout, stderr, returncode = do_subprocess(
cmd=cmd,
env=env,
ctx=self.context,
)
stderr = stderr.strip()
if req.strip_fpaths_in_stderr:
pattern = r'File "([^"]+)", line (\d+)'
stderr = re.sub(pattern, r"line \2", stderr)
return {
"process_status": "completed",
"returncode": returncode,
"stdout": stdout.strip(),
"stderr": stderr,
}
except subprocess.TimeoutExpired:
return {
"process_status": "timeout",
"stdout": "Timed out",
"stderr": "Timed out",
}
except Exception as e:
return {
"process_status": "error",
"error_type": type(e).__name__,
"stderr": str(e),
"stdout": str(e),
}
def process_matplotlib_response(response, matplotlib_dump_dir: str):
image_data = response["image_data"]
# Convert the base64 string to a bytes object
images = [base64.b64decode(d["image_base64"]) for d in image_data]
# Create a list of PIL images from the bytes objects
images = [Image.open(BytesIO(img)) for img in images]
# Create a list of image paths
image_paths = []
for i, img in enumerate(images):
# create new directory for each day to better organize data:
dump_dname = datetime.today().strftime("%Y-%m-%d")
dump_dpath = Path(matplotlib_dump_dir, dump_dname)
dump_dpath.mkdir(parents=True, exist_ok=True)
# save image into a file
dump_fname = f"matplotlib_{str(time.time()).replace('.', '_')}_{i}.png"
dump_fpath = dump_dpath / dump_fname
img.save(dump_fpath, "PNG")
image_paths.append(str(dump_fpath))
# this is kind of convoluted, we send back this response to the subprocess which
# prints it out
info = {
"filepath": str(image_paths[-1]),
"mimetype": "image/png",
}
return f"{TOOLS_ATTACHMENT_KEY}={json.dumps(info)}"
def execute_subprocess_request(request, ctx: CodeExecutionContext):
"Route requests from the subprocess (via network Pipes) to the internet/tools."
if request["type"] == "matplotlib":
return process_matplotlib_response(request, ctx.matplotlib_dump_dir)
else:
raise Exception(f'Unrecognised network request type: {request["type"]}')
def do_subprocess(*, cmd: list, env: dict, ctx: CodeExecutionContext):
# Create Pipes to be used for any external tool/network requests.
req_r, req_w = multiprocessing.Pipe(duplex=False)
resp_r, resp_w = multiprocessing.Pipe(duplex=False)
cmd += [str(req_w.fileno()), str(resp_r.fileno())]
proc = subprocess.Popen(
cmd,
pass_fds=(req_w.fileno(), resp_r.fileno()),
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
env=env,
)
# Close unnecessary fds.
req_w.close()
resp_r.close()
pipe_close = False
done_read = False
start = time.monotonic()
while proc.poll() is None and not pipe_close:
if req_r.poll(0.1):
# NB: Python pipe semantics for poll and recv mean that
# poll() returns True is a pipe is closed.
# CF old school PEP from '09
# https://bugs.python.org/issue5573
try:
request = json.loads(req_r.recv_bytes().decode("utf-8"))
response = execute_subprocess_request(request, ctx)
resp_w.send_bytes(json.dumps(response).encode("utf-8"))
except EOFError:
# The request pipe is closed - set a marker to exit
# after the next attempt at reading stdout/stderr.
pipe_close = True
try:
# If lots has been printed, pipe might be full but
# proc cannot exit until all the stdout/stderr
# been written/read.
stdout, stderr = proc.communicate(timeout=0.3)
done_read = True
except subprocess.TimeoutExpired:
# The program has not terminated. Ignore it, there
# may be more network/tool requests.
continue
if time.monotonic() - start > CODE_EXEC_TIMEOUT:
proc.terminate()
raise subprocess.TimeoutExpired(cmd, CODE_EXEC_TIMEOUT)
if not done_read:
# Solve race condition where process terminates before
# we hit the while loop.
stdout, stderr = proc.communicate(timeout=0.3)
resp_w.close()
req_r.close()
return stdout, stderr, proc.returncode

View file

@ -1,87 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
A custom Matplotlib backend that overrides the show method to return image bytes.
"""
import base64
import io
import json as _json
import matplotlib
from matplotlib.backend_bases import FigureManagerBase
# Import necessary components from Matplotlib
from matplotlib.backends.backend_agg import FigureCanvasAgg
class CustomFigureCanvas(FigureCanvasAgg):
def show(self):
# Save the figure to a BytesIO object
buf = io.BytesIO()
self.print_png(buf)
image_bytes = buf.getvalue()
buf.close()
return image_bytes
class CustomFigureManager(FigureManagerBase):
def __init__(self, canvas, num):
super().__init__(canvas, num)
# Mimic module initialization that integrates with the Matplotlib backend system
def _create_figure_manager(num, *args, **kwargs):
"""
Create a custom figure manager instance.
"""
FigureClass = kwargs.pop("FigureClass", None) # noqa: N806
if FigureClass is None:
from matplotlib.figure import Figure
FigureClass = Figure # noqa: N806
fig = FigureClass(*args, **kwargs)
canvas = CustomFigureCanvas(fig)
manager = CustomFigureManager(canvas, num)
return manager
def show():
"""
Handle all figures and potentially return their images as bytes.
This function iterates over all figures registered with the custom backend,
renders them as images in bytes format, and could return a list of bytes objects,
one for each figure, or handle them as needed.
"""
image_data = []
for manager in matplotlib._pylab_helpers.Gcf.get_all_fig_managers():
# Get the figure from the manager
fig = manager.canvas.figure
buf = io.BytesIO() # Create a buffer for the figure
fig.savefig(buf, format="png") # Save the figure to the buffer in PNG format
buf.seek(0) # Go to the beginning of the buffer
image_bytes = buf.getvalue() # Retrieve bytes value
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
image_data.append({"image_base64": image_base64})
buf.close()
req_con, resp_con = _open_connections()
_json_dump = _json.dumps(
{
"type": "matplotlib",
"image_data": image_data,
}
)
req_con.send_bytes(_json_dump.encode("utf-8"))
resp = _json.loads(resp_con.recv_bytes().decode("utf-8"))
print(resp)
FigureCanvas = CustomFigureCanvas
FigureManager = CustomFigureManager

View file

@ -1,21 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import os
DIR = os.path.dirname(os.path.realpath(__file__))
CODE_ENV_PREFIX_FILE = os.path.join(DIR, "code_env_prefix.py")
CODE_ENV_PREFIX = None
def get_code_env_prefix() -> str:
global CODE_ENV_PREFIX
if CODE_ENV_PREFIX is None:
with open(CODE_ENV_PREFIX_FILE, "r") as f:
CODE_ENV_PREFIX = f.read()
return CODE_ENV_PREFIX

View file

@ -1,59 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import List
from llama_toolchain.agentic_system.meta_reference.safety import ShieldRunnerMixin
from llama_toolchain.inference.api import Message
from llama_toolchain.safety.api.datatypes import ShieldDefinition
from llama_toolchain.safety.api.endpoints import Safety
from .builtin import BaseTool
class SafeTool(BaseTool, ShieldRunnerMixin):
"""A tool that makes other tools safety enabled"""
def __init__(
self,
tool: BaseTool,
safety_api: Safety,
input_shields: List[ShieldDefinition] = None,
output_shields: List[ShieldDefinition] = None,
):
self._tool = tool
ShieldRunnerMixin.__init__(
self, safety_api, input_shields=input_shields, output_shields=output_shields
)
def get_name(self) -> str:
# return the name of the wrapped tool
return self._tool.get_name()
async def run(self, messages: List[Message]) -> List[Message]:
if self.input_shields:
await self.run_shields(messages, self.input_shields)
# run the underlying tool
res = await self._tool.run(messages)
if self.output_shields:
await self.run_shields(messages, self.output_shields)
return res
def with_safety(
tool: BaseTool,
safety_api: Safety,
input_shields: List[ShieldDefinition] = None,
output_shields: List[ShieldDefinition] = None,
) -> SafeTool:
return SafeTool(
tool,
safety_api,
input_shields=input_shields,
output_shields=output_shields,
)

View file

@ -1,5 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -1,106 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
from abc import abstractmethod
from typing import Dict, List
from llama_models.llama3.api.datatypes import * # noqa: F403
from llama_toolchain.agentic_system.api import * # noqa: F403
# TODO: this is symptomatic of us needing to pull more tooling related utilities
from llama_toolchain.agentic_system.meta_reference.tools.builtin import (
interpret_content_as_attachment,
)
class CustomTool:
"""
Developers can define their custom tools that models can use
by extending this class.
Developers need to provide
- name
- description
- params_definition
- implement tool's behavior in `run_impl` method
NOTE: The return of the `run` method needs to be json serializable
"""
@abstractmethod
def get_name(self) -> str:
raise NotImplementedError
@abstractmethod
def get_description(self) -> str:
raise NotImplementedError
@abstractmethod
def get_params_definition(self) -> Dict[str, ToolParamDefinition]:
raise NotImplementedError
def get_instruction_string(self) -> str:
return f"Use the function '{self.get_name()}' to: {self.get_description()}"
def parameters_for_system_prompt(self) -> str:
return json.dumps(
{
"name": self.get_name(),
"description": self.get_description(),
"parameters": {
name: definition.__dict__
for name, definition in self.get_params_definition().items()
},
}
)
def get_tool_definition(self) -> AgenticSystemToolDefinition:
return AgenticSystemToolDefinition(
tool_name=self.get_name(),
description=self.get_description(),
parameters=self.get_params_definition(),
)
@abstractmethod
async def run(self, messages: List[Message]) -> List[Message]:
raise NotImplementedError
class SingleMessageCustomTool(CustomTool):
"""
Helper class to handle custom tools that take a single message
Extending this class and implementing the `run_impl` method will
allow for the tool be called by the model and the necessary plumbing.
"""
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
)
if attachment := interpret_content_as_attachment(response_str):
message.content = attachment
return [message]
@abstractmethod
async def run_impl(self, *args, **kwargs):
raise NotImplementedError()

View file

@ -18,7 +18,7 @@ from llama_toolchain.agentic_system.api import (
from llama_toolchain.agentic_system.api.datatypes import ToolPromptFormat
from llama_toolchain.agentic_system.client import AgenticSystemClient
from llama_toolchain.agentic_system.tools.custom.execute import (
from llama_toolchain.agentic_system.meta_reference.execute_with_custom_tools import (
execute_with_custom_tools,
)
from llama_toolchain.safety.api.datatypes import BuiltinShield, ShieldDefinition