rename folder; code readme update

This commit is contained in:
Jeff Tang 2024-12-11 18:04:32 -08:00
parent a45c82aa1a
commit 61e837380c
6 changed files with 175 additions and 275 deletions

View file

@ -0,0 +1,167 @@
# A Llama and Llama Stack Powered Email Agent
This is a Llama Stack port of the [Llama Powered Email Agent](https://github.com/meta-llama/llama-recipes/tree/gmagent/recipes/use_cases/email_agent) app that shows how to build an email agent app powered by Llama 3.1 8B and Llama Stack, using Llama Stack custom tool and agent APIs.
Currently implemented features of the agent include:
* search for emails and attachments
* get email detail
* reply to a specific email
* forward an email
* get summary of a PDF attachment
* draft and send an email
We'll mainly cover here how to port a Llama app using native custom tools supported in Llama 3.1 (and later) and an agent implementation from scratch to using Llama Stack APIs. See the link above for a comprehensive overview, definition, and resources of LLM agents.
# Setup and Installation
See the link above for Enable Gmail API and Install Ollama with Llama 3.1 8B.
## Install required packages
First, create a Conda or virtual env, then activate it and install the required Python libraries (slightly different from the original app because here we'll also install the `llama-stack-client` package):
```
git clone https://github.com/meta-llama/llama-stack
cd llama-stack/docs/zero_to_hero_guide/email_agent
pip install -r requirements.txt
```
# Run Email Agent
The steps are also the same as the [original app]((https://github.com/meta-llama/llama-recipes/tree/gmagent/recipes/use_cases/email_agent).
# Implementation Notes
Notes here mainly cover how custom tools (functions) are defined and how the Llama Stack Agent class is used with the custom tools.
## Available Custom Tool Definition
The `functions_prompt.py` defines the following six custom tools (functions), each as a subclass of Llama Stack's `CustomTool`, along with examples for each function call spec that Llama should return):
* ListEmailsTool
* GetEmailDetailTool
* SendEmailTool
* GetPDFSummaryTool
* CreateDraftTool
* SendDraftTool
Below is an example custom tool call spec in JSON format, for the user asks such as "do i have emails with attachments larger than 5mb", "any attachments larger than 5mb" or "let me know if i have large attachments over 5mb":
```
{"name": "list_emails", "parameters": {"query": "has:attachment larger:5mb"}}
```
Porting the custom function definition in the original app to Llama Stack's CustomTool subclass is straightforward. Below is an example of the original custom function definition:
```
list_emails_function = """
{
"type": "function",
"function": {
"name": "list_emails",
"description": "Return a list of emails matching an optionally specified query.",
"parameters": {
"type": "dic",
"properties": [
{
"maxResults": {
"type": "integer",
"description": "The default maximum number of emails to return is 100; the maximum allowed value for this field is 500."
}
},
{
"query": {
"type": "string",
"description": "One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not)."
}
}
],
"required": []
}
}
}
"""
```
And its Llama Stack CustomTool subclass implementation is:
```
class ListEmailsTool(CustomTool):
"""Custom tool for List Emails."""
def get_name(self) -> str:
return "list_emails"
def get_description(self) -> str:
return "Return a list of emails matching an optionally specified query."
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"maxResults": ToolParamDefinitionParam(
param_type="int",
description="The default maximum number of emails to return is 100; the maximum allowed value for this field is 500.",
required=False
),
"query": ToolParamDefinitionParam(
param_type="str",
description="One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not).",
required=False
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, query: str, maxResults: int = 100) -> Dict[str, Any]:
"""Query to get a list of emails matching the query."""
emails = list_emails(query)
return {"name": self.get_name(), "result": emails}
```
Each CustomTool subclass has a `run_impl` method that calls actual Gmail API-based tool call implementation (same as the original app), which, in the example above, is `list_emails`.
## The Llama Stack Agent class
The `create_email_agent` in main.py creates a Llama Stack Agent with 6 custom tools using a `LlamaStackClient` instance that connects to Together.ai's Llama Stack server. The agent then creates a session, uses the same session in a loop to create a turn for each user ask. Inside each turn, a tool call spec is generated based on the user ask and, if needed after processing of the tool call spec to match what the actual Gmail API expects (e.g. get_email_detail requires an email id but the tool call spec generated by Llama doesn't have the id), actual tool calling happens. After post-processing of the tool call result, a user-friendly message is generated to respond to the user's original ask.
## Memory
In `shared.py` we define a simple dictionary `memory`, used to hold short-term results such as a list of found emails based on the user ask, or the draft id of a created email draft. They're needed to answer follow up user asks such as "what attachments does the email with subject xxx have" or "send the draft".
# TODOs
1. Improve the search, reply, forward, create email draft, and query about types of attachments.
2. Improve the fallback and error handling mechanism when the user asks don't lead to a correct function calling spec or the function calling fails.
3. Improve the user experience by showing progress when some Gmail search API calls take long (minutes) to complete.
4. Implement the async behavior of the agent - schedule an email to be sent later.
5. Implement the agent planning - decomposing a complicated ask into sub-tasks, using ReAct and other methods.
6. Implement the agent long-term memory - longer context and memory across sessions (consider using Llama Stack/MemGPT/Letta)
7. Implement reflection - on the tool calling spec and results.
8. Introduce multiple-agent collaboration.
9. Implement the agent observability.
10. Compare different agent frameworks using the agent as the case study.
11. Add and implement a test plan and productionize the email agent.
# Resources
1. Lilian Weng's blog [LLM Powered Autonomous Agents](https://lilianweng.github.io/posts/2023-06-23-agent/)
2. Andrew Ng's posts [Agentic Design Patterns](https://www.deeplearning.ai/the-batch/how-agents-can-improve-llm-performance/) with basic [implementations from scratch](https://github.com/neural-maze/agentic_patterns).
3. LangChain's survey [State of AI Agents](https://www.langchain.com/stateofaiagents)
4. Deloitte's report [AI agents and multiagent systems](https://www2.deloitte.com/content/dam/Deloitte/us/Documents/consulting/us-ai-institute-generative-ai-agents-multiagent-systems.pdf)
5. Letta's blog [The AI agents stack](https://www.letta.com/blog/ai-agents-stack)
6. Microsoft's multi-agent system [Magentic-One](https://www.microsoft.com/en-us/research/articles/magentic-one-a-generalist-multi-agent-system-for-solving-complex-tasks)
7. Amazon's [Multi-Agent Orchestrator framework](https://awslabs.github.io/multi-agent-orchestrator/)
8. Deeplearning.ai's [agent related courses](https://www.deeplearning.ai/courses/?courses_date_desc%5Bquery%5D=agents) (Meta, AWS, Microsoft, LangChain, LlamaIndex, crewAI, AutoGen, Letta) and some [lessons ported to using Llama](https://github.com/meta-llama/llama-recipes/tree/main/recipes/quickstart/agents/DeepLearningai_Course_Notebooks).
9. Felicis's [The Agentic Web](https://www.felicis.com/insight/the-agentic-web)
10. A pretty complete [list of AI agents](https://github.com/e2b-dev/awesome-ai-agents), not including [/dev/agents](https://sdsa.ai/), a very new startup building the next-gen OS for AI agents, though.
11. Sequoia's [post](https://www.linkedin.com/posts/konstantinebuhler_the-ai-landscape-is-shifting-from-simple-activity-7270111755710672897-ZHnr/) on 2024 being the year of AI agents and 2025 networks of AI agents.

View file

@ -0,0 +1,513 @@
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from bs4 import BeautifulSoup
import os
import pytz
import base64
import pickle
from datetime import datetime, timezone
import ollama
from pypdf import PdfReader
from pathlib import Path
from shared import memory
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.compose']
user_email = None
service = None
user_id = 'me'
def authenticate_gmail(user_email):
creds = None
token_file = f'token_{user_email}.pickle' # Unique token file for each user
# Load the user's token if it exists
if os.path.exists(token_file):
with open(token_file, 'rb') as token:
creds = pickle.load(token)
# If no valid credentials, prompt the user to log in
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_console()
# Save the new credentials to a user-specific token file
with open(token_file, 'wb') as token:
pickle.dump(creds, token)
# Build the Gmail API service
service = build('gmail', 'v1', credentials=creds)
return service
def num_of_emails(query=''):
response = service.users().messages().list(
userId='me',
q=query,
maxResults=1).execute()
return response.get('resultSizeEstimate', 0)
def list_emails(query='', max_results=100):
emails = []
next_page_token = None
while True:
response = service.users().messages().list(
userId=user_id,
maxResults=max_results,
pageToken=next_page_token,
q=query
).execute()
if 'messages' in response:
for msg in response['messages']:
sender, subject, received_time = get_email_info(msg['id'])
emails.append(
{
"message_id": msg['id'],
"sender": sender,
"subject": subject,
"received_time": received_time
}
)
next_page_token = response.get('nextPageToken')
if not next_page_token:
break
return emails
def get_email_detail(detail, which):
message_id = None
# pre-processing
if 'from ' in which:
sender = which.split('from ')[-1]
for email in memory['emails']:
if email['sender'].find(sender) != -1:
message_id = email['message_id']
break
elif 'subject:' in which:
subject = which.split('subject:')[-1]
# exact match beats substring
for email in memory['emails']:
if email['subject'].upper() == subject.upper():
message_id = email['message_id']
break
elif email['subject'].upper().find(subject.upper()) != -1:
message_id = email['message_id']
elif 'id_' in which:
message_id = which.split('id_')[-1]
else:
message_id = memory['emails'][-1]['message_id']
if detail == 'body':
return get_email_body(message_id)
elif detail == 'attachment':
return get_email_attachments(message_id)
def get_email_body(message_id):
try:
message = service.users().messages().get(
userId=user_id,
id=message_id,
format='full').execute()
# Recursive function to extract the parts
def extract_parts(payload):
text_body = ""
if 'parts' in payload:
for part in payload['parts']:
return extract_parts(part)
else:
mime_type = payload.get('mimeType')
body = payload.get('body', {}).get('data')
if mime_type == 'text/html':
decoded_body = base64.urlsafe_b64decode(body).decode('utf-8')
soup = BeautifulSoup(decoded_body, 'html.parser')
text_body = soup.get_text().strip()
elif mime_type == 'text/plain':
decoded_body = base64.urlsafe_b64decode(body).decode('utf-8')
text_body = decoded_body
return text_body
return extract_parts(message['payload'])
except Exception as e:
print(f"An error occurred: {e}")
return None
def parse_message(message):
payload = message['payload']
headers = payload.get("headers")
subject = None
sender = None
for header in headers:
if header['name'] == 'Subject':
subject = header['value']
elif header['name'] == 'From':
sender = header['value']
internal_date = message.get('internalDate')
utc_time = datetime.fromtimestamp(int(internal_date) / 1000, tz=timezone.utc)
# Convert UTC to the specified timezone
local_timezone = pytz.timezone("America/Los_Angeles")
local_time = utc_time.astimezone(local_timezone)
# Format the local time as a string
received_time = local_time.strftime('%Y-%m-%d %H:%M:%S %Z')
# Check if the email is plain text or multipart
if 'parts' in payload:
# Multipart message - find the text/plain or text/html part
for part in payload['parts']:
if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': # You can also look for 'text/html'
data = part['body']['data']
body = base64.urlsafe_b64decode(data).decode('utf-8')
return sender, subject, received_time, body
elif part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']:
return sender, subject, received_time, get_email_body(message.get('id'))
else:
# Single part message
data = payload['body']['data']
body = base64.urlsafe_b64decode(data).decode('utf-8')
return sender, subject, received_time, body
def get_email_info(msg_id):
message = service.users().messages().get(
userId=user_id,
id=msg_id,
format='full').execute()
sender, subject, received_time, body = parse_message(message)
return sender, subject, received_time
def reply_email(message_id, reply_text):
# Fetch the original message
original_message = service.users().messages().get(
userId=user_id,
id=message_id,
format='full').execute()
# Get headers
headers = original_message['payload']['headers']
subject = None
to = None
for header in headers:
if header['name'] == 'Subject':
subject = header['value']
if header['name'] == 'From':
to = header['value']
# Create the reply subject
if not subject.startswith("Re: "):
subject = "Re: " + subject
# Compose the reply message
reply_message = MIMEText(reply_text)
reply_message['to'] = to
reply_message['from'] = user_id
reply_message['subject'] = subject
reply_message['In-Reply-To'] = message_id
# Encode and send the message
raw_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode("utf-8")
body = {'raw': raw_message,
'threadId': original_message['threadId']}
sent_message = service.users().messages().send(
userId=user_id,
body=body).execute()
print("Reply sent. Message ID:", sent_message['id'])
def forward_email(message_id, forward_to, email_body=None):
"""
Forwards an email, preserving the original MIME type, including multipart/related.
"""
# Get the original message in 'full' format
original_message = service.users().messages().get(
userId=user_id,
id=message_id,
format='full').execute()
# Extract the payload and headers
payload = original_message.get('payload', {})
headers = payload.get('headers', [])
parts = payload.get('parts', [])
# Get the Subject
subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject')
# Create a new MIME message for forwarding
mime_message = MIMEMultipart(payload.get('mimeType', 'mixed').split('/')[-1])
mime_message['To'] = forward_to
mime_message['Subject'] = f"Fwd: {subject}"
# Add the optional custom email body
if email_body:
mime_message.attach(MIMEText(email_body, 'plain'))
# Function to fetch attachment data by attachmentId
def fetch_attachment_data(attachment_id, message_id):
attachment = service.users().messages().attachments().get(
userId=user_id, messageId=message_id, id=attachment_id
).execute()
return base64.urlsafe_b64decode(attachment['data'])
# Rebuild MIME structure
def rebuild_parts(parts):
"""
Recursively rebuild MIME parts.
"""
if not parts:
return None
for part in parts:
part_mime_type = part.get('mimeType', 'text/plain')
part_body = part.get('body', {})
part_data = part_body.get('data', '')
part_parts = part.get('parts', []) # Sub-parts for multipart types
filename = part.get('filename')
attachment_id = part_body.get('attachmentId')
if part_mime_type.startswith('multipart/'):
# Rebuild nested multipart
sub_multipart = MIMEMultipart(part_mime_type.split('/')[-1])
sub_parts = rebuild_parts(part_parts)
if sub_parts:
for sub_part in sub_parts:
sub_multipart.attach(sub_part)
yield sub_multipart
elif filename and attachment_id:
# Handle attachments
decoded_data = fetch_attachment_data(attachment_id, message_id)
attachment = MIMEBase(*part_mime_type.split('/'))
attachment.set_payload(decoded_data)
encoders.encode_base64(attachment)
attachment.add_header('Content-Disposition', f'attachment; filename="{filename}"')
yield attachment
else:
if part_data:
# Decode and attach non-multipart parts
decoded_data = base64.urlsafe_b64decode(part_data)
if part_mime_type == 'text/plain':
yield MIMEText(decoded_data.decode('utf-8'), 'plain')
elif part_mime_type == 'text/html':
yield MIMEText(decoded_data.decode('utf-8'), 'html')
# Rebuild the main MIME structure
rebuilt_parts = rebuild_parts(parts)
if rebuilt_parts:
for rebuilt_part in rebuilt_parts:
mime_message.attach(rebuilt_part)
# Encode the MIME message to base64
raw = base64.urlsafe_b64encode(mime_message.as_bytes()).decode('utf-8')
# Send the email
forward_body = {'raw': raw}
sent_message = service.users().messages().send(userId=user_id, body=forward_body).execute()
print(f"Message forwarded successfully! Message ID: {sent_message['id']}")
def send_email(action, to, subject, body="", email_id=""):
if action == "compose":
message = MIMEText(body)
message['to'] = to
message['from'] = user_id
message['subject'] = subject
# Encode and send the message
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
body = {'raw': raw_message}
sent_message = service.users().messages().send(
userId=user_id,
body=body).execute()
return sent_message['id']
elif action == "reply": # reply or forward; a message id is needed
reply_email(email_id, body)
elif action == "forward":
forward_email(email_id, to, body)
def create_draft(action, to, subject, body="", email_id=""):
if action == "new":
message = MIMEText(body)
message['to'] = to
message['from'] = user_id
message['subject'] = subject
encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
draft_body = {'message': {'raw': encoded_message}}
draft = service.users().drafts().create(
userId=user_id,
body=draft_body).execute()
print(f"Draft created with ID: {draft['id']}")
return draft['id']
elif action == "reply":
return create_reply_draft(email_id, body)
elif action == "forward":
return create_forward_draft(email_id, to, body)
else:
return
def create_reply_draft(message_id, reply_text):
# Fetch the original message
original_message = service.users().messages().get(
userId=user_id,
id=message_id,
format='full').execute()
# Get headers
headers = original_message['payload']['headers']
subject = None
to = None
for header in headers:
if header['name'] == 'Subject':
subject = header['value']
if header['name'] == 'From':
to = header['value']
# Create the reply subject
if not subject.startswith("Re: "):
subject = "Re: " + subject
# Compose the reply message
reply_message = MIMEText(reply_text)
reply_message['to'] = to
reply_message['from'] = user_id
reply_message['subject'] = subject
reply_message['In-Reply-To'] = message_id
encoded_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode()
draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}}
draft = service.users().drafts().create(userId=user_id, body=draft_body).execute()
return draft['id']
def create_forward_draft(message_id, recipient_email, custom_message=None):
# Get the original message
original_message = service.users().messages().get(
userId=user_id,
id=message_id,
format='raw').execute()
# Decode the raw message
raw_message = base64.urlsafe_b64decode(original_message['raw'].encode('utf-8'))
# Prepare the forward header and optional custom message
forward_header = f"----- Forwarded message -----\nFrom: {recipient_email}\n\n"
if custom_message:
forward_header += f"{custom_message}\n\n"
# Combine the forward header with the original message
new_message = forward_header + raw_message.decode('utf-8')
# Encode the combined message into base64 format
encoded_message = base64.urlsafe_b64encode(new_message.encode('utf-8')).decode('utf-8')
draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}}
draft = service.users().drafts().create(userId=user_id, body=draft_body).execute()
print(f"Forward draft created with ID: {draft['id']}")
return draft['id']
def send_draft(id):
sent_message = service.users().drafts().send(
userId=user_id,
body={'id': id}
).execute()
return f"Draft sent with email ID: {sent_message['id']}"
def get_pdf_summary(file_name):
text = pdf2text(file_name)
print("Calling Llama to generate a summary...")
response = llama31(text, "Generate a summary of the input text in 5 sentences.")
return response
def get_email_attachments(message_id, mime_type='application/pdf'):
attachments = []
# Helper function to process email parts
def process_parts(parts):
for part in parts:
if part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']:
# Recursively process nested parts
if 'parts' in part:
process_parts(part['parts'])
elif 'filename' in part and part['filename']:
if part['mimeType'] == mime_type: # Check for the desired MIME type
attachment_id = part['body'].get('attachmentId')
if attachment_id:
# Get the attachment data
attachment = service.users().messages().attachments().get(
userId=user_id,
messageId=message_id,
id=attachment_id
).execute()
# Decode the attachment content
file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))
with open(part['filename'], "wb") as f:
f.write(file_data)
# Save the attachment information
attachments.append(
{'filename': part['filename'],
'data': file_data,
'size': attachment.get('size', 0)
})
# Retrieve the email message
message = service.users().messages().get(
userId=user_id,
id=message_id,
format='full').execute()
payload = message['payload']
# Start processing the parts
if 'parts' in payload:
process_parts(payload['parts'])
rslt = ""
for a in attachments:
rslt += f"{a['filename']} - {a['size']} bytes\n"
return rslt #attachments
def pdf2text(file):
text = ''
try:
with Path(file).open("rb") as f:
reader = PdfReader(f)
text = "\n\n".join([page.extract_text() for page in reader.pages])
except Exception as e:
raise f"Error reading the PDF file: {str(e)}"
print(f"\nPDF text length: {len(text)}\n")
return text
def set_email_service(gmail):
global user_email
global service
user_email = gmail
service = authenticate_gmail(user_email)
def llama31(user_prompt: str, system_prompt = ""):
response = ollama.chat(model='llama3.1',
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response['message']['content']

View file

@ -0,0 +1,359 @@
from typing import List, Dict, Any
from llama_stack_client.types.tool_param_definition_param import ToolParamDefinitionParam
from llama_stack_client.types import CompletionMessage, ToolResponseMessage
from llama_stack_client.lib.agents.custom_tool import CustomTool
from email_agent import *
import json
class ListEmailsTool(CustomTool):
"""Custom tool for List Emails."""
def get_name(self) -> str:
return "list_emails"
def get_description(self) -> str:
return "Return a list of emails matching an optionally specified query."
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"maxResults": ToolParamDefinitionParam(
param_type="int",
description="The default maximum number of emails to return is 100; the maximum allowed value for this field is 500.",
required=False
),
"query": ToolParamDefinitionParam(
param_type="str",
description="One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not).",
required=False
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, query: str, maxResults: int = 100) -> Dict[str, Any]:
"""Query to get a list of emails matching the query."""
emails = list_emails(query)
return {"name": self.get_name(), "result": emails}
class GetEmailDetailTool(CustomTool):
"""Custom tool for Get Email Detail."""
def get_name(self) -> str:
return "get_email_detail"
def get_description(self) -> str:
return "Get detailed info about a specific email"
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"detail": ToolParamDefinitionParam(
param_type="str",
description="what detail the user wants to know about - two possible values: body or attachment",
required=True
),
"query": ToolParamDefinitionParam(
param_type="str",
description="One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not).",
required=False
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, detail: str, query: str) -> Dict[str, Any]:
"""Query to get the detail of an email."""
detail = get_email_detail(detail, query)
return {"name": self.get_name(), "result": detail}
class SendEmailTool(CustomTool):
"""Compose, reply, or forward email."""
def get_name(self) -> str:
return "send_email"
def get_description(self) -> str:
return "Compose, reply, or forward email"
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"action": ToolParamDefinitionParam(
param_type="string",
description="Whether to compose, reply, or forward an email",
required=True
),
"to": ToolParamDefinitionParam(
param_type="str",
description="The recipient of the email",
required=True
),
"subject": ToolParamDefinitionParam(
param_type="str",
description="The subject of the email",
required=True
),
"body": ToolParamDefinitionParam(
param_type="str",
description="The content of the email",
required=True
),
"email_id": ToolParamDefinitionParam(
param_type="str",
description="The email id to reply or forward to",
required=False
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, action, to, subject, body="", email_id="") -> Dict[str, Any]:
"""Send an email."""
result = send_email(action, to, subject, body, email_id)
return {"name": self.get_name(), "result": result}
class GetPDFSummaryTool(CustomTool):
"""Get a summary of a PDF attachment."""
def get_name(self) -> str:
return "get_pdf_summary"
def get_description(self) -> str:
return "Get a summary of a PDF attachment"
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"file_name": ToolParamDefinitionParam(
param_type="string",
description="The name of the PDF file",
required=True
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, file_name: str) -> Dict[str, Any]:
"""Get the summary of a PDF file."""
summary = get_pdf_summary(file_name)
return {"name": self.get_name(), "result": summary}
class CreateDraftTool(CustomTool):
"""Create a new, reply, or forward email draft."""
def get_name(self) -> str:
return "create_draft"
def get_description(self) -> str:
return "Create a new, reply, or forward email draft"
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"action": ToolParamDefinitionParam(
param_type="string",
description="Whether to compose, reply, or forward an email",
required=True
),
"to": ToolParamDefinitionParam(
param_type="str",
description="The recipient of the email",
required=True
),
"subject": ToolParamDefinitionParam(
param_type="str",
description="The subject of the email",
required=True
),
"body": ToolParamDefinitionParam(
param_type="str",
description="The content of the email",
required=True
),
"email_id": ToolParamDefinitionParam(
param_type="str",
description="The email id to reply or forward to, or empty if draft a new email.",
required=True
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, action, to, subject, body="", email_id="") -> Dict[str, Any]:
"""Create an email draft."""
result = create_draft(action, to, subject, body, email_id)
return {"name": self.get_name(), "result": result}
class SendDraftTool(CustomTool):
"""Send a draft email."""
def get_name(self) -> str:
return "send_draft"
def get_description(self) -> str:
return "Send a draft email"
def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]:
return {
"id": ToolParamDefinitionParam(
param_type="str",
description="The email draft id.",
required=True
)
}
async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]:
assert len(messages) == 1, "Expected single message"
message = messages[0]
tool_call = message.tool_calls[0]
try:
response = await self.run_impl(**tool_call.arguments)
response_str = json.dumps(response, ensure_ascii=False)
except Exception as e:
response_str = f"Error when running tool: {e}"
message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=response_str,
role="ipython",
)
return [message]
async def run_impl(self, id: str) -> Dict[str, Any]:
"""Send the last draft email."""
result = send_draft(memory['draft_id'])
return {"name": self.get_name(), "result": result}
examples = """
{"name": "list_emails", "parameters": {"query": "has:attachment larger:5mb"}}
{"name": "list_emails", "parameters": {"query": "has:attachment"}}
{"name": "list_emails", "parameters": {"query": "newer_than:1d"}}
{"name": "list_emails", "parameters": {"query": "older_than:1d"}}
{"name": "list_emails", "parameters": {"query": "is:unread"}}
{"name": "list_emails", "parameters": {"query": "<query> is:unread"}}
{"name": "list_emails", "parameters": {"query": "<query> is:read"}}
{"name": "get_email_detail", "parameters": {"detail": "body", "which": "first"}}
{"name": "get_email_detail", "parameters": {"detail": "body", "which": "last"}}
{"name": "get_email_detail", "parameters": {"detail": "body", "which": "second"}}
{"name": "get_email_detail", "parameters": {"detail": "body", "which": "subject <subject info>"}}
{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "from <sender info>"}}
{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "first"}}
{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "last"}}
{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "<email id>"}}
{"name": "send_email", "parameters": {"action": "compose", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx"}}
{"name": "send_email", "parameters": {"action": "reply", "to": "", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}}
{"name": "send_email", "parameters": {"action": "forward", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}}
{"name": "create_draft", "parameters": {"action": "new", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx", "email_id": ""}}
{"name": "create_draft", "parameters": {"action": "reply", "to": "", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}}
{"name": "create_draft", "parameters": {"action": "forward", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}}
{"name": "send_draft", "parameters": {"id": "..."}}
{"name": "get_pdf_summary", "parameters": {"file_name": "..."}}
"""
system_prompt = f"""
Your name is Email Agent, an assistant that can perform all email related tasks for your user.
Respond to the user's ask by making use of the following functions if needed.
If no available functions can be used, just say "I don't know" and don't make up facts.
Example responses:
{examples}
"""

