# 分布式代理运行时

```{attention}
分布式代理运行时是一个实验性功能。预期 API 会有重大变化。
```

分布式代理运行时促进了跨进程边界的通信和代理生命周期管理。它由一个主机服务和至少一个工作运行时组成。

主机服务维护与所有活动工作运行时的连接,促进消息传递,并为所有直接消息(即 RPC)保持会话。工作运行时处理应用程序代码(代理)并连接到主机服务。它还向主机服务通告它们支持的代理,这样主机服务就可以将消息传递给正确的工作者。

我们可以使用 {py:class}`~autogen_core.application.WorkerAgentRuntimeHost` 启动主机服务。

In [1]:
from autogen_core.application import WorkerAgentRuntimeHost

host = WorkerAgentRuntimeHost(address="localhost:50051")
host.start() # Start a host service in the background.

上述代码在后台启动主机服务,并在端口 50051 上接受工作者连接。

在运行工作运行时之前,让我们定义我们的代理。该代理将在收到每条消息时发布一条新消息。它还会跟踪已发布的消息数量,一旦发布了 5 条消息就停止发布新消息。

In [2]:
from dataclasses import dataclass

from autogen_core.base import MessageContext
from autogen_core.components import DefaultTopicId, RoutedAgent, default_subscription, message_handler


@dataclass
class MyMessage:
 content: str


@default_subscription
class MyAgent(RoutedAgent):
 def __init__(self, name: str) -> None:
 super().__init__("My agent")
 self._name = name
 self._counter = 0

 @message_handler
 async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:
 self._counter += 1
 if self._counter > 5:
 return
 content = f"{self._name}: Hello x {self._counter}"
 print(content)
 await self.publish_message(MyMessage(content=content), DefaultTopicId())

现在我们可以设置工作代理运行时。我们使用 {py:class}`~autogen_core.application.WorkerAgentRuntime`。我们设置两个工作运行时。每个运行时托管一个代理。所有代理都发布和订阅默认主题,所以它们可以看到所有被发布的消息。

要运行代理,我们从一个worker发布消息。

In [3]:
import asyncio

from autogen_core.application import WorkerAgentRuntime

worker1 = WorkerAgentRuntime(host_address="localhost:50051")
worker1.start()
await MyAgent.register(worker1, "worker1", lambda: MyAgent("worker1"))

worker2 = WorkerAgentRuntime(host_address="localhost:50051")
worker2.start()
await MyAgent.register(worker2, "worker2", lambda: MyAgent("worker2"))

await worker2.publish_message(MyMessage(content="Hello!"), DefaultTopicId())

# Let the agents run for a while.
await asyncio.sleep(5)

worker1: Hello x 1
worker2: Hello x 1
worker2: Hello x 2
worker1: Hello x 2
worker1: Hello x 3
worker2: Hello x 3
worker2: Hello x 4
worker1: Hello x 4
worker1: Hello x 5
worker2: Hello x 5


我们可以看到每个代理恰好发布了 5 条消息。

要停止工作运行时,我们可以调用 {py:meth}`~autogen_core.application.WorkerAgentRuntime.stop`。

In [4]:
await worker1.stop()
await worker2.stop()

# To keep the worker running until a termination signal is received (e.g., SIGTERM).
# await worker1.stop_when_signal()

我们可以调用 {py:meth}`~autogen_core.application.WorkerAgentRuntimeHost.stop` 来停止主机服务。

In [5]:
await host.stop()

# To keep the host service running until a termination signal (e.g., SIGTERM)
# await host.stop_when_signal()

# 后续步骤
要查看使用分布式运行时的完整示例,请查看以下样例:

- [Distributed Workers](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/worker) 
- [Distributed Semantic Router](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/semantic_router) 
- [Distributed Group Chat](https://github.com/microsoft/autogen/tree/main/python/packages/autogen-core/samples/distributed-group-chat) 
