mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-17 21:09:47 +00:00
initial version working
This commit is contained in:
parent
a29013112f
commit
7ff39d1857
4 changed files with 1110 additions and 0 deletions
624
docs/zero_to_hero_guide/gmail_agent/gmagent.py
Normal file
624
docs/zero_to_hero_guide/gmail_agent/gmagent.py
Normal file
|
|
@ -0,0 +1,624 @@
|
|||
from google.auth.transport.requests import Request
|
||||
from google_auth_oauthlib.flow import InstalledAppFlow
|
||||
from googleapiclient.discovery import build
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.base import MIMEBase
|
||||
from email import encoders
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
import os
|
||||
import pytz
|
||||
import base64
|
||||
import pickle
|
||||
from datetime import datetime, timezone
|
||||
import json
|
||||
import ollama
|
||||
from pypdf import PdfReader
|
||||
from pathlib import Path
|
||||
|
||||
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.compose']
|
||||
|
||||
def authenticate_gmail(user_email):
|
||||
creds = None
|
||||
token_file = f'token_{user_email}.pickle' # Unique token file for each user
|
||||
|
||||
# Load the user's token if it exists
|
||||
if os.path.exists(token_file):
|
||||
with open(token_file, 'rb') as token:
|
||||
creds = pickle.load(token)
|
||||
|
||||
# If no valid credentials, prompt the user to log in
|
||||
if not creds or not creds.valid:
|
||||
if creds and creds.expired and creds.refresh_token:
|
||||
creds.refresh(Request())
|
||||
else:
|
||||
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
|
||||
creds = flow.run_console()
|
||||
|
||||
# Save the new credentials to a user-specific token file
|
||||
with open(token_file, 'wb') as token:
|
||||
pickle.dump(creds, token)
|
||||
|
||||
# Build the Gmail API service
|
||||
service = build('gmail', 'v1', credentials=creds)
|
||||
return service
|
||||
|
||||
|
||||
def num_of_emails(query=''):
|
||||
response = service.users().messages().list(
|
||||
userId='me',
|
||||
q=query,
|
||||
maxResults=1).execute()
|
||||
return response.get('resultSizeEstimate', 0)
|
||||
|
||||
|
||||
def list_emails(query='', max_results=100):
|
||||
emails = []
|
||||
next_page_token = None
|
||||
|
||||
while True:
|
||||
response = service.users().messages().list(
|
||||
userId=user_id,
|
||||
maxResults=max_results,
|
||||
pageToken=next_page_token,
|
||||
q=query
|
||||
).execute()
|
||||
|
||||
if 'messages' in response:
|
||||
for msg in response['messages']:
|
||||
sender, subject, received_time = get_email_info(msg['id'])
|
||||
emails.append(
|
||||
{
|
||||
"message_id": msg['id'],
|
||||
"sender": sender,
|
||||
"subject": subject,
|
||||
"received_time": received_time
|
||||
}
|
||||
)
|
||||
|
||||
next_page_token = response.get('nextPageToken')
|
||||
|
||||
if not next_page_token:
|
||||
break
|
||||
|
||||
return emails
|
||||
|
||||
def get_email_detail(detail, which=''):
|
||||
if detail == 'body':
|
||||
return get_email_body(which)
|
||||
elif detail == 'attachment':
|
||||
return get_email_attachments(which)
|
||||
|
||||
|
||||
def get_email_body(message_id):
|
||||
try:
|
||||
message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=message_id,
|
||||
format='full').execute()
|
||||
|
||||
# Recursive function to extract the parts
|
||||
def extract_parts(payload):
|
||||
text_body = ""
|
||||
if 'parts' in payload:
|
||||
for part in payload['parts']:
|
||||
return extract_parts(part)
|
||||
else:
|
||||
mime_type = payload.get('mimeType')
|
||||
body = payload.get('body', {}).get('data')
|
||||
if mime_type == 'text/html':
|
||||
decoded_body = base64.urlsafe_b64decode(body).decode('utf-8')
|
||||
soup = BeautifulSoup(decoded_body, 'html.parser')
|
||||
text_body = soup.get_text().strip()
|
||||
elif mime_type == 'text/plain':
|
||||
decoded_body = base64.urlsafe_b64decode(body).decode('utf-8')
|
||||
text_body = decoded_body
|
||||
|
||||
return text_body
|
||||
|
||||
return extract_parts(message['payload'])
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def parse_message(message):
|
||||
payload = message['payload']
|
||||
headers = payload.get("headers")
|
||||
|
||||
subject = None
|
||||
sender = None
|
||||
for header in headers:
|
||||
if header['name'] == 'Subject':
|
||||
subject = header['value']
|
||||
elif header['name'] == 'From':
|
||||
sender = header['value']
|
||||
|
||||
internal_date = message.get('internalDate')
|
||||
utc_time = datetime.fromtimestamp(int(internal_date) / 1000, tz=timezone.utc)
|
||||
|
||||
# Convert UTC to the specified timezone
|
||||
local_timezone = pytz.timezone("America/Los_Angeles")
|
||||
local_time = utc_time.astimezone(local_timezone)
|
||||
|
||||
# Format the local time as a string
|
||||
received_time = local_time.strftime('%Y-%m-%d %H:%M:%S %Z')
|
||||
|
||||
# Check if the email is plain text or multipart
|
||||
if 'parts' in payload:
|
||||
# Multipart message - find the text/plain or text/html part
|
||||
for part in payload['parts']:
|
||||
if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': # You can also look for 'text/html'
|
||||
data = part['body']['data']
|
||||
body = base64.urlsafe_b64decode(data).decode('utf-8')
|
||||
return sender, subject, received_time, body
|
||||
elif part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']:
|
||||
return sender, subject, received_time, get_email_body(message.get('id'))
|
||||
else:
|
||||
# Single part message
|
||||
data = payload['body']['data']
|
||||
body = base64.urlsafe_b64decode(data).decode('utf-8')
|
||||
return sender, subject, received_time, body
|
||||
|
||||
|
||||
def get_email_info(msg_id):
|
||||
message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=msg_id,
|
||||
format='full').execute()
|
||||
|
||||
sender, subject, received_time, body = parse_message(message)
|
||||
|
||||
return sender, subject, received_time
|
||||
|
||||
|
||||
def reply_email(message_id, reply_text):
|
||||
# Fetch the original message
|
||||
original_message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=message_id,
|
||||
format='full').execute()
|
||||
|
||||
# Get headers
|
||||
headers = original_message['payload']['headers']
|
||||
subject = None
|
||||
to = None
|
||||
for header in headers:
|
||||
if header['name'] == 'Subject':
|
||||
subject = header['value']
|
||||
if header['name'] == 'From':
|
||||
to = header['value']
|
||||
|
||||
# Create the reply subject
|
||||
if not subject.startswith("Re: "):
|
||||
subject = "Re: " + subject
|
||||
|
||||
# Compose the reply message
|
||||
reply_message = MIMEText(reply_text)
|
||||
reply_message['to'] = to
|
||||
reply_message['from'] = user_id
|
||||
reply_message['subject'] = subject
|
||||
reply_message['In-Reply-To'] = message_id
|
||||
|
||||
# Encode and send the message
|
||||
raw_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode("utf-8")
|
||||
body = {'raw': raw_message,
|
||||
'threadId': original_message['threadId']}
|
||||
sent_message = service.users().messages().send(
|
||||
userId=user_id,
|
||||
body=body).execute()
|
||||
print("Reply sent. Message ID:", sent_message['id'])
|
||||
|
||||
|
||||
def forward_email(message_id, forward_to, email_body=None):
|
||||
"""
|
||||
Forwards an email, preserving the original MIME type, including multipart/related.
|
||||
"""
|
||||
# Get the original message in 'full' format
|
||||
original_message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=message_id,
|
||||
format='full').execute()
|
||||
|
||||
# Extract the payload and headers
|
||||
payload = original_message.get('payload', {})
|
||||
headers = payload.get('headers', [])
|
||||
parts = payload.get('parts', [])
|
||||
# Get the Subject
|
||||
subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject')
|
||||
|
||||
# Create a new MIME message for forwarding
|
||||
mime_message = MIMEMultipart(payload.get('mimeType', 'mixed').split('/')[-1])
|
||||
mime_message['To'] = forward_to
|
||||
mime_message['Subject'] = f"Fwd: {subject}"
|
||||
|
||||
# Add the optional custom email body
|
||||
if email_body:
|
||||
mime_message.attach(MIMEText(email_body, 'plain'))
|
||||
|
||||
# Function to fetch attachment data by attachmentId
|
||||
def fetch_attachment_data(attachment_id, message_id):
|
||||
attachment = service.users().messages().attachments().get(
|
||||
userId=user_id, messageId=message_id, id=attachment_id
|
||||
).execute()
|
||||
return base64.urlsafe_b64decode(attachment['data'])
|
||||
|
||||
# Rebuild MIME structure
|
||||
def rebuild_parts(parts):
|
||||
"""
|
||||
Recursively rebuild MIME parts.
|
||||
"""
|
||||
if not parts:
|
||||
return None
|
||||
|
||||
for part in parts:
|
||||
part_mime_type = part.get('mimeType', 'text/plain')
|
||||
part_body = part.get('body', {})
|
||||
part_data = part_body.get('data', '')
|
||||
part_parts = part.get('parts', []) # Sub-parts for multipart types
|
||||
filename = part.get('filename')
|
||||
attachment_id = part_body.get('attachmentId')
|
||||
|
||||
if part_mime_type.startswith('multipart/'):
|
||||
# Rebuild nested multipart
|
||||
sub_multipart = MIMEMultipart(part_mime_type.split('/')[-1])
|
||||
sub_parts = rebuild_parts(part_parts)
|
||||
if sub_parts:
|
||||
for sub_part in sub_parts:
|
||||
sub_multipart.attach(sub_part)
|
||||
yield sub_multipart
|
||||
elif filename and attachment_id:
|
||||
# Handle attachments
|
||||
decoded_data = fetch_attachment_data(attachment_id, message_id)
|
||||
attachment = MIMEBase(*part_mime_type.split('/'))
|
||||
attachment.set_payload(decoded_data)
|
||||
encoders.encode_base64(attachment)
|
||||
attachment.add_header('Content-Disposition', f'attachment; filename="{filename}"')
|
||||
yield attachment
|
||||
else:
|
||||
if part_data:
|
||||
# Decode and attach non-multipart parts
|
||||
decoded_data = base64.urlsafe_b64decode(part_data)
|
||||
|
||||
if part_mime_type == 'text/plain':
|
||||
yield MIMEText(decoded_data.decode('utf-8'), 'plain')
|
||||
elif part_mime_type == 'text/html':
|
||||
yield MIMEText(decoded_data.decode('utf-8'), 'html')
|
||||
|
||||
# Rebuild the main MIME structure
|
||||
rebuilt_parts = rebuild_parts(parts)
|
||||
if rebuilt_parts:
|
||||
for rebuilt_part in rebuilt_parts:
|
||||
mime_message.attach(rebuilt_part)
|
||||
|
||||
# Encode the MIME message to base64
|
||||
raw = base64.urlsafe_b64encode(mime_message.as_bytes()).decode('utf-8')
|
||||
|
||||
# Send the email
|
||||
forward_body = {'raw': raw}
|
||||
sent_message = service.users().messages().send(userId=user_id, body=forward_body).execute()
|
||||
|
||||
print(f"Message forwarded successfully! Message ID: {sent_message['id']}")
|
||||
|
||||
|
||||
def send_email(action, to, subject, body="", email_id=""):
|
||||
if action == "compose":
|
||||
message = MIMEText(body)
|
||||
message['to'] = to
|
||||
message['from'] = user_id
|
||||
message['subject'] = subject
|
||||
|
||||
# Encode and send the message
|
||||
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
|
||||
body = {'raw': raw_message}
|
||||
sent_message = service.users().messages().send(
|
||||
userId=user_id,
|
||||
body=body).execute()
|
||||
return sent_message['id']
|
||||
elif action == "reply": # reply or forward; a message id is needed
|
||||
reply_email(email_id, body)
|
||||
elif action == "forward":
|
||||
forward_email(email_id, to, body)
|
||||
|
||||
|
||||
def create_draft(action, to, subject, body="", email_id=""):
|
||||
if action == "new":
|
||||
message = MIMEText(body)
|
||||
message['to'] = to
|
||||
message['from'] = user_id
|
||||
message['subject'] = subject
|
||||
|
||||
encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
||||
draft_body = {'message': {'raw': encoded_message}}
|
||||
draft = service.users().drafts().create(
|
||||
userId=user_id,
|
||||
body=draft_body).execute()
|
||||
print(f"Draft created with ID: {draft['id']}")
|
||||
return draft['id']
|
||||
elif action == "reply":
|
||||
return create_reply_draft(email_id, body)
|
||||
elif action == "forward":
|
||||
return create_forward_draft(email_id, to, body)
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
|
||||
def create_reply_draft(message_id, reply_text):
|
||||
# Fetch the original message
|
||||
original_message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=message_id,
|
||||
format='full').execute()
|
||||
|
||||
# Get headers
|
||||
headers = original_message['payload']['headers']
|
||||
subject = None
|
||||
to = None
|
||||
for header in headers:
|
||||
if header['name'] == 'Subject':
|
||||
subject = header['value']
|
||||
if header['name'] == 'From':
|
||||
to = header['value']
|
||||
|
||||
# Create the reply subject
|
||||
if not subject.startswith("Re: "):
|
||||
subject = "Re: " + subject
|
||||
|
||||
# Compose the reply message
|
||||
reply_message = MIMEText(reply_text)
|
||||
reply_message['to'] = to
|
||||
reply_message['from'] = user_id
|
||||
reply_message['subject'] = subject
|
||||
reply_message['In-Reply-To'] = message_id
|
||||
|
||||
encoded_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode()
|
||||
draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}}
|
||||
draft = service.users().drafts().create(userId=user_id, body=draft_body).execute()
|
||||
return draft['id']
|
||||
|
||||
|
||||
def create_forward_draft(message_id, recipient_email, custom_message=None):
|
||||
# Get the original message
|
||||
original_message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=message_id,
|
||||
format='raw').execute()
|
||||
|
||||
# Decode the raw message
|
||||
raw_message = base64.urlsafe_b64decode(original_message['raw'].encode('utf-8'))
|
||||
|
||||
# Prepare the forward header and optional custom message
|
||||
forward_header = f"----- Forwarded message -----\nFrom: {recipient_email}\n\n"
|
||||
if custom_message:
|
||||
forward_header += f"{custom_message}\n\n"
|
||||
|
||||
# Combine the forward header with the original message
|
||||
new_message = forward_header + raw_message.decode('utf-8')
|
||||
|
||||
# Encode the combined message into base64 format
|
||||
encoded_message = base64.urlsafe_b64encode(new_message.encode('utf-8')).decode('utf-8')
|
||||
|
||||
draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}}
|
||||
draft = service.users().drafts().create(userId=user_id, body=draft_body).execute()
|
||||
print(f"Forward draft created with ID: {draft['id']}")
|
||||
return draft['id']
|
||||
|
||||
|
||||
def send_draft(id):
|
||||
sent_message = service.users().drafts().send(
|
||||
userId=user_id,
|
||||
body={'id': id}
|
||||
).execute()
|
||||
return f"Draft sent with email ID: {sent_message['id']}"
|
||||
|
||||
|
||||
def get_pdf_summary(file_name):
|
||||
text = pdf2text(file_name)
|
||||
print("Calling Llama to generate a summary...")
|
||||
response = llama31(text, "Generate a summary of the input text in 5 sentences.")
|
||||
return response
|
||||
|
||||
|
||||
def get_email_attachments(message_id, mime_type='application/pdf'):
|
||||
attachments = []
|
||||
|
||||
# Helper function to process email parts
|
||||
def process_parts(parts):
|
||||
for part in parts:
|
||||
if part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']:
|
||||
# Recursively process nested parts
|
||||
if 'parts' in part:
|
||||
process_parts(part['parts'])
|
||||
elif 'filename' in part and part['filename']:
|
||||
if part['mimeType'] == mime_type: # Check for the desired MIME type
|
||||
attachment_id = part['body'].get('attachmentId')
|
||||
if attachment_id:
|
||||
# Get the attachment data
|
||||
attachment = service.users().messages().attachments().get(
|
||||
userId=user_id,
|
||||
messageId=message_id,
|
||||
id=attachment_id
|
||||
).execute()
|
||||
|
||||
# Decode the attachment content
|
||||
file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))
|
||||
|
||||
with open(part['filename'], "wb") as f:
|
||||
f.write(file_data)
|
||||
|
||||
# Save the attachment information
|
||||
attachments.append(
|
||||
{'filename': part['filename'],
|
||||
'data': file_data,
|
||||
'size': attachment.get('size', 0)
|
||||
})
|
||||
|
||||
# Retrieve the email message
|
||||
message = service.users().messages().get(
|
||||
userId=user_id,
|
||||
id=message_id,
|
||||
format='full').execute()
|
||||
payload = message['payload']
|
||||
|
||||
# Start processing the parts
|
||||
if 'parts' in payload:
|
||||
process_parts(payload['parts'])
|
||||
|
||||
rslt = ""
|
||||
for a in attachments:
|
||||
rslt += f"{a['filename']} - {a['size']} bytes\n"
|
||||
return rslt #attachments
|
||||
|
||||
|
||||
def pdf2text(file):
|
||||
text = ''
|
||||
try:
|
||||
with Path(file).open("rb") as f:
|
||||
reader = PdfReader(f)
|
||||
text = "\n\n".join([page.extract_text() for page in reader.pages])
|
||||
except Exception as e:
|
||||
raise f"Error reading the PDF file: {str(e)}"
|
||||
|
||||
print(f"\nPDF text length: {len(text)}\n")
|
||||
|
||||
return text
|
||||
|
||||
|
||||
user_email = None
|
||||
service = None
|
||||
user_id = 'me'
|
||||
|
||||
def set_email_service(gmail):
|
||||
global user_email
|
||||
global service
|
||||
|
||||
user_email = gmail
|
||||
service = authenticate_gmail(user_email)
|
||||
|
||||
# class Agent:
|
||||
# def __init__(self, system_prompt=""):
|
||||
# self.system_prompt = system_prompt
|
||||
# self.messages = []
|
||||
#
|
||||
# # Gmagent-specific short term memory, used to answer follow up questions AFTER a list of emails is found matching user's query
|
||||
# self.emails = []
|
||||
# self.draft_id = None
|
||||
#
|
||||
# if self.system_prompt:
|
||||
# self.messages.append({"role": "system", "content": system_prompt})
|
||||
#
|
||||
# def __call__(self, user_prompt_or_tool_result, is_tool_call=False):
|
||||
# # if it's tool call result, use "ipython" instead of "user" for the role
|
||||
# self.messages.append({"role": ("ipython" if is_tool_call else "user"), "content": user_prompt_or_tool_result})
|
||||
# result = self.llama()
|
||||
# print(f"\nLlama returned: {result}.")
|
||||
# if type(result) == dict: # result is a dict only if it's a tool call spec
|
||||
# function_name = result["function_name"]
|
||||
# func = globals()[function_name]
|
||||
# parameters = result["parameters"]
|
||||
# if function_name == "get_email_detail":
|
||||
# # TODO: parse which - valid values are first, second,
|
||||
# # third, fourth, last, from xxx
|
||||
# if 'id' in parameters.keys():
|
||||
# parameters['which'] = parameters['id']
|
||||
# del parameters['id'] # per the function spec
|
||||
# elif 'which' in parameters.keys():
|
||||
# if 'from ' in parameters['which']:
|
||||
# sender = parameters['which'].split('from ')[-1]
|
||||
# for email in self.emails:
|
||||
# if email['sender'].find(sender) != -1:
|
||||
# parameters['which'] = email['message_id']
|
||||
# break
|
||||
# if 'subject ' in parameters['which']:
|
||||
# subject = parameters['which'].split('subject ')[-1]
|
||||
# # exact match beats substring
|
||||
# for email in self.emails:
|
||||
# if email['subject'].upper() == subject.upper():
|
||||
# parameters['which'] = email['message_id']
|
||||
# break
|
||||
# elif email['subject'].upper().find(subject.upper()) != -1:
|
||||
# parameters['which'] = email['message_id']
|
||||
#
|
||||
# elif 'id_' in parameters['which']:
|
||||
# parameters['which'] = parameters['which'].split('id_')[-1]
|
||||
# else:
|
||||
# parameters['which'] = self.emails[-1]['message_id']
|
||||
# elif function_name == "send_draft":
|
||||
# parameters['id'] = self.draft_id
|
||||
#
|
||||
# print(f"\nCalling tool to access Gmail API: {function_name}, {parameters}...")
|
||||
# result = func(**parameters)
|
||||
# print(f"\nTool calling returned: {result}")
|
||||
#
|
||||
# # convert function calling result to concise summary, offering interactive follow ups,
|
||||
# # for smooth and user friendly experience
|
||||
# if function_name == 'list_emails':
|
||||
# self.emails = result
|
||||
# num = len(result)
|
||||
# if num == 0:
|
||||
# output = "I couldn't find any such emails. What else would you like to do?"
|
||||
# elif num <= 5:
|
||||
# output = f"I found {num} email{'s' if num > 1 else ''} matching your query:\n"
|
||||
# for i, email in enumerate(result, start=1):
|
||||
# output += f"{i}. From: {email['sender']}, Subject: {email['subject']}, Received on: {email['received_time']}\n"
|
||||
# else:
|
||||
# output = f"I found {num} emails matching your query. Here are the first 5 emails:\n"
|
||||
# for i in range(1, 6):
|
||||
# output += f"{i}. From: {result[i-1]['sender']}, Subject: {result[i-1]['subject']}, Received on: {result[i-1]['received_time']}\n"
|
||||
# elif function_name == "get_email_detail":
|
||||
# output = result
|
||||
# elif function_name == "get_pdf_summary":
|
||||
# output = result
|
||||
# elif function_name == "send_email":
|
||||
# output = "Email sent."
|
||||
# elif function_name == "create_draft":
|
||||
# output = "Draft created."
|
||||
# self.draft_id = result
|
||||
# elif function_name == "send_draft":
|
||||
# output = result
|
||||
#
|
||||
# print(f"\n-------------------------\n\nGmagent: {output}\n")
|
||||
# else:
|
||||
# output = result # direct text, not JSON, response by Llama
|
||||
#
|
||||
# # adding this may cause Llama to hallucinate when answering
|
||||
# # follow up questions. e.g. "do i have emails with attachments
|
||||
# # larger than 20mb" got right tool calling response, then
|
||||
# # follow up "larger than 10mb" got hallucinated response.
|
||||
# # self.messages.append({"role": "assistant", "content": output})
|
||||
#
|
||||
# # this mitigates the hallucination
|
||||
# self.messages.append({"role": "assistant", "content": str(result)})
|
||||
#
|
||||
# return output
|
||||
#
|
||||
# def llama(self):
|
||||
# response = ollama.chat(model='llama3.1',
|
||||
# messages = self.messages,
|
||||
# options = {
|
||||
# "temperature": 0.0
|
||||
# }
|
||||
# )
|
||||
# result = response['message']['content']
|
||||
#
|
||||
# try:
|
||||
# res = json.loads(result.split("<|python_tag|>")[-1])
|
||||
# function_name = res['name']
|
||||
# parameters = res['parameters']
|
||||
# return {"function_name": function_name,
|
||||
# "parameters": parameters}
|
||||
# except:
|
||||
# return result
|
||||
#
|
||||
#
|
||||
def llama31(user_prompt: str, system_prompt = ""):
|
||||
response = ollama.chat(model='llama3.1',
|
||||
messages=[
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_prompt},
|
||||
],
|
||||
)
|
||||
return response['message']['content']
|
||||
Loading…
Add table
Add a link
Reference in a new issue