View file

@ -0,0 +1,138 @@
import argparse
import email_agent
import asyncio
import json
from functions_prompt import *
from llama_stack_client import LlamaStackClient
from llama_stack_client.lib.agents.agent import Agent
from llama_stack_client.lib.agents.event_logger import EventLogger
from llama_stack_client.types.agent_create_params import (
AgentConfig,
)
from shared import memory
LLAMA_STACK_API_TOGETHER_URL="https://llama-stack.together.ai"
LLAMA31_8B_INSTRUCT = "Llama3.1-8B-Instruct"
async def create_email_agent(client: LlamaStackClient) -> Agent:
"""Create an agent with gmail tool capabilities."""
listEmailsTool = ListEmailsTool()
getEmailDetailTool = GetEmailDetailTool()
sendEmailTool = SendEmailTool()
getPDFSummaryTool = GetPDFSummaryTool()
createDraftTool = CreateDraftTool()
sendDraftTool = SendDraftTool()
agent_config = AgentConfig(
model=LLAMA31_8B_INSTRUCT,
instructions=system_prompt,
sampling_params={
"strategy": "greedy",
"temperature": 0.0,
"top_p": 0.9,
},
tools = [
listEmailsTool.get_tool_definition(),
getEmailDetailTool.get_tool_definition(),
sendEmailTool.get_tool_definition(),
getPDFSummaryTool.get_tool_definition(),
createDraftTool.get_tool_definition(),
sendDraftTool.get_tool_definition(),
],
tool_choice = "auto",
tool_prompt_format = "json",
input_shields = [],
output_shields = [],
enable_session_persistence = True
)
agent = Agent(
client = client,
agent_config = agent_config,
custom_tools = (
listEmailsTool,
getEmailDetailTool,
sendEmailTool,
getPDFSummaryTool,
createDraftTool,
sendDraftTool
)
)
return agent
async def main():
parser = argparse.ArgumentParser(description="Set email address")
parser.add_argument("--email", type=str, required=True, help="Your Gmail address")
args = parser.parse_args()
email_agent.set_email_service(args.email)
greeting = llama31("hello", "Your name is Email Agent, an assistant that can perform all email related tasks for your user.")
agent_response = f"{greeting}\n\nYour ask: "
client = LlamaStackClient(base_url=LLAMA_STACK_API_TOGETHER_URL)
agent = await create_email_agent(client)
session_id = agent.create_session("email-session")
while True:
ask = input(agent_response)
if ask == "bye":
print(llama31("bye"))
break
print("\n-------------------------\nCalling Llama...")
response = agent.create_turn(
messages=[{"role": "user", "content": ask}],
session_id=session_id,
)
async for log in EventLogger().log(response):
if log.role == "CustomTool":
tool_name = json.loads(log.content)['name']
result = json.loads(log.content)['result']
# post processing
if tool_name == 'list_emails':
memory['emails'] = result
num = len(result)
if num == 0:
output = "I couldn't find any such emails. What else would you like to do?"
elif num <= 5:
output = f"I found {num} email{'s' if num > 1 else ''} matching your query:\n"
for i, email in enumerate(result, start=1):
output += f"{i}. From: {email['sender']}, Subject: {email['subject']}, Received on: {email['received_time']}\n"
else:
output = f"I found {num} emails matching your query. Here are the first 5 emails:\n"
for i in range(1, 6):
output += f"{i}. From: {result[i - 1]['sender']}, Subject: {result[i - 1]['subject']}, Received on: {result[i - 1]['received_time']}\n"
elif tool_name == "get_email_detail":
output = result
elif tool_name == "create_draft":
output = "Draft created."
memory['draft_id'] = result
elif tool_name == "send_draft":
output = result
elif tool_name == "send_email":
output = "Email sent."
elif tool_name == "get_pdf_summary":
output = result
print(f"\n-------------------------\n\nGmagent: {output}\n")
elif log.role == "inference":
print("Llama returned: ", end="")
else:
print(log, end="")
agent_response = "Your ask: "
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,10 @@
google-auth==2.27.0
google-auth-oauthlib==0.4.6
google-auth-httplib2==0.1.0
google-api-python-client==2.34.0
llama_stack_client==0.0.50
pytz
beautifulsoup4
ollama==0.4.4
pypdf
termcolor