【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制




  • 【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑
  • 【AI Agent系列】【MetaGPT多智能体学习】1. 再理解 AI Agent - 经典案例和热门框架综述
  • 【AI Agent系列】【MetaGPT多智能体学习】2. 重温单智能体开发 - 深入源码,理解单智能体运行框架


  • 系列笔记
  • 0. 多智能体间互通的方式 - Enviroment组件
    • 0.1 多智能体间协作方式简介
    • 0.2 Environment组件 - 深入源码
      • 0.2.1 参数介绍
      • 0.2.2 add_roles函数 - 承载角色
      • 0.2.3 publish_message函数 - 接收角色发布的消息 / 环境中的消息被角色得到
      • 0.2.4 run函数 - 运行入口
  • 1. 开发一个简单的多智能体系统
    • 1.1 多智能体需求描述
    • 1.2 代码实现
      • 1.2.1 学生智能体
      • 1.2.2 老师智能体
      • 1.2.3 创建多智能体交流的环境
      • 1.2.4 运行
      • 1.2.5 完整代码
  • 2. 总结

0. 多智能体间互通的方式 - Enviroment组件

0.1 多智能体间协作方式简介

在上次课中,我也曾写过 多智能体运行机制 的笔记(这篇文章:【AI Agent系列】【MetaGPT】【深入源码】智能体的运行周期以及多智能体间如何协作)。其中我自己通过看源码,总结出了MetaGPT多智能体间协作的方式:


现在再结合《MetaGPT多智能体》课程的教案,发现还是理解地有些浅了,我只关注了智能体在关注自己想要的信息,观察到了之后就开始行动,而忽略了其背后的基础组件 - Enviroment。

0.2 Environment组件 - 深入源码


  • 承载角色
  • 接收角色发布的消息
  • 环境中的消息被角色得到


0.2.1 参数介绍

首先来看下 Environment 的参数:

class Environment(ExtEnv):"""环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles"""model_config = ConfigDict(arbitrary_types_allowed=True)desc: str = Field(default="")  # 环境描述roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True)member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True)history: str = ""  # For debugcontext: Context = Field(default_factory=Context, exclude=True)
  • desc:环境的描述
  • roles:字典类型,指定当前环境中的角色
  • member_addrs:字典类型,表示当前环境中的角色以及他们对应的状态
  • history:记录环境中发生的消息记录
  • context:当前环境的一些上下文信息

0.2.2 add_roles函数 - 承载角色

def add_roles(self, roles: Iterable["Role"]):"""增加一批在当前环境的角色Add a batch of characters in the current environment"""for role in roles:self.roles[role.profile] = rolefor role in roles:  # setup system message with rolesrole.set_env(self)role.context = self.context


(1)给 Environment 的 self.roles 参数赋值,上面我们已经知道它是一个字典类型,现在看来,它的 key 为 role 的 profile,值为 role 本身。

疑问: role 的 profile 默认是空(profile: str = ""),可以没有值,那如果使用者懒得写profile,这里会不会最终只有一个 Role,导致无法实现多智能体?欢迎讨论交流。

(2)给添加进来的 role 设置环境信息

Role的 set_env 函数源码如下,它又给env设置了member_addrs。有点绕?

def set_env(self, env: "Environment"):"""Set the environment in which the role works. The role can talk to the environment and can also receivemessages by observing."""self.rc.env = envif env:env.set_addresses(self, self.addresses)self.llm.system_prompt = self._get_prefix()self.set_actions(self.actions)  # reset actions to update llm and prefix

通过 add_roles 函数,将 role 和 Environment 关联起来。

0.2.3 publish_message函数 - 接收角色发布的消息 / 环境中的消息被角色得到

Environment 中的 publish_message 函数是 Role 在执行完动作之后,将自身产生的消息发布到环境中调用的接口。Role的调用方式如下:

def publish_message(self, msg):"""If the role belongs to env, then the role's messages will be broadcast to env"""if not msg:returnif not self.rc.env:# If env does not exist, do not publish the messagereturnself.rc.env.publish_message(msg)


然后看下 Environment 中源码:

def publish_message(self, message: Message, peekable: bool = True) -> bool:"""Distribute the message to the recipients.In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already plannedin RFC 113 for the entire system, the routing information in the Message is only responsible forspecifying the message recipient, without concern for where the message recipient is located. How toroute the message to the message recipient is a problem addressed by the transport framework designedin RFC 113."""logger.debug(f"publish_message: {message.dump()}")found = False# According to the routing feature plan in Chapter of RFC 113for role, addrs in self.member_addrs.items():if is_send_to(message, addrs):role.put_message(message)found = Trueif not found:logger.warning(f"Message no recipients: {message.dump()}")self.history += f"\n{message}"  # For debugreturn True

