Make each inference provider into its own subdirectory

This commit is contained in:
Ashwin Bharambe 2024-08-05 15:13:52 -07:00
parent f64668319c
commit 0de5a807c7
42 changed files with 123 additions and 103 deletions

View file

@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -0,0 +1,106 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
from abc import abstractmethod
from typing import Dict, List
from llama_models.llama3_1.api.datatypes import * # noqa: F403
from llama_toolchain.agentic_system.api import * # noqa: F403
# TODO: this is symptomatic of us needing to pull more tooling related utilities
from llama_toolchain.agentic_system.meta_reference.tools.builtin import (
interpret_content_as_attachment,
)
class CustomTool:
"""
Developers can define their custom tools that models can use
by extending this class.
Developers need to provide
- name
- description
- params_definition
- implement tool's behavior in `run_impl` method
NOTE: The return of the `run` method needs to be json serializable
"""
@abstractmethod
def get_name(self) -> str:
raise NotImplementedError
@abstractmethod
def get_description(self) -> str:
raise NotImplementedError
@abstractmethod
def get_params_definition(self) -> Dict[str, ToolParamDefinition]:
raise NotImplementedError
def get_instruction_string(self) -> str:
return f"Use the function '{self.get_name()}' to: {self.get_description()}"
def parameters_for_system_prompt(self) -> str:
return json.dumps(
{
"name": self.get_name(),
"description": self.get_description(),
"parameters": {
name: definition.__dict__
for name, definition in self.get_params_definition().items()
},
}
)
def get_tool_definition(self) -> AgenticSystemToolDefinition:
return AgenticSystemToolDefinition(
tool_name=self.get_name(),
description=self.get_description(),
parameters=self.get_params_definition(),
)
@abstractmethod
async def run(self, messages: List[Message]) -> List[Message]:
raise NotImplementedError
class SingleMessageCustomTool(CustomTool):
"""
Helper class to handle custom tools that take a single message
Extending this class and implementing the `run_impl` method will
allow for the tool be called by the model and the necessary plumbing.
"""
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
)
if attachment := interpret_content_as_attachment(response_str):
message.content = attachment
return [message]
@abstractmethod
async def run_impl(self, *args, **kwargs):
raise NotImplementedError()

View file

@ -0,0 +1,83 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, AsyncGenerator, List
from llama_models.llama3_1.api.datatypes import StopReason, ToolResponseMessage
from llama_toolchain.agentic_system.api import (
AgenticSystem,
AgenticSystemTurnCreateRequest,
AgenticSystemTurnResponseEventType as EventType,
)
from llama_toolchain.inference.api import Message
async def execute_with_custom_tools(
system: AgenticSystem,
system_id: str,
session_id: str,
messages: List[Message],
custom_tools: List[Any],
max_iters: int = 5,
stream: bool = True,
) -> AsyncGenerator:
# first create a session, or do you keep a persistent session?
tools_dict = {t.get_name(): t for t in custom_tools}
current_messages = messages.copy()
n_iter = 0
while n_iter < max_iters:
n_iter += 1
request = AgenticSystemTurnCreateRequest(
system_id=system_id,
session_id=session_id,
messages=current_messages,
stream=stream,
)
turn = None
async for chunk in system.create_agentic_system_turn(request):
if chunk.event.payload.event_type != EventType.turn_complete.value:
yield chunk
else:
turn = chunk.event.payload.turn
message = turn.output_message
if len(message.tool_calls) == 0:
yield chunk
return
if message.stop_reason == StopReason.out_of_tokens:
yield chunk
return
tool_call = message.tool_calls[0]
if tool_call.tool_name not in tools_dict:
m = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=f"Unknown tool `{tool_call.tool_name}` was called. Try again with something else",
)
next_message = m
else:
tool = tools_dict[tool_call.tool_name]
result_messages = await execute_custom_tool(tool, message)
next_message = result_messages[0]
yield next_message
current_messages = [next_message]
async def execute_custom_tool(tool: Any, message: Message) -> List[Message]:
result_messages = await tool.run([message])
assert (
len(result_messages) == 1
), f"Expected single message, got {len(result_messages)}"
return result_messages