# 交接

交接是 OpenAI 在一个名为 [Swarm](https://github.com/openai/swarm) 的实验项目中引入的多代理设计模式。核心思想是让代理使用特殊的工具调用将任务委托给其他代理。

我们可以使用 AutoGen Core API 通过事件驱动代理来实现交接模式。使用 AutoGen (v0.4+) 相比 OpenAI 实现和之前版本 (v0.2) 提供了以下优势:

1. 它可以通过使用分布式代理运行时扩展到分布式环境。
2. 它提供了引入自己的代理实现的灵活性。
3. 原生异步 API 使其易于与 UI 和其他系统集成。

本笔记本演示了交接模式的简单实现。建议阅读[Topics and Subscriptions](../core-concepts/topic-and-subscription.md)以理解发布-订阅和事件驱动代理的基本概念。


## 场景

这个场景是基于 [OpenAI 示例](https://github.com/openai/openai-cookbook/blob/main/examples/Orchestrating_agents.ipynb)修改的。

考虑一个客户服务场景,客户正试图获得产品退款,或从聊天机器人购买新产品。聊天机器人是一个由三个 AI 代理和一个人类代理组成的多代理团队:

- 分诊代理,负责理解客户的请求并决定交接给哪些其他代理。
- 退款代理,负责处理退款请求。
- 销售代理,负责处理销售请求。
- 人类代理,负责处理 AI 代理无法处理的复杂请求。

在这个场景中,客户通过用户代理与聊天机器人交互。

下图显示了此场景中代理的交互拓扑。

![Handoffs](handoffs.svg)

让我们使用 AutoGen Core 来实现这个场景。首先,我们需要导入必要的模块。

In [1]:
import json
import uuid
from typing import List, Tuple

from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.base import MessageContext, TopicId
from autogen_core.components import FunctionCall, RoutedAgent, TypeSubscription, message_handler
from autogen_core.components.models import (
 AssistantMessage,
 ChatCompletionClient,
 FunctionExecutionResult,
 FunctionExecutionResultMessage,
 LLMMessage,
 SystemMessage,
 UserMessage,
)
from autogen_core.components.tools import FunctionTool, Tool
from autogen_ext.models import OpenAIChatCompletionClient
from pydantic import BaseModel

## 消息协议

首先,我们需要定义代理之间通信的消息协议。我们使用事件驱动的发布-订阅通信,因此这些消息类型将被用作事件。

- `UserLogin` 是用户登录并开始新会话时由运行时发布的消息。
- `UserTask` 是包含用户会话聊天历史的消息。当 AI 代理将任务交接给其他代理时,它也会发布 `UserTask` 消息。
- `AgentResponse` 是由 AI 代理和人类代理发布的消息,它也包含聊天历史以及供客户回复的主题类型。

In [2]:
class UserLogin(BaseModel):
 pass


class UserTask(BaseModel):
 context: List[LLMMessage]


class AgentResponse(BaseModel):
 reply_to_topic_type: str
 context: List[LLMMessage]

## AI 代理

我们从 `AIAgent` 类开始,这是多代理聊天机器人中所有 AI 代理(即分诊、销售和问题与维修代理)的类。
`AIAgent` 使用 {py:class}`~autogen_core.components.models.ChatCompletionClient` 来生成响应。
它可以直接使用常规工具或使用 `delegate_tools` 将任务委托给其他代理。
它订阅主题类型 `agent_topic_type` 以接收来自客户的消息,并通过发布到主题类型 `user_topic_type` 向客户发送消息。

在 `handle_task` 方法中,代理首先使用模型生成响应。如果响应包含交接工具调用,代理通过向工具调用结果中指定的主题发布 `UserTask` 消息将任务委托给另一个代理。如果响应是常规工具调用,代理执行该工具并再次调用模型生成下一个响应,直到响应不是工具调用。

当模型响应不是工具调用时,代理通过发布到 `user_topic_type` 向客户发送 `AgentResponse` 消息。

In [3]:
class AIAgent(RoutedAgent):
 def __init__(
 self,
 description: str,
 system_message: SystemMessage,
 model_client: ChatCompletionClient,
 tools: List[Tool],
 delegate_tools: List[Tool],
 agent_topic_type: str,
 user_topic_type: str,
 ) -> None:
 super().__init__(description)
 self._system_message = system_message
 self._model_client = model_client
 self._tools = dict([(tool.name, tool) for tool in tools])
 self._tool_schema = [tool.schema for tool in tools]
 self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
 self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
 self._agent_topic_type = agent_topic_type
 self._user_topic_type = user_topic_type

 @message_handler
 async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
 # Send the task to the LLM.
 llm_result = await self._model_client.create(
 messages=[self._system_message] + message.context,
 tools=self._tool_schema + self._delegate_tool_schema,
 cancellation_token=ctx.cancellation_token,
 )
 print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
 # Process the LLM result.
 while isinstance(llm_result.content, list) and all(isinstance(m, FunctionCall) for m in llm_result.content):
 tool_call_results: List[FunctionExecutionResult] = []
 delegate_targets: List[Tuple[str, UserTask]] = []
 # Process each function call.
 for call in llm_result.content:
 arguments = json.loads(call.arguments)
 if call.name in self._tools:
 # Execute the tool directly.
 result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token)
 result_as_str = self._tools[call.name].return_value_as_string(result)
 tool_call_results.append(FunctionExecutionResult(call_id=call.id, content=result_as_str))
 elif call.name in self._delegate_tools:
 # Execute the tool to get the delegate agent's topic type.
 result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token)
 topic_type = self._delegate_tools[call.name].return_value_as_string(result)
 # Create the context for the delegate agent, including the function call and the result.
 delegate_messages = list(message.context) + [
 AssistantMessage(content=[call], source=self.id.type),
 FunctionExecutionResultMessage(
 content=[
 FunctionExecutionResult(
 call_id=call.id, content=f"Transfered to {topic_type}. Adopt persona immediately."
 )
 ]
 ),
 ]
 delegate_targets.append((topic_type, UserTask(context=delegate_messages)))
 else:
 raise ValueError(f"Unknown tool: {call.name}")
 if len(delegate_targets) > 0:
 # Delegate the task to other agents by publishing messages to the corresponding topics.
 for topic_type, task in delegate_targets:
 print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
 await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
 if len(tool_call_results) > 0:
 print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
 # Make another LLM call with the results.
 message.context.extend(
 [
 AssistantMessage(content=llm_result.content, source=self.id.type),
 FunctionExecutionResultMessage(content=tool_call_results),
 ]
 )
 llm_result = await self._model_client.create(
 messages=[self._system_message] + message.context,
 tools=self._tool_schema + self._delegate_tool_schema,
 cancellation_token=ctx.cancellation_token,
 )
 print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
 else:
 # The task has been delegated, so we are done.
 return
 # The task has been completed, publish the final result.
 assert isinstance(llm_result.content, str)
 message.context.append(AssistantMessage(content=llm_result.content, source=self.id.type))
 await self.publish_message(
 AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
 topic_id=TopicId(self._user_topic_type, source=self.id.key),
 )

## 人类代理

`HumanAgent` 类是聊天机器人中人类的代理。它用于处理 AI 代理无法处理的请求。`HumanAgent` 订阅主题类型 `agent_topic_type` 以接收消息,并发布到主题类型 `user_topic_type` 以向客户发送消息。

在这个实现中,`HumanAgent` 只是简单地使用控制台来获取您的输入。在实际应用中,您可以按如下方式改进这个设计:

* 在 `handle_user_task` 方法中,通过 Teams 或 Slack 等聊天应用程序发送通知。
* 聊天应用程序通过运行时将人类的响应发布到 `agent_topic_type` 指定的主题
* 创建另一个消息处理程序来处理人类的响应并将其发送回客户。

In [4]:
class HumanAgent(RoutedAgent):
 def __init__(self, description: str, agent_topic_type: str, user_topic_type: str) -> None:
 super().__init__(description)
 self._agent_topic_type = agent_topic_type
 self._user_topic_type = user_topic_type

 @message_handler
 async def handle_user_task(self, message: UserTask, ctx: MessageContext) -> None:
 human_input = input("Human agent input: ")
 print(f"{'-'*80}\n{self.id.type}:\n{human_input}", flush=True)
 message.context.append(AssistantMessage(content=human_input, source=self.id.type))
 await self.publish_message(
 AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
 topic_id=TopicId(self._user_topic_type, source=self.id.key),
 )

## 用户代理

`UserAgent` 类是与聊天机器人对话的客户的代理。它处理两种消息类型:`UserLogin` 和 `AgentResponse`。当 `UserAgent` 收到 `UserLogin` 消息时,它会与聊天机器人开始新会话,并向订阅主题类型 `agent_topic_type` 的 AI 代理发布 `UserTask` 消息。当 `UserAgent` 收到 `AgentResponse` 消息时,它会向用户显示来自聊天机器人的响应。

在这个实现中,`UserAgent` 使用控制台来获取您的输入。在实际应用中,您可以使用上述 `HumanAgent` 部分描述的相同思路来改进人机交互。

In [33]:
class UserAgent(RoutedAgent):
 def __init__(self, description: str, user_topic_type: str, agent_topic_type: str) -> None:
 super().__init__(description)
 self._user_topic_type = user_topic_type
 self._agent_topic_type = agent_topic_type

 @message_handler
 async def handle_user_login(self, message: UserLogin, ctx: MessageContext) -> None:
 print(f"{'-'*80}\nUser login, session ID: {self.id.key}.", flush=True)
 # Get the user's initial input after login.
 user_input = input("User: ")
 print(f"{'-'*80}\n{self.id.type}:\n{user_input}")
 await self.publish_message(
 UserTask(context=[UserMessage(content=user_input, source="User")]),
 topic_id=TopicId(self._agent_topic_type, source=self.id.key),
 )

 @message_handler
 async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
 # Get the user's input after receiving a response from an agent.
 user_input = input("User (type 'exit' to close the session): ")
 print(f"{'-'*80}\n{self.id.type}:\n{user_input}", flush=True)
 if user_input.strip().lower() == "exit":
 print(f"{'-'*80}\nUser session ended, session ID: {self.id.key}.")
 return
 message.context.append(UserMessage(content=user_input, source="User"))
 await self.publish_message(
 UserTask(context=message.context), topic_id=TopicId(message.reply_to_topic_type, source=self.id.key)
 )

## AI 代理的工具

如果 AI 代理不需要将任务交接给其他代理,它们可以使用常规工具来完成任务。我们使用简单的函数定义工具,并使用 {py:class}`~autogen_core.components.tools.FunctionTool` 包装器创建工具。

In [5]:
def execute_order(product: str, price: int) -> str:
 print("\n\n=== Order Summary ===")
 print(f"Product: {product}")
 print(f"Price: ${price}")
 print("=================\n")
 confirm = input("Confirm order? y/n: ").strip().lower()
 if confirm == "y":
 print("Order execution successful!")
 return "Success"
 else:
 print("Order cancelled!")
 return "User cancelled order."


def look_up_item(search_query: str) -> str:
 item_id = "item_132612938"
 print("Found item:", item_id)
 return item_id


def execute_refund(item_id: str, reason: str = "not provided") -> str:
 print("\n\n=== Refund Summary ===")
 print(f"Item ID: {item_id}")
 print(f"Reason: {reason}")
 print("=================\n")
 print("Refund execution successful!")
 return "success"


execute_order_tool = FunctionTool(execute_order, description="Price should be in USD.")
look_up_item_tool = FunctionTool(
 look_up_item, description="Use to find item ID.\nSearch query can be a description or keywords."
)
execute_refund_tool = FunctionTool(execute_refund, description="")

## 代理的主题类型

我们定义每个代理将订阅的主题类型。在[Topics and Subscriptions](../core-concepts/topic-and-subscription.md)中了解更多关于主题类型的信息。

In [35]:
sales_agent_topic_type = "SalesAgent"
issues_and_repairs_agent_topic_type = "IssuesAndRepairsAgent"
triage_agent_topic_type = "TriageAgent"
human_agent_topic_type = "HumanAgent"
user_topic_type = "User"

## AI 代理的委托工具

除了常规工具外,AI 代理还可以使用称为委托工具的特殊工具将任务委托给其他代理。委托工具的概念仅在此设计模式中使用,委托工具也被定义为简单的函数。我们在这个设计模式中区分委托工具和常规工具,因为当 AI 代理调用委托工具时,我们将任务转移给另一个代理,而不是继续使用同一代理中的模型生成响应。

In [36]:
def transfer_to_sales_agent() -> str:
 return sales_agent_topic_type


def transfer_to_issues_and_repairs() -> str:
 return issues_and_repairs_agent_topic_type


def transfer_back_to_triage() -> str:
 return triage_agent_topic_type


def escalate_to_human() -> str:
 return human_agent_topic_type


transfer_to_sales_agent_tool = FunctionTool(
 transfer_to_sales_agent, description="Use for anything sales or buying related."
)
transfer_to_issues_and_repairs_tool = FunctionTool(
 transfer_to_issues_and_repairs, description="Use for issues, repairs, or refunds."
)
transfer_back_to_triage_tool = FunctionTool(
 transfer_back_to_triage,
 description="Call this if the user brings up a topic outside of your purview,\nincluding escalating to human.",
)
escalate_to_human_tool = FunctionTool(escalate_to_human, description="Only call this if explicitly asked to.")

## 创建团队

我们已经定义了 AI 代理、人类代理、用户代理、工具和主题类型。现在我们可以创建代理团队了。

对于 AI 代理,我们使用 {py:class}~autogen_ext.models.OpenAIChatCompletionClient` 和 `gpt-4o-mini` 模型。

创建代理运行时后,我们通过提供代理类型和创建代理实例的工厂方法来注册每个代理。运行时负责管理代理生命周期,所以我们不需要自己实例化代理。在[Agent Runtime Environments](../core-concepts/architecture.md)中了解更多关于代理运行时的信息,在[Agent Identity and Lifecycle](../core-concepts/agent-identity-and-lifecycle.md)中了解更多关于代理生命周期的信息。

在下面的代码中,您可以看到我们使用 `AIAgent` 类来定义分诊、销售和问题与维修代理。我们为每个代理添加了常规工具和委托工具。我们还为每个代理添加了主题类型的订阅。

In [37]:
runtime = SingleThreadedAgentRuntime()

model_client = OpenAIChatCompletionClient(
 model="gpt-4o-mini",
 api_key="YOUR_API_KEY",
)

# Register the triage agent.
triage_agent_type = await AIAgent.register(
 runtime,
 type=triage_agent_topic_type, # Using the topic type as the agent type.
 factory=lambda: AIAgent(
 description="A triage agent.",
 system_message=SystemMessage(
 content="You are a customer service bot for ACME Inc. "
 "Introduce yourself. Always be very brief. "
 "Gather information to direct the customer to the right department. "
 "But make your questions subtle and natural."
 ),
 model_client=model_client,
 tools=[],
 delegate_tools=[
 transfer_to_issues_and_repairs_tool,
 transfer_to_sales_agent_tool,
 escalate_to_human_tool,
 ],
 agent_topic_type=triage_agent_topic_type,
 user_topic_type=user_topic_type,
 ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=triage_agent_topic_type, agent_type=triage_agent_type.type))

# Register the sales agent.
sales_agent_type = await AIAgent.register(
 runtime,
 type=sales_agent_topic_type, # Using the topic type as the agent type.
 factory=lambda: AIAgent(
 description="A sales agent.",
 system_message=SystemMessage(
 content="You are a sales agent for ACME Inc."
 "Always answer in a sentence or less."
 "Follow the following routine with the user:"
 "1. Ask them about any problems in their life related to catching roadrunners.\n"
 "2. Casually mention one of ACME's crazy made-up products can help.\n"
 " - Don't mention price.\n"
 "3. Once the user is bought in, drop a ridiculous price.\n"
 "4. Only after everything, and if the user says yes, "
 "tell them a crazy caveat and execute their order.\n"
 ""
 ),
 model_client=model_client,
 tools=[execute_order_tool],
 delegate_tools=[transfer_back_to_triage_tool],
 agent_topic_type=sales_agent_topic_type,
 user_topic_type=user_topic_type,
 ),
)
# Add subscriptions for the sales agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=sales_agent_topic_type, agent_type=sales_agent_type.type))

# Register the issues and repairs agent.
issues_and_repairs_agent_type = await AIAgent.register(
 runtime,
 type=issues_and_repairs_agent_topic_type, # Using the topic type as the agent type.
 factory=lambda: AIAgent(
 description="An issues and repairs agent.",
 system_message=SystemMessage(
 content="You are a customer support agent for ACME Inc."
 "Always answer in a sentence or less."
 "Follow the following routine with the user:"
 "1. First, ask probing questions and understand the user's problem deeper.\n"
 " - unless the user has already provided a reason.\n"
 "2. Propose a fix (make one up).\n"
 "3. ONLY if not satesfied, offer a refund.\n"
 "4. If accepted, search for the ID and then execute refund."
 ),
 model_client=model_client,
 tools=[
 execute_refund_tool,
 look_up_item_tool,
 ],
 delegate_tools=[transfer_back_to_triage_tool],
 agent_topic_type=issues_and_repairs_agent_topic_type,
 user_topic_type=user_topic_type,
 ),
)
# Add subscriptions for the issues and repairs agent: it will receive messages published to its own topic only.
await runtime.add_subscription(
 TypeSubscription(topic_type=issues_and_repairs_agent_topic_type, agent_type=issues_and_repairs_agent_type.type)
)

# Register the human agent.
human_agent_type = await HumanAgent.register(
 runtime,
 type=human_agent_topic_type, # Using the topic type as the agent type.
 factory=lambda: HumanAgent(
 description="A human agent.",
 agent_topic_type=human_agent_topic_type,
 user_topic_type=user_topic_type,
 ),
)
# Add subscriptions for the human agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=human_agent_topic_type, agent_type=human_agent_type.type))

# Register the user agent.
user_agent_type = await UserAgent.register(
 runtime,
 type=user_topic_type,
 factory=lambda: UserAgent(
 description="A user agent.",
 user_topic_type=user_topic_type,
 agent_topic_type=triage_agent_topic_type, # Start with the triage agent.
 ),
)
# Add subscriptions for the user agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type))

## 运行团队

最后,我们可以启动运行时并通过向运行时发布 `UserLogin` 消息来模拟用户会话。消息被发布到主题 ID,其类型设置为 `user_topic_type`,来源设置为唯一的 `session_id`。这个 `session_id` 将用于创建此用户会话中的所有主题 ID,并且还将用于创建此用户会话中所有代理的代理 ID。要了解更多关于主题 ID 和代理 ID 如何创建的信息,请阅读[Agent Identity and Lifecycle](../core-concepts/agent-identity-and-lifecycle.md)和[Topics and Subscriptions](../core-concepts/topic-and-subscription.md)。

In [38]:
# Start the runtime.
runtime.start()

# Create a new session for the user.
session_id = str(uuid.uuid4())
await runtime.publish_message(UserLogin(), topic_id=TopicId(user_topic_type, source=session_id))

# Run until completion.
await runtime.stop_when_idle()

--------------------------------------------------------------------------------
User login, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.
--------------------------------------------------------------------------------
User:
I want a refund
--------------------------------------------------------------------------------
TriageAgent:
I can help with that! Could I ask what item you're seeking a refund for?
--------------------------------------------------------------------------------
User:
A pair of shoes I bought
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_qPx1DXDL2NLcHs8QNo47egsJ', arguments='{}', name='transfer_to_issues_and_repairs')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to IssuesAndRepairsAgent
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I see you're looking for a refund on a pa

## 后续步骤

本笔记本演示了如何使用 AutoGen Core 实现交接模式。您可以通过添加更多代理和工具来继续改进这个设计,或者为用户代理和人类代理创建更好的用户界面。