其中重点是这三行代码,检查环境中的所有Role是否订阅了该消息(或该消息是否应该发送给环境中的某个Role),如果订阅了,则调用Role的put_message函数,Role的 put_message函数的作用是将message放到本身的msg_buffer中:

for role, addrs in self.member_addrs.items():if is_send_to(message, addrs):role.put_message(message)


0.2.4 run函数 - 运行入口


async def run(self, k=1):"""处理一次所有信息的运行Process all Role runs at once"""for _ in range(k):futures = []for role in self.roles.values():future = role.run()futures.append(future)await asyncio.gather(*futures)logger.debug(f"is idle: {self.is_idle}")

k = 1, 表示处理一次消息?为什么要有这个值,难道还能处理多次?意义是什么?


Role的run里面,就是用该环境观察信息,行动,发布信息到环境。这样多个Role的运行就通过 Environment 串起来了,这些之前已经写过了,不再赘述,见:【AI Agent系列】【MetaGPT】【深入源码】智能体的运行周期以及多智能体间如何协作。

1. 开发一个简单的多智能体系统


1.1 多智能体需求描述

  • 两个智能体:学生 和 老师
  • 任务及预期流程:学生写诗 —> 老师给改进意见 —> 学生根据意见改进 —> 老师给改进意见 —> … n轮 … —> 结束

1.2 代码实现

1.2.1 学生智能体




那么它什么时候开始动作呢?通过 self._watch([UserRequirement, ReviewPoem]) 设置其观察的信息。当它观察到环境中有了 UserRequirement 或者 ReviewPoem 产生的信息之后,开始动作。UserRequirement为用户的输入信息类型。

class WritePoem(Action):name: str = "WritePoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.your poem:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass Student(Role):name: str = "xiaoming"profile: str = "Student"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([WritePoem])self._watch([UserRequirement, ReviewPoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories()  # 获取所有记忆# logger.info(msg)poem_text = await WritePoem().run(msg)logger.info(f'student : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msg

1.2.2 老师智能体





class ReviewPoem(Action):name: str = "ReviewPoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.Return only your comments with NO other texts.your comments:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass Teacher(Role):name: str = "laowang"profile: str = "Teacher"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([ReviewPoem])self._watch([WritePoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories()  # 获取所有记忆poem_text = await ReviewPoem().run(msg)logger.info(f'teacher : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msg

1.2.3 创建多智能体交流的环境

前面我们已经知道了多智能体之间的交互需要一个非常重要的组件 - Environment。

因此,创建一个供多智能体交流的环境,通过 add_roles 加入智能体。


classroom = Environment()classroom.add_roles([Student(), Teacher()])classroom.publish_message(Message(role="Human", content=topic, cause_by=UserRequirement,send_to='' or MESSAGE_ROUTE_TO_ALL),peekable=False,

1.2.4 运行


import asynciofrom metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environmentfrom metagpt.const import MESSAGE_ROUTE_TO_ALLasync def main(topic: str, n_round=3):while n_round > 0:# self._save()n_round -= 1logger.debug(f"max {n_round=} left.")await classroom.run()return classroom.historyasyncio.run(main(topic='wirte a poem about moon'))

1.2.5 完整代码

import asynciofrom metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environmentfrom metagpt.const import MESSAGE_ROUTE_TO_ALLclassroom = Environment()class WritePoem(Action):name: str = "WritePoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.your poem:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass ReviewPoem(Action):name: str = "ReviewPoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.Return only your comments with NO other texts.your comments:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass Student(Role):name: str = "xiaoming"profile: str = "Student"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([WritePoem])self._watch([UserRequirement, ReviewPoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories()  # 获取所有记忆# logger.info(msg)poem_text = await WritePoem().run(msg)logger.info(f'student : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msgclass Teacher(Role):name: str = "laowang"profile: str = "Teacher"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([ReviewPoem])self._watch([WritePoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories()  # 获取所有记忆poem_text = await ReviewPoem().run(msg)logger.info(f'teacher : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msgasync def main(topic: str, n_round=3):classroom.add_roles([Student(), Teacher()])classroom.publish_message(Message(role="Human", content=topic, cause_by=UserRequirement,send_to='' or MESSAGE_ROUTE_TO_ALL),peekable=False,)while n_round > 0:# self._save()n_round -= 1logger.debug(f"max {n_round=} left.")await classroom.run()return classroom.historyasyncio.run(main(topic='wirte a poem about moon'))
  • 运行结果


2. 总结


从最外围环境 classroom开始,用户输入的信息通过 publish_message 发送到所有Role中,通过Role的 put_message 放到自身的 msg_buffer中。









