{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 分布式代理运行时\n", "\n", "```{attention}\n", "分布式代理运行时是一个实验性功能。预期 API 会有重大变化。\n", "```\n", "\n", "分布式代理运行时促进了跨进程边界的通信和代理生命周期管理。它由一个主机服务和至少一个工作运行时组成。\n", "\n", "主机服务维护与所有活动工作运行时的连接,促进消息传递,并为所有直接消息(即 RPC)保持会话。工作运行时处理应用程序代码(代理)并连接到主机服务。它还向主机服务通告它们支持的代理,这样主机服务就可以将消息传递给正确的工作者。\n", "\n", "我们可以使用 {py:class}`~autogen_core.application.WorkerAgentRuntimeHost` 启动主机服务。" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from autogen_core.application import WorkerAgentRuntimeHost\n", "\n", "host = WorkerAgentRuntimeHost(address=\"localhost:50051\")\n", "host.start() # Start a host service in the background." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "上述代码在后台启动主机服务,并在端口 50051 上接受工作者连接。\n", "\n", "在运行工作运行时之前,让我们定义我们的代理。该代理将在收到每条消息时发布一条新消息。它还会跟踪已发布的消息数量,一旦发布了 5 条消息就停止发布新消息。" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core.base import MessageContext\n", "from autogen_core.components import DefaultTopicId, RoutedAgent, default_subscription, message_handler\n", "\n", "\n", "@dataclass\n", "class MyMessage:\n", " content: str\n", "\n", "\n", "@default_subscription\n", "class MyAgent(RoutedAgent):\n", " def __init__(self, name: str) -> None:\n", " super().__init__(\"My agent\")\n", " self._name = name\n", " self._counter = 0\n", "\n", " @message_handler\n", " async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:\n", " self._counter += 1\n", " if self._counter > 5:\n", " return\n", " content = f\"{self._name}: Hello x {self._counter}\"\n", " print(content)\n", " await self.publish_message(MyMessage(content=content), DefaultTopicId())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "现在我们可以设置工作代理运行时。我们使用 {py:class}`~autogen_core.application.WorkerAgentRuntime`。我们设置两个工作运行时。每个运行时托管一个代理。所有代理都发布和订阅默认主题,所以它们可以看到所有被发布的消息。\n", "\n", "要运行代理,我们从一个worker发布消息。" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "worker1: Hello x 1\n", "worker2: Hello x 1\n", "worker2: Hello x 2\n", "worker1: Hello x 2\n", "worker1: Hello x 3\n", "worker2: Hello x 3\n", "worker2: Hello x 4\n", "worker1: Hello x 4\n", "worker1: Hello x 5\n", "worker2: Hello x 5\n" ] } ], "source": [ "import asyncio\n", "\n", "from autogen_core.application import WorkerAgentRuntime\n", "\n", "worker1 = WorkerAgentRuntime(host_address=\"localhost:50051\")\n", "worker1.start()\n", "await MyAgent.register(worker1, \"worker1\", lambda: MyAgent(\"worker1\"))\n", "\n", "worker2 = WorkerAgentRuntime(host_address=\"localhost:50051\")\n", "worker2.start()\n", "await MyAgent.register(worker2, \"worker2\", lambda: MyAgent(\"worker2\"))\n", "\n", "await worker2.publish_message(MyMessage(content=\"Hello!\"), DefaultTopicId())\n", "\n", "# Let the agents run for a while.\n", "await asyncio.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "我们可以看到每个代理恰好发布了 5 条消息。\n", "\n", "要停止工作运行时,我们可以调用 {py:meth}`~autogen_core.application.WorkerAgentRuntime.stop`。" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "await worker1.stop()\n", "await worker2.stop()\n", "\n", "# To keep the worker running until a termination signal is received (e.g., SIGTERM).\n", "# await worker1.stop_when_signal()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "我们可以调用 {py:meth}`~autogen_core.application.WorkerAgentRuntimeHost.stop` 来停止主机服务。" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "await host.stop()\n", "\n", "# To keep the host service running until a termination signal (e.g., SIGTERM)\n", "# await host.stop_when_signal()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 后续步骤\n", "要查看使用分布式运行时的完整示例,请查看以下样例:\n", "\n", "- [Distributed Workers](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/worker) \n", "- [Distributed Semantic Router](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/semantic_router) \n", "- [Distributed Group Chat](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/distributed-group-chat) \n" ] } ], "metadata": { "kernelspec": { "display_name": "agnext", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 2 }