{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 消息和通信\n", "\n", "AutoGen core 中的代理可以对消息做出反应、发送和发布消息,消息是代理之间唯一的通信方式。\n", "\n", "## 消息\n", "\n", "消息是可序列化的对象,它们可以使用以下方式定义:\n", "\n", "- Pydantic 的 {py:class}`pydantic.BaseModel` 的子类,或\n", "- 数据类\n", "\n", "例如:" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "\n", "@dataclass\n", "class TextMessage:\n", " content: str\n", " source: str\n", "\n", "\n", "@dataclass\n", "class ImageMessage:\n", " url: str\n", " source: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{note}\n", "消息纯粹是数据,不应包含任何逻辑。\n", "```\n", "\n", "## 消息处理程序\n", "\n", "当代理收到消息时,运行时将调用代理的消息处理程序({py:meth}`~autogen_core.base.Agent.on_message`),该处理程序应实现代理的消息处理逻辑。如果代理无法处理此消息,代理应该抛出 {py:class}`~autogen_core.base.exceptions.CantHandleException`。\n", "\n", "基类 {py:class}`~autogen_core.base.BaseAgent` 不提供消息处理逻辑,除非是高级用例,否则不建议直接实现{py:meth}`~autogen_core.base.Agent.on_message` 方法。\n", "\n", "开发人员应该从实现 {py:class}`~autogen_core.components.RoutedAgent` 基类开始,该基类提供内置的消息路由功能。\n", "\n", "### 按类型路由消息\n", "\n", "{py:class}`~autogen_core.components.RoutedAgent` 基类提供了一种机制,通过 {py:meth}`~autogen_core.components.message_handler` 装饰器将消息类型与消息处理程序关联起来,因此开发人员不需要实现 {py:meth}`~autogen_core.base.Agent.on_message` 方法。\n", "\n", "例如,以下类型路由代理使用不同的消息处理程序响应 `TextMessage` 和 `ImageMessage`:" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "from autogen_core.application import SingleThreadedAgentRuntime\n", "from autogen_core.base import AgentId, MessageContext\n", "from autogen_core.components import RoutedAgent, message_handler\n", "\n", "\n", "class MyAgent(RoutedAgent):\n", " @message_handler\n", " async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler\n", " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you sent me {message.url}!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "创建代理运行时并注册代理类型(参见 [Agent and Agent Runtime](agent-and-agent-runtime.ipynb)):" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "AgentType(type='my_agent')" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await MyAgent.register(runtime, \"my_agent\", lambda: MyAgent(\"My Agent\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用 `TextMessage` 和 `ImageMessage` 测试这个代理。" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello, User, you said Hello, World!!\n", "Hello, User, you sent me https://example.com/image.jpg!\n" ] } ], "source": [ "runtime.start()\n", "agent_id = AgentId(\"my_agent\", \"default\")\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"User\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"User\"), agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "当传递第一条消息时,运行时会自动创建一个带有代理 ID `AgentId(\"my_agent\", \"default\")` 的 `MyAgent` 实例。\n", "\n", "### 路由相同类型的消息\n", "\n", "在某些场景中,将相同类型的消息路由到不同的处理程序是很有用的。例如,来自不同发送者代理的消息应该被不同地处理。您可以使用 {py:meth}`~autogen_core.components.message_handler` 装饰器的 `match` 参数。\n", "\n", "`match` 参数将相同消息类型的处理程序与特定消息关联起来 —— 它是消息类型路由的次要参数。它接受一个可调用对象,该对象接受消息和 {py:class}`~autogen_core.base.MessageContext` 作为参数,并返回一个布尔值,指示消息是否应该由装饰的处理程序处理。可调用对象按处理程序的字母顺序检查。\n", "\n", "这里是一个使用 `match` 参数基于发送者代理路由消息的代理示例:" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "class RoutedBySenderAgent(RoutedAgent):\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user1\")) # type: ignore\n", " async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello from user 1 handler, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user2\")) # type: ignore\n", " async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello from user 2 handler, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user2\")) # type: ignore\n", " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you sent me {message.url}!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "上述代理使用消息的 `source` 字段来确定发送者代理。如果可用,您也可以使用 {py:class}`~autogen_core.base.MessageContext` 的 `sender` 字段通过代理 ID 来确定发送者代理。\n", "\n", "让我们用不同 `source` 值的消息来测试这个代理:" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello from user 1 handler, user1-test, you said Hello, World!!\n", "Hello from user 2 handler, user2-test, you said Hello, World!!\n", "Hello, user2-test, you sent me https://example.com/image.jpg!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await RoutedBySenderAgent.register(runtime, \"my_agent\", lambda: RoutedBySenderAgent(\"Routed by sender agent\"))\n", "runtime.start()\n", "agent_id = AgentId(\"my_agent\", \"default\")\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"user1-test\"), agent_id)\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"user2-test\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"user1-test\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"user2-test\"), agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在上面的示例中,第一个 `ImageMessage` 没有被处理,因为消息的 `source` 字段与处理程序的 `match` 条件不匹配。\n", "\n", "## 直接消息传递\n", "\n", "AutoGen core 中有两种类型的通信:\n", "\n", "- **直接消息传递**:向另一个代理发送直接消息。\n", "- **广播**:向主题发布消息。\n", "\n", "让我们首先看看直接消息传递。要向另一个代理发送直接消息,在消息处理程序中使用 {py:meth}`autogen_core.base.BaseAgent.send_message` 方法,从运行时使用 {py:meth}`autogen_core.base.AgentRuntime.send_message` 方法。等待这些方法的调用将返回接收代理的消息处理程序的返回值。当接收代理的处理程序返回 `None` 时,将返回 `None`。\n", "\n", "```{note}\n", "如果在发送者等待时被调用的代理抛出异常,异常将传播回发送者。\n", "```\n", "\n", "### 请求/响应\n", "\n", "直接消息传递可用于请求/响应场景,其中发送者期望从接收者得到响应。接收者可以通过从其消息处理程序返回值来响应消息。您可以将此视为代理之间的函数调用。\n", "\n", "例如,考虑以下代理:" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core.application import SingleThreadedAgentRuntime\n", "from autogen_core.base import MessageContext\n", "from autogen_core.components import RoutedAgent, message_handler\n", "\n", "\n", "@dataclass\n", "class Message:\n", " content: str\n", "\n", "\n", "class InnerAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:\n", " return Message(content=f\"Hello from inner, {message.content}\")\n", "\n", "\n", "class OuterAgent(RoutedAgent):\n", " def __init__(self, description: str, inner_agent_type: str):\n", " super().__init__(description)\n", " self.inner_agent_id = AgentId(inner_agent_type, self.id.key)\n", "\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"Received message: {message.content}\")\n", " # Send a direct message to the inner agent and receves a response.\n", " response = await self.send_message(Message(f\"Hello from outer, {message.content}\"), self.inner_agent_id)\n", " print(f\"Received inner response: {response.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在收到消息时,`OuterAgent` 向 `InnerAgent` 发送一个直接消息并接收响应消息。\n", "\n", "我们可以通过向 `OuterAgent` 发送 `Message` 来测试这些代理。" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received message: Hello, World!\n", "Received inner response: Hello from inner, Hello from outer, Hello, World!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await InnerAgent.register(runtime, \"inner_agent\", lambda: InnerAgent(\"InnerAgent\"))\n", "await OuterAgent.register(runtime, \"outer_agent\", lambda: OuterAgent(\"OuterAgent\", \"inner_agent\"))\n", "runtime.start()\n", "outer_agent_id = AgentId(\"outer_agent\", \"default\")\n", "await runtime.send_message(Message(content=\"Hello, World!\"), outer_agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "两个输出都是由 `OuterAgent` 的消息处理程序产生的,但是第二个输出是基于来自 `InnerAgent` 的响应。\n", "\n", "一般来说,当发送者和接收者紧密耦合时,直接消息传递是合适的 —— 它们是一起创建的,发送者链接到接收者的特定实例。例如,代理通过向 {py:class}`~autogen_core.components.tool_agent.ToolAgent` 的实例发送直接消息来执行工具调用,并使用响应形成动作-观察循环。\n", "\n", "## 广播\n", "\n", "广播实际上是带有主题和订阅的发布/订阅模型。阅读[主题和订阅](../core-concepts/topic-and-subscription.md)以了解核心概念。\n", "\n", "直接消息传递和广播之间的关键区别是广播不能用于请求/响应场景。当代理发布消息时,它是单向的,它不能从任何其他代理接收响应,即使接收代理的处理程序返回一个值。\n", "\n", "```{note}\n", "如果对已发布的消息给出响应,它将被丢弃。\n", "```\n", "\n", "```{note}\n", "如果代理发布了它订阅的消息类型,它将不会收到它发布的消息。这是为了防止无限循环。\n", "```\n", "\n", "### 订阅和发布主题\n", "\n", "[基于类型的订阅](../core-concepts/topic-and-subscription.md#type-based-subscription)将发布到给定主题类型的主题的消息映射到给定代理类型的代理。要使继承 {py:class}`~autogen_core.components.RoutedAgent` 的代理订阅给定主题类型的主题,您可以使用 {py:meth}`~autogen_core.components.type_subscription` 类装饰器。\n", "\n", "以下示例显示了一个使用 {py:meth}`~autogen_core.components.type_subscription` 装饰器订阅 `\"default\"` 主题类型的主题的 `ReceiverAgent` 类,并打印接收到的消息。" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [], "source": [ "from autogen_core.components import RoutedAgent, message_handler, type_subscription\n", "\n", "\n", "@type_subscription(topic_type=\"default\")\n", "class ReceivingAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"Received a message: {message.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "要从代理的处理程序发布消息,使用 {py:meth}`~autogen_core.base.BaseAgent.publish_message` 方法并指定一个 {py:class}`~autogen_core.base.TopicId`。这个调用仍然必须被等待以允许运行时安排消息传递给所有订阅者,但它总是返回 `None`。如果代理在处理已发布的消息时抛出异常,这将被记录但不会传播回发布代理。\n", "\n", "以下示例显示了一个 `BroadcastingAgent`,它在收到消息时向主题发布消息。" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "from autogen_core.base import TopicId\n", "\n", "\n", "class BroadcastingAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " await self.publish_message(\n", " Message(\"Publishing a message from broadcasting agent!\"),\n", " topic_id=TopicId(type=\"default\", source=self.id.key),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`BroadcastingAgent` 向类型为 `\"default\"` 的主题发布消息,并将源分配给代理实例的代理键。\n", "\n", "订阅在代理运行时注册,可以作为代理类型注册的一部分,也可以通过单独的 API 方法注册。以下是我们如何为使用 {py:meth}`~autogen_core.components.type_subscription` 装饰器的接收代理和不使用装饰器的广播代理注册 {py:class}`~autogen_core.components.TypeSubscription`。" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received a message: Hello, World! From the runtime!\n", "Received a message: Publishing a message from broadcasting agent!\n" ] } ], "source": [ "from autogen_core.components import TypeSubscription\n", "\n", "runtime = SingleThreadedAgentRuntime()\n", "\n", "# Option 1: with type_subscription decorator\n", "# The type_subscription class decorator automatically adds a TypeSubscription to\n", "# the runtime when the agent is registered.\n", "await ReceivingAgent.register(runtime, \"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", "\n", "# Option 2: with TypeSubscription\n", "await BroadcastingAgent.register(runtime, \"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"))\n", "await runtime.add_subscription(TypeSubscription(topic_type=\"default\", agent_type=\"broadcasting_agent\"))\n", "\n", "# Start the runtime and publish a message.\n", "runtime.start()\n", "await runtime.publish_message(\n", " Message(\"Hello, World! From the runtime!\"), topic_id=TopicId(type=\"default\", source=\"default\")\n", ")\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "如上例所示,您也可以通过运行时的 {py:meth}`~autogen_core.base.AgentRuntime.publish_message` 方法直接向主题发布消息,而无需创建代理实例。\n", "\n", "从输出中,您可以看到接收代理收到了两条消息:一条是通过运行时发布的,另一条是由广播代理发布的。\n", "\n", "### 默认主题和订阅\n", "\n", "在上面的示例中,我们分别使用 {py:class}`~autogen_core.base.TopicId` 和 {py:class}`~autogen_core.components.TypeSubscription` 来指定主题和订阅。这对许多场景来说是合适的方式。然而,当只有一个发布范围时,即所有代理都发布和订阅所有广播消息,我们可以使用便利类 {py:class}`~autogen_core.components.DefaultTopicId` 和 {py:meth}`~autogen_core.components.default_subscription` 来简化我们的代码。\n", "\n", "{py:class}`~autogen_core.components.DefaultTopicId` 用于创建使用 `\"default\"` 作为主题类型默认值和发布代理的键作为主题源默认值的主题。{py:meth}`~autogen_core.components.default_subscription` 用于创建订阅默认主题的类型订阅。我们可以通过使用 {py:class}`~autogen_core.components.DefaultTopicId` 和 {py:meth}`~autogen_core.components.default_subscription` 来简化 `BroadcastingAgent`。" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [], "source": [ "from autogen_core.components import DefaultTopicId, default_subscription\n", "\n", "\n", "@default_subscription\n", "class BroadcastingAgentDefaultTopic(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " # Publish a message to all agents in the same namespace.\n", " await self.publish_message(\n", " Message(\"Publishing a message from broadcasting agent!\"),\n", " topic_id=DefaultTopicId(),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "当运行时调用 {py:meth}`~autogen_core.BaseAgent.register` 来注册代理类型时,它会创建一个 {py:class}`~autogen_core.components.TypeSubscription`,其主题类型使用 `\"default\"` 作为默认值,代理类型使用在同一上下文中注册的相同代理类型。" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received a message: Hello, World! From the runtime!\n", "Received a message: Publishing a message from broadcasting agent!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await BroadcastingAgentDefaultTopic.register(\n", " runtime, \"broadcasting_agent\", lambda: BroadcastingAgentDefaultTopic(\"Broadcasting Agent\")\n", ")\n", "await ReceivingAgent.register(runtime, \"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", "runtime.start()\n", "await runtime.publish_message(Message(\"Hello, World! From the runtime!\"), topic_id=DefaultTopicId())\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{note}\n", "如果您的场景允许所有代理发布和订阅所有广播消息,请使用 {py:class}`~autogen_core.components.DefaultTopicId` 和 {py:meth}`~autogen_core.components.default_subscription` 来装饰您的代理类。\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "agnext", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 2 }