[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/meta-llama/llama-stack/blob/main/docs/notebooks/Llama_Stack_Agent_Workflows.ipynb)

# Build and Monitor Agent Workflows with Llama Stack + Anthropic's Best Practice

This notebook contains Llama Stack implementations of the common agent workflows described in Anthropic's blog post [Building Effective Agents](https://www.anthropic.com/research/building-effective-agents).

**1. Basic Workflows**
- 1.1 Prompt Chaining
- 1.2 Routing
- 1.3 Parallelization

**2. Evaluator-Optimizer Workflow**

**3. Orchestrator-Workers Workflow**


For each workflow type, we present a minimal Llama Stack implementation based on the task examples from [anthropic-cookbook](https://github.com/anthropics/anthropic-cookbook/tree/main/patterns/agents), and show how to monitor the internals of each workflow execution.

## 0. Setup

In [None]:
# NBVAL_SKIP
!pip install -U llama-stack llama-stack-client
!llama stack list-deps fireworks | xargs -L1 uv pip install


In [None]:
from llama_stack_client import LlamaStackClient, Agent
from llama_stack.core.library_client import LlamaStackAsLibraryClient
from rich.pretty import pprint
import json
import uuid
from pydantic import BaseModel
import rich
import os
try:
    from google.colab import userdata
    os.environ['FIREWORKS_API_KEY'] = userdata.get('FIREWORKS_API_KEY')
except ImportError:
    print("Not in Google Colab environment")

client = LlamaStackAsLibraryClient("fireworks", provider_data = {"fireworks_api_key": os.environ['FIREWORKS_API_KEY']})
_ = client.initialize()

# Uncomment to run on a hosted Llama Stack server
# client = LlamaStackClient(base_url="http://localhost:8321")

MODEL_ID = "meta-llama/Llama-3.3-70B-Instruct"

base_agent_config = dict(
    model=MODEL_ID,
    instructions="You are a helpful assistant.",
    sampling_params={
        "strategy": {"type": "top_p", "temperature": 1.0, "top_p": 0.9},
    },
)

## 1. Basic Workflows

### 1.1 Prompt Chaining

**Prompt chaining** decomposes a task into a sequence of steps, where each LLM call processes the output of the previous one.

![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F7418719e3dab222dccb379b8879e1dc08ad34c78-2401x1000.png&w=3840&q=75)
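The example that follows keeps all turns in one agent session, so the agent sees earlier outputs through the session history. As a complementary illustration, here is a minimal sketch of chaining where each step's output is passed explicitly into the next prompt. `call_llm` and `fake_llm` are hypothetical stand-ins for a model call, not part of the Llama Stack API.

```python
from typing import Callable, List


def run_chain(call_llm: Callable[[str], str], source_text: str, steps: List[str]) -> List[str]:
    """Run `steps` in order, feeding each step the previous step's output.

    `call_llm` is a hypothetical stand-in for any function that sends a
    prompt to a model and returns its text reply.
    """
    outputs = []
    current = source_text
    for step in steps:
        # Each prompt contains the instruction plus the previous result.
        current = call_llm(f"{step}\n\nInput:\n{current}")
        outputs.append(current)
    return outputs


# Toy "model" that upper-cases the input section, so the sketch runs offline.
def fake_llm(prompt: str) -> str:
    return prompt.split("Input:\n", 1)[1].upper()


results = run_chain(fake_llm, "revenue grew by 45%", ["Extract metrics.", "Sort them."])
print(results[-1])
```

Keeping every intermediate output in the returned list makes it easy to inspect where a chain went wrong.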

**Example: Formatting Report Data**
- We'll build an agent and use prompt chaining, sending it a series of prompts that guide it through extracting and formatting the data from the report.

In [109]:
vanilla_agent_config = {
    **base_agent_config,
    "instructions": """
    You are a helpful assistant capable of structuring data extraction and formatting. 

    You will be given tasks to extract and format data from a performance report. Here is the report:

    Q3 Performance Summary:
    Our customer satisfaction score rose to 92 points this quarter.
    Revenue grew by 45% compared to last year.
    Market share is now at 23% in our primary market.
    Customer churn decreased to 5% from 8%.
    New user acquisition cost is $43 per user.
    Product adoption rate increased to 78%.
    Employee satisfaction is at 87 points.
    Operating margin improved to 34%.
    """,
}

vanilla_agent = Agent(client, **vanilla_agent_config)
prompt_chaining_session_id = vanilla_agent.create_session(session_name=f"vanilla_agent_{uuid.uuid4()}")

prompts = [
    """Extract only the numerical values and their associated metrics from the text.
    Format each as 'value: metric' on a new line.
    Example format:
    92: customer satisfaction
    45%: revenue growth""",

    """Convert all numerical values to percentages where possible.
    If not a percentage or points, convert to decimal (e.g., 92 points -> 92%).
    Keep one number per line.
    Example format:
    92%: customer satisfaction
    45%: revenue growth""",

    """Sort all lines in descending order by numerical value.
    Keep the format 'value: metric' on each line.
    Example:
    92%: customer satisfaction
    87%: employee satisfaction""",

    """Format the sorted data as a markdown table with columns:
    | Metric | Value |
    |:--|--:|
    | Customer Satisfaction | 92% |""",
]

for i, prompt in enumerate(prompts):    
    response = vanilla_agent.create_turn(
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        session_id=prompt_chaining_session_id,
        stream=False,
    )
    print("========= Turn: ", i, "=========")
    print(response.output_message.content)
    print("\n")

92: customer satisfaction score
45%: revenue growth
23%: market share
5%: customer churn
43: new user acquisition cost
78%: product adoption rate
87: employee satisfaction
34%: operating margin
8%: customer churn (previous)


92%: customer satisfaction
45%: revenue growth
23%: market share
5%: customer churn
87%: employee satisfaction
78%: product adoption rate
34%: operating margin
8%: previous customer churn
0.043: new user acquisition cost (as a decimal, assuming $43 is a dollar value and not a percentage)


92%: customer satisfaction
87%: employee satisfaction
78%: product adoption rate
45%: revenue growth
34%: operating margin
23%: market share
8%: previous customer churn
5%: customer churn
0.043: new user acquisition cost


| Metric | Value |
|:--|--:|
| Customer Satisfaction | 92% |
| Employee Satisfaction | 87% |
| Product Adoption Rate | 78% |
| Revenue Growth | 45% |
| Operating Margin | 34% |
| Market Share | 23% |
| Previous Customer Churn | 8% |
| Customer Churn | 5% |
| N

#### 1.1.1 Monitor Prompt Chaining Internals

We can use the `prompt_chaining_session_id` to retrieve details about what happened during the agent session. The session contains 4 sequential turns, one per prompt, which guided the agent through extracting and formatting the data from the report.
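A full session dump can be long, so a compact per-turn summary is sometimes easier to read. The sketch below assumes the session dict has a `turns` list whose items carry `input_messages` and `output_message` entries with a `content` field; that shape is an assumption and may differ between client versions. The helper `summarize_turns` and the sample data are hypothetical.

```python
from typing import Any, Dict, List


def summarize_turns(session: Dict[str, Any], max_chars: int = 60) -> List[str]:
    """Return one line per turn: the user prompt and the reply, truncated.

    Assumes a "turns" list whose items have "input_messages" and
    "output_message" entries; adjust the keys if your client differs.
    """
    lines = []
    for i, turn in enumerate(session.get("turns", [])):
        prompt = str(turn["input_messages"][0]["content"])
        reply = str(turn["output_message"]["content"])
        lines.append(f"turn {i}: {prompt[:max_chars]!r} -> {reply[:max_chars]!r}")
    return lines


# Hand-written sample in the assumed shape (not real server output).
sample_session = {
    "session_name": "vanilla_agent_demo",
    "turns": [
        {
            "input_messages": [{"role": "user", "content": "Extract the values."}],
            "output_message": {"role": "assistant", "content": "92: customer satisfaction"},
        }
    ],
}

for line in summarize_turns(sample_session):
    print(line)
```

With a real session, the same helper could be applied to the dict returned by `to_dict()`, provided the keys match.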

In [101]:
vanilla_agent_session = client.agents.session.retrieve(session_id=prompt_chaining_session_id, agent_id=vanilla_agent.agent_id)
pprint(vanilla_agent_session.to_dict())

### 1.2 Routing

**Routing** classifies an input and directs it to a specialized followup task. This workflow allows for separation of concerns, and building more specialized prompts. 

![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F5c0c0e9fe4def0b584c04d37849941da55e5e71c-2401x1000.png&w=3840&q=75)

**Example: Routing to Support Teams**
We'll demonstrate how a routing workflow works with:
   - **4 specialized agents**, each dedicated to a different support team: billing, technical, account, and product
   - **1 routing agent** that decides which specialized agent should handle each user request.
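Because the routing agent's reply selects which specialized agent runs, it helps to validate that reply before using it as a lookup key. The following is a minimal sketch using only the standard library; `parse_route` and its `fallback` parameter are hypothetical helpers, and the `support_team` field name mirrors the JSON format used in this example.

```python
import json
from typing import Iterable, Optional


def parse_route(raw: str, allowed: Iterable[str], fallback: Optional[str] = None) -> Optional[str]:
    """Return the team named in a router reply, or `fallback` if it is unusable."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return fallback
    team = data.get("support_team") if isinstance(data, dict) else None
    if not isinstance(team, str):
        return fallback
    # Normalize case and whitespace before checking against the known teams.
    team = team.strip().lower()
    return team if team in set(allowed) else fallback


teams = ["billing", "technical", "account", "product"]
print(parse_route('{"reasoning": "login issue", "support_team": "Account"}', teams))
print(parse_route('{"reasoning": "?", "support_team": "sales"}', teams, fallback="technical"))
print(parse_route("not json", teams))
```

Returning a fallback instead of raising keeps the workflow running when the router names a team that does not exist; whether a default team is appropriate depends on the application.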

In [38]:
# 1. Define a couple of specialized agents
billing_agent_config = {
    **base_agent_config,
    "instructions": """You are a billing support specialist. Follow these guidelines:
    1. Always start with "Billing Support Response:"
    2. First acknowledge the specific billing issue
    3. Explain any charges or discrepancies clearly
    4. List concrete next steps with timeline
    5. End with payment options if relevant
    
    Keep responses professional but friendly.
    """,
}

technical_agent_config = {
    **base_agent_config,
    "instructions": """You are a technical support engineer. Follow these guidelines:
    1. Always start with "Technical Support Response:"
    2. List exact steps to resolve the issue
    3. Include system requirements if relevant
    4. Provide workarounds for common problems
    5. End with escalation path if needed
    
    Use clear, numbered steps and technical details.
    """,
}

account_agent_config = {
    **base_agent_config,
    "instructions": """You are an account security specialist. Follow these guidelines:
    1. Always start with "Account Support Response:"
    2. Prioritize account security and verification
    3. Provide clear steps for account recovery/changes
    4. Include security tips and warnings
    5. Set clear expectations for resolution time
    
    Maintain a serious, security-focused tone.
    """,
}

product_agent_config = {
    **base_agent_config,
    "instructions": """You are a product specialist. Follow these guidelines:
    1. Always start with "Product Support Response:"
    2. Focus on feature education and best practices
    3. Include specific examples of usage
    4. Link to relevant documentation sections
    5. Suggest related features that might help
    
    Be educational and encouraging in tone.
    """,
}

specialized_agents = {
    "billing": Agent(client, **billing_agent_config),
    "technical": Agent(client, **technical_agent_config),
    "account": Agent(client, **account_agent_config),
    "product": Agent(client, **product_agent_config),
}

# 2. Define a routing agent
class OutputSchema(BaseModel):
    reasoning: str
    support_team: str

routing_agent_config = {
    **base_agent_config,
    "instructions": f"""You are a routing agent. Analyze the user's input and select the most appropriate support team from these options: 

    {list(specialized_agents.keys())}

    Return the name of the support team in JSON format.

    First explain your reasoning, then provide your selection in this JSON format: 
    {{
        "reasoning": "<your explanation>",
        "support_team": "<support team name>"
    }}

    Note the support team name can only be one of the following: {list(specialized_agents.keys())}
    """,
    "response_format": {
        "type": "json_schema",
        "json_schema": OutputSchema.model_json_schema()
    }
}

routing_agent = Agent(client, **routing_agent_config)

# 3. Create a session for all agents
routing_agent_session_id = routing_agent.create_session(session_name=f"routing_agent_{uuid.uuid4()}")
specialized_agents_session_ids = {
    "billing": specialized_agents["billing"].create_session(session_name=f"billing_agent_{uuid.uuid4()}"),
    "technical": specialized_agents["technical"].create_session(session_name=f"technical_agent_{uuid.uuid4()}"),
    "account": specialized_agents["account"].create_session(session_name=f"account_agent_{uuid.uuid4()}"),
    "product": specialized_agents["product"].create_session(session_name=f"product_agent_{uuid.uuid4()}"),
}

# 4. Combine routing with specialized agents
def process_user_query(query):
    # Step 1: Route to the appropriate support team
    routing_response = routing_agent.create_turn(
        messages=[
            {
                "role": "user",
                "content": query,
            }
        ],
        session_id=routing_agent_session_id,
        stream=False,
    )
    try:
        routing_result = json.loads(routing_response.output_message.content)
        rich.print(f"🔀 [cyan] Routing Result: {routing_result['reasoning']} [/cyan]")
        rich.print(f"🔀 [cyan] Routing to {routing_result['support_team']}... [/cyan]")

        # Route to the appropriate support team
        return specialized_agents[routing_result["support_team"]].create_turn(
            messages=[
                {"role": "user", "content": query}
            ],
            session_id=specialized_agents_session_ids[routing_result["support_team"]],
            stream=False,
        )
    except json.JSONDecodeError:
        print("Error: Invalid JSON response from routing agent")
        return None


tickets = [
    """Subject: Can't access my account
    Message: Hi, I've been trying to log in for the past hour but keep getting an 'invalid password' error. 
    I'm sure I'm using the right password. Can you help me regain access? This is urgent as I need to 
    submit a report by end of day.
    - John""",
    
    """Subject: Unexpected charge on my card
    Message: Hello, I just noticed a charge of $49.99 on my credit card from your company, but I thought
    I was on the $29.99 plan. Can you explain this charge and adjust it if it's a mistake?
    Thanks,
    Sarah""",
    
    """Subject: How to export data?
    Message: I need to export all my project data to Excel. I've looked through the docs but can't
    figure out how to do a bulk export. Is this possible? If so, could you walk me through the steps?
    Best regards,
    Mike"""
]

for i, ticket in enumerate(tickets):
    print(f"========= Processing ticket {i+1}: =========")
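    response = process_user_query(ticket)
    # process_user_query returns None when the routing reply is not valid JSON
    if response is not None:
        print(response.output_message.content)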
    print("\n")



Account Support Response:

Dear John,

We take account security and accessibility very seriously. To ensure the integrity of your account, we must follow a thorough verification process. Before we can assist you with regaining access, we need to confirm your identity.

To initiate the account recovery process, please follow these steps:

1. **Verify your account information**: Please reply to this email with your full name, the email address associated with your account, and the last 4 digits of your phone number (if you have one listed on your account).
2. **Password reset**: We will send you a password reset link to the email address associated with your account. This link will allow you to create a new password. Please note that this link will only be valid for 24 hours.
3. **Security questions**: You may be prompted to answer security questions to further verify your identity.

**Important Security Note**: If you are using a public computer or network, please be cautious when acces

Billing Support Response:

I apologize for the unexpected charge on your credit card, Sarah. I understand that you were expecting to be billed $29.99, but instead, you were charged $49.99. I'm here to help you resolve this issue.

After reviewing your account, I found that the $49.99 charge is due to an upgrade to our premium plan, which was accidentally applied to your account during a recent system update. This upgrade includes additional features that are not part of the standard $29.99 plan.

To correct this, I will immediately downgrade your account back to the $29.99 plan, and I will also process a refund of $20.00, which is the difference between the two plans. You can expect to see the refund credited back to your credit card within the next 3-5 business days.

In the meantime, I will also send you a confirmation email with the updated account details and a receipt for the corrected charge. If you have any further questions or concerns, please don't hesitate to reach out to me 

Technical Support Response:

Exporting data in bulk to Excel is a feature available in our system. To achieve this, follow these steps:

1. **Login to the system**: Ensure you are logged in with the correct credentials and have the necessary permissions to access and export project data.
2. **Navigate to the Project Dashboard**: Click on the "Projects" tab and select the project for which you want to export data.
3. **Access the Data Export Tool**: In the project dashboard, click on the "Tools" menu and select "Data Export" from the dropdown list.
4. **Select Export Options**: In the Data Export tool, choose the data types you want to export (e.g., tasks, issues, users, etc.). You can select all data types or specific ones based on your requirements.
5. **Choose the Export Format**: Select "Excel (.xlsx)" as the export format from the available options.
6. **Configure Export Settings**: You can configure additional settings such as:
	* Date range: Specify a date range for the data to b

#### 1.2.1 Monitor Routing Internals

We can query the internal details of each agent (the routing agent and the specialized agents) by session ID.
- The **routing agent** processed every user request.
- Each **specialized agent** only receives the requests that the routing agent assigns to it. In the run shown above, the tickets went to the `account`, `billing`, and `technical` agents, so the `product` agent never received a request.

In [95]:
routing_agent_session = client.agents.session.retrieve(session_id=routing_agent_session_id, agent_id=routing_agent.agent_id)
print("Routing Agent Session:")
pprint(routing_agent_session.to_dict())

for specialized_agent_type, specialized_agent in specialized_agents.items():
    specialized_agent_session = client.agents.session.retrieve(session_id=specialized_agent.session_id, agent_id=specialized_agent.agent_id)
    print(f"Specialized Agent {specialized_agent_type} Session:")
    pprint(specialized_agent_session.to_dict())

Routing Agent Session:


Specialized Agent billing Session:


Specialized Agent technical Session:


Specialized Agent account Session:


Specialized Agent product Session:


### 1.3 Parallelization

**Parallelization** divides a task into multiple independent subtasks, which are processed in parallel and whose outputs are aggregated programmatically.

![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F406bb032ca007fd1624f261af717d70e6ca86286-2401x1000.png&w=3840&q=75)
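As a minimal, library-independent sketch of this pattern, the snippet below fans a list of tasks out to a thread pool and collects the results in input order. `fan_out` and `fake_worker` are hypothetical names; the worker stands in for an LLM call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def fan_out(worker: Callable[[str], str], tasks: List[str], max_workers: int = 4) -> List[Dict[str, str]]:
    """Run `worker` on every task concurrently and return results in task order."""
    if not tasks:
        return []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(tasks))) as executor:
        # executor.map yields results in the order of `tasks`, not completion order.
        outputs = list(executor.map(worker, tasks))
    return [{"task": t, "output": o} for t, o in zip(tasks, outputs)]


# Hypothetical worker standing in for an LLM call.
def fake_worker(task: str) -> str:
    return f"analysis of {task.splitlines()[0]}"


for item in fan_out(fake_worker, ["Customers:\n- Price sensitive", "Employees:\n- Need new skills"]):
    print(item["output"])
```

Threads suit this workload because each worker spends most of its time waiting on a network response rather than computing.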

**Example: Stakeholder Impact Analysis**

In [125]:
from concurrent.futures import ThreadPoolExecutor
from typing import List

worker_agent_config = {
    **base_agent_config,
    "instructions": """You are a helpful assistant that can analyze the impact of market changes on stakeholders.
    Analyze how market changes will impact this stakeholder group.
    Provide specific impacts and recommended actions.
    Format with clear sections and priorities.
    """,
}

def create_worker_task(task: str):
    worker_agent = Agent(client, **worker_agent_config)
    worker_session_id = worker_agent.create_session(session_name=f"worker_agent_{uuid.uuid4()}")
    task_response = worker_agent.create_turn(
        messages=[{"role": "user", "content": task}],
        stream=False,
        session_id=worker_session_id,
    )
    return {
        "worker_agent": worker_agent,
        "task_response": task_response.output_message.content,
    }

def parallelization_workflow(tasks: List[str]):
    if isinstance(client, LlamaStackClient):
        # NOTE: LlamaStackAsLibraryClient does not support parallel thread pool execution
        with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
            futures = [executor.submit(create_worker_task, task) for task in tasks]
            results = [future.result() for future in futures]
            return results
    else:
        results = []
        for task in tasks:
            result = create_worker_task(task)
            results.append(result)
        return results

stakeholders = [
    """Customers:
    - Price sensitive
    - Want better tech
    - Environmental concerns""",
    
    """Employees:
    - Job security worries
    - Need new skills
    - Want clear direction""",
    
    """Investors:
    - Expect growth
    - Want cost control
    - Risk concerns""",
    
    """Suppliers:
    - Capacity constraints
    - Price pressures
    - Tech transitions"""
]

results = parallelization_workflow(stakeholders)
for i, result in enumerate(results):
    print(f"========= Stakeholder {i+1}: =========")
    print(result["task_response"])
    print("\n")


**Market Change Impact Analysis: Customers**

### Overview
The customer stakeholder group is a crucial segment that will be impacted by market changes. As a price-sensitive group, they are likely to be influenced by fluctuations in prices. Additionally, their desire for better technology and environmental concerns will drive their purchasing decisions.

### Specific Impacts

1. **Price Increases**: If market changes lead to price increases, customers may be deterred from making purchases, potentially leading to a decline in sales.
2. **Technological Advancements**: If competitors introduce new and improved technologies, customers may switch to alternative products or services, leading to a loss of market share.
3. **Environmental Regulations**: Changes in environmental regulations or increasing consumer awareness of environmental issues may lead to a shift in demand towards more sustainable products or services.
4. **Supply Chain Disruptions**: Market changes that affect supply chains 

#### 1.3.1 Monitor Parallelization Internals

Now, let's see how the worker agents processed the tasks. 

In [126]:
for i, result in enumerate(results):
    print(f"========= Worker Agent {i+1}: =========")
    session_response = client.agents.session.retrieve(session_id=result["worker_agent"].session_id, agent_id=result["worker_agent"].agent_id)
    pprint(session_response.to_dict())










## 2. Evaluator-Optimizer Workflow

In the evaluator-optimizer workflow, one LLM call generates a response while another provides evaluation and feedback in a loop.

![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F14f51e6406ccb29e695da48b17017e899a6119c7-2401x1000.png&w=3840&q=75)

**Example: Code Generation**

We'll showcase how to use the evaluator-optimizer workflow to generate a code implementation. 
- **Generator agent** generates a code implementation
- **Evaluator agent** evaluates the code implementation
- Loop until the evaluator returns "PASS"
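A loop that only ends on "PASS" can run indefinitely if the evaluator is never satisfied. The sketch below shows one way to cap the number of rounds; it is a variation for illustration, not the notebook's implementation. `refine`, `fake_generate`, and `fake_evaluate` are hypothetical, with the two fakes standing in for the generator and evaluator agents.

```python
from typing import Callable, Dict, Tuple


def refine(
    generate: Callable[[str], str],
    evaluate: Callable[[str], Dict[str, str]],
    task: str,
    max_iterations: int = 3,
) -> Tuple[str, int, bool]:
    """Generate, evaluate, and regenerate with feedback until PASS or the cap.

    Returns (last_candidate, evaluations_run, passed).
    """
    candidate = generate(task)
    for attempt in range(1, max_iterations + 1):
        verdict = evaluate(candidate)
        if verdict["evaluation"] == "PASS":
            return candidate, attempt, True
        if attempt < max_iterations:
            # Feed the evaluator's feedback back to the generator.
            candidate = generate(f"{task}\n\nFeedback: {verdict['feedback']}")
    return candidate, max_iterations, False


# Hypothetical stand-ins: the generator improves once it sees feedback.
def fake_generate(prompt: str) -> str:
    return "v2" if "Feedback:" in prompt else "v1"


def fake_evaluate(candidate: str) -> Dict[str, str]:
    if candidate == "v2":
        return {"evaluation": "PASS", "feedback": ""}
    return {"evaluation": "NEEDS_IMPROVEMENT", "feedback": "handle empty stack"}


print(refine(fake_generate, fake_evaluate, "Implement a MinStack"))
```

Returning the `passed` flag lets the caller decide whether to accept the last candidate or report a failure.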

In [110]:
class GeneratorOutputSchema(BaseModel):
    thoughts: str
    response: str

generator_agent_config = {
    **base_agent_config,
    "instructions": """Your goal is to complete the task based on <user input>. If there is feedback
    from your previous generations, you should reflect on it to improve your solution.

    Output your answer concisely in the following JSON format:
    {
        "thoughts": "<Your understanding of the task and feedback and how you plan to improve>",
        "response": "<Your code implementation here>"
    }
    """,
    "response_format": {
        "type": "json_schema",
        "json_schema": GeneratorOutputSchema.model_json_schema()
    }
}

class EvaluatorOutputSchema(BaseModel):
    evaluation: str
    feedback: str

evaluator_agent_config = {
    **base_agent_config,
    "instructions": """Evaluate the following code implementation for:
    1. code correctness
    2. time complexity
    3. style and best practices

    You should be evaluating only and not attempting to solve the task.
    Only output "PASS" if all criteria are met and you have no further suggestions for improvements.
    Output your evaluation concisely in the following JSON format.
    {
        "evaluation": "<evaluation enum output>",
        "feedback": "What needs improvement and why."
    }

    The evaluation enum output should be one of the following:
    - PASS
    - NEEDS_IMPROVEMENT
    - FAIL
    """,
    "response_format": {
        "type": "json_schema",
        "json_schema": EvaluatorOutputSchema.model_json_schema()
    }
}

generator_agent = Agent(client, **generator_agent_config)
evaluator_agent = Agent(client, **evaluator_agent_config)
generator_session_id = generator_agent.create_session(session_name=f"generator_agent_{uuid.uuid4()}")
evaluator_session_id = evaluator_agent.create_session(session_name=f"evaluator_agent_{uuid.uuid4()}")

def generator_evaluator_workflow(user_input):
    # Step 1: Generate a response
    generator_response = generator_agent.create_turn(
        messages=[
            {"role": "user", "content": user_input}
        ],
        session_id=generator_session_id,
        stream=False,
    )
    generator_result = json.loads(generator_response.output_message.content)

    # Step 2: While evaluator does not return PASS, re-generate and re-evaluate
    while True:
        # Step 2.1: Evaluate the response
        evaluator_response = evaluator_agent.create_turn(
            messages=[
                {"role": "user", "content": generator_result["response"]}
            ],
            session_id=evaluator_session_id,
            stream=False,
        )

        evaluator_result = json.loads(evaluator_response.output_message.content)

        # Step 2.2: If evaluator returns PASS, return the response
        if evaluator_result["evaluation"] == "PASS":
            return generator_result

        # Step 2.3: If evaluator returns NEEDS_IMPROVEMENT | FAIL, attach the feedback and re-generate
        generator_response = generator_agent.create_turn(
            messages=[
                {"role": "user", "content": f"{evaluator_result['feedback']}"}
            ],
            session_id=generator_session_id,
            stream=False,
        )
        generator_result = json.loads(generator_response.output_message.content)

In [113]:
coding_task = """
Implement a Stack with:
1. push(x)
2. pop()
3. getMin()
All operations should be O(1).
"""

output = generator_evaluator_workflow(coding_task)
print(output["response"])

```python
class MinStack:
    def __init__(self):
        self.stack = []
        self.min_stack = []
    
    def push(self, x: int) -> None:
        self.stack.append(x)
        if not self.min_stack or x <= self.min_stack[-1]:
            self.min_stack.append(x)
    
    def pop(self) -> None:
        if self.stack:
            if self.stack[-1] == self.min_stack[-1]:
                self.min_stack.pop()
            self.stack.pop()
    
    def getMin(self) -> int:
        if self.min_stack:
            return self.min_stack[-1]
        else:
            return None
```


### 2.1. Monitor Generator-Evaluator Internals

In addition to the final output of the workflow, we can also look at how the generator and evaluator agents processed the user's request. Note that the `evaluator_agent` returned PASS after 1 iteration.

In [102]:
generator_agent_session = client.agents.session.retrieve(session_id=generator_session_id, agent_id=generator_agent.agent_id)
pprint(generator_agent_session.to_dict())

evaluator_agent_session = client.agents.session.retrieve(session_id=evaluator_session_id, agent_id=evaluator_agent.agent_id)
pprint(evaluator_agent_session.to_dict())

## 3. Orchestrator-Workers Workflow

In the orchestrator-workers workflow, a central LLM dynamically breaks down tasks, delegates them to worker LLMs, and synthesizes their results.

![](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F8985fc683fae4780fb34eab1365ab78c7e51bc8e-2401x1000.png&w=3840&q=75)

**Example: Content Generation**

We'll showcase how to use the orchestrator-workers workflow to generate content.
- **Orchestrator agent** analyzes the user's request and breaks it down into 2-3 distinct approaches
- **Worker agents** are spawned by the orchestrator workflow to generate content based on each approach
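The control flow of the two bullets above can be sketched without any LLM dependency: a planner returns subtasks as JSON, and one worker call runs per subtask. `orchestrate`, `fake_plan`, and `fake_work` are hypothetical names, and passing the original task to each worker is a design choice of this sketch. Like the example in this section, it stops at collecting worker outputs and does not add a synthesis step.

```python
import json
from typing import Callable, Dict


def orchestrate(
    plan: Callable[[str], str],
    work: Callable[[str], str],
    task: str,
) -> Dict[str, str]:
    """Ask `plan` for subtasks as JSON, then run `work` once per subtask.

    `plan` and `work` are hypothetical stand-ins for LLM calls.
    """
    subtasks = json.loads(plan(task))["tasks"]
    results = {}
    for subtask in subtasks:
        # Give each worker the original task plus its own approach.
        prompt = f"Task: {task}\nStyle: {subtask['type']}\nGuidelines: {subtask['description']}"
        results[subtask["type"]] = work(prompt)
    return results


def fake_plan(task: str) -> str:
    return json.dumps({
        "analysis": "two tones are useful",
        "tasks": [
            {"type": "formal", "description": "emphasize specifications"},
            {"type": "conversational", "description": "be friendly"},
        ],
    })


def fake_work(prompt: str) -> str:
    # Echo the style line so the output shows which approach each worker got.
    return prompt.splitlines()[1]


print(orchestrate(fake_plan, fake_work, "Describe a water bottle"))
```

Unlike parallelization, the subtasks here are not fixed in advance: the planner decides them per request.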

In [103]:
from typing import List, Dict
class OrchestratorOutputSchema(BaseModel):
    analysis: str
    tasks: List[Dict[str, str]]

orchestrator_agent_config = {
    **base_agent_config,
    "instructions": """Your job is to analyze the task provided by the user and break it down into 2-3 distinct approaches.

    Return your response in the following JSON format:
    {
        "analysis": "<Your understanding of the task and which variations would be valuable. Focus on how each approach serves different aspects of the task.>",
        "tasks": [
            {
                "type": "formal",
                "description": "Write a precise, technical version that emphasizes specifications"
            },
            {
                "type": "conversational",
                "description": "Write an engaging, friendly version that connects with readers"
            }
        ]
    }
    """,
    "response_format": {
        "type": "json_schema",
        "json_schema": OrchestratorOutputSchema.model_json_schema()
    }
}

worker_agent_config = {
    **base_agent_config,
    "instructions": """You will be given a task guideline. Generate content based on the provided
    task, following the style and guideline descriptions. 

    Return your response in this format:

    Response: Your content here, maintaining the specified style and fully addressing requirements.
    """,
}


In [104]:
def orchestrator_worker_workflow(task, context):
    # single orchestrator agent
    orchestrator_agent = Agent(client, **orchestrator_agent_config)
    orchestrator_session_id = orchestrator_agent.create_session(session_name=f"orchestrator_agent_{uuid.uuid4()}")

    orchestrator_response = orchestrator_agent.create_turn(
        messages=[{"role": "user", "content": f"Your task is to {task}. Here is some context: {context}"}],
        stream=False,
        session_id=orchestrator_session_id,
    )

    orchestrator_result = json.loads(orchestrator_response.output_message.content)
    rich.print(f"[bold cyan] Orchestrator Analysis: [/bold cyan]")
    pprint(orchestrator_result)

    workers = {}
    # spawn multiple worker agents
    for task in orchestrator_result["tasks"]:
        worker_agent = Agent(client, **worker_agent_config)
        worker_session_id = worker_agent.create_session(session_name=f"worker_agent_{uuid.uuid4()}")
        workers[task["type"]] = worker_agent
    
        worker_response = worker_agent.create_turn(
            messages=[{"role": "user", "content": f"Your task is to {task['description']}."}],
            stream=False,
            session_id=worker_session_id,
        )
        rich.print(f"[bold yellow] >>> Worker {task['type']} <<< [/bold yellow]")
        rich.print(worker_response.output_message.content)
    
    return orchestrator_agent, workers

In [105]:
orchestrator_agent, workers = orchestrator_worker_workflow(
    task="Write a product description for a new eco-friendly water bottle",
    context={
        "target_audience": "environmentally conscious millennials",
        "key_features": ["plastic-free", "insulated", "lifetime warranty"]
    }
)

### 3.1. Monitor Orchestrator-Workers Internals

Let's see what happened with the orchestrator agent and the worker agents it spawned.

In [91]:
orchestrator_session = client.agents.session.retrieve(session_id=orchestrator_agent.session_id, agent_id=orchestrator_agent.agent_id)
pprint(orchestrator_session.to_dict())

for worker_type, worker in workers.items():
    worker_session = client.agents.session.retrieve(session_id=worker.session_id, agent_id=worker.agent_id)
    print(f"Worker {worker_type} Session:")
    pprint(worker_session.to_dict())

Worker formal Session:


Worker conversational Session:


Worker creative Session:
