2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(下):多智能体开发

传送门:

  • 《2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(上):MetaGPT安装、单智能体开发》
  • 《2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(中):订阅智能体OSS实现》

文章目录

    • 一、 快速入门:软件公司
      • 1.1 定义动作
      • 1.2 定义角色
      • 1.3 定义团队
    • 二、多智能体交互机制
      • 2.1 Environment
      • 2.2 role
      • 2.3 Team
    • 三、示例
      • 3.1 智能写诗
      • 3.2 软件公司(略)
      • 3.3 智能体辩论
        • 3.3.1 定义动作
        • 3.3.2 定义角色
        • 3.3.3 创建团队并添加角色
    • 四、作业
      • 4.1 基础作业
      • 4.2 进阶作业:重写babyagi
        • 4.2.1 babyagi简介
        • 4.2.2 babyagi问答

  • 学习资料:项目地址——hugging-multi-agent、在线阅读、MetaGPT项目、MetaGPT中文文档
  • 优秀作业链接:《MetaGPT环境搭建和尝试》、Destory的《MetaGPT教程笔记》、乐正萌的《MetaGPT教程Task3 》、 GISer Liu的《基于MetaGPT构建单智能体》、《MetaGPT课程3作业》

一、 快速入门:软件公司

参考官方文档《多智能体入门》、软件公司完整代码:build_customized_multi_agents.py

  一些复杂的任务通常需要协作和团队合作,在MetaGPT框架下,用户可以通过少量代码实现多智能体交互,共同完成更复杂的任务。下面以软件公司为示例,开发一个智能体团队。

  1. 定义三个角色:一位编码人员(写代码)、一名测试人员(测试代码)和一名审阅人员(评价测试结果)。
  2. 基于标准作业程序(SOP)确保每个角色遵守它。通过使每个角色观察上游的相应输出结果,并为下游发布自己的输出结果,可以实现这一点。
  3. 初始化所有角色,创建一个带有环境的智能体团队,并使它们之间能够进行交互。

  在《2024.5组队学习——MetaGPT智能体理论与实战(上)》第三章单智能体入门中,我们实现了具有SimpleWriteCode 动作的SimpleCoder 角色,用于接收用户的指令并编写主要代码。我们可以以相同的方式定义其它两个角色SimpleTesterSimpleReviewer及其对应的动作SimpleWriteTestSimpleWriteReview

1.1 定义动作

class SimpleWriteCode(Action):PROMPT_TEMPLATE: str = """Write a python function that can {instruction}.Return ```python your_code_here ```with NO other texts,your code:"""name: str = "SimpleWriteCode"async def run(self, instruction: str):prompt = self.PROMPT_TEMPLATE.format(instruction=instruction)rsp = await self._aask(prompt)code_text = parse_code(rsp)return code_text
class SimpleWriteTest(Action):PROMPT_TEMPLATE: str = """Context: {context}Write {k} unit tests using pytest for the given function, assuming you have imported it.Return ```python your_code_here ```with NO other texts,your code:"""name: str = "SimpleWriteTest"async def run(self, context: str, k: int = 3):prompt = self.PROMPT_TEMPLATE.format(context=context, k=k)rsp = await self._aask(prompt)code_text = parse_code(rsp)return code_text
class SimpleWriteReview(Action):PROMPT_TEMPLATE: str = """Context: {context}Review the test cases and provide one critical comments:"""name: str = "SimpleWriteReview"async def run(self, context: str):prompt = self.PROMPT_TEMPLATE.format(context=context)rsp = await self._aask(prompt)return rsp

1.2 定义角色

接下来,我们需要定义三个具有各自动作的Role

  • SimpleCoder 具有 SimpleWriteCode 动作,接收用户的指令并编写主要代码
  • SimpleTester 具有 SimpleWriteTest 动作,从 SimpleWriteCode 的输出中获取主代码并为其提供测试套件
  • SimpleReviewer 具有 SimpleWriteReview 动作,审查来自 SimpleWriteTest 输出的测试用例,并检查其覆盖范围和质量

整个软件公司的运作机制如下:
在这里插入图片描述
如上图的右侧部分所示,Role的机制可以用四步来表示:

  1. _observe:将从Environment中获取_observe Message。如果有一个Role _watch 的特定 Action 引起的 Message,那么这是一个有效的观察,触发Role的后续思考和操作。
  2. _thinkRole将选择其能力范围内的一个 Action 并将其设置为要做的事情。
  3. _act :执行,即运行 Action 并获取输出,并将输出封装在 Message
  4. _publish:发布 publish_messageEnvironment,由此完成了一个完整的智能体运行。

  在每个步骤中,无论是 _observe_think 还是 _actRole都将与其 Memory 交互,通过添加或检索来实现。此外,MetaGPT提供了 react 过程的不同模式。这些部分的详细内容,请参阅使用记忆 和 思考与行动,参考或者Role代码。

  如上图左侧部分所示,每个 Role 都需要遵守SOP(观察上游的相应输出结果,并为下游发布自己的输出),比如虚线框中, SimpleTester 同时 _watch SimpleWriteCodeSimpleWriteReview,则可以扩展 SOP

  接下来,我们将详细讨论如何根据 SOP 来定义Role。首先对于SimpleCoder,我们需要两件事:

  1. self._watch([UserRequirement]): 获取来自用户或其他智能体的重要上游消息(这里是UserRequirement引起的Message
  2. 使用 set_actionsRole配备适当的 Action,这与设置单智能体相同
class SimpleCoder(Role):name: str = "Alice"profile: str = "SimpleCoder"def __init__(self, **kwargs):super().__init__(**kwargs)self._watch([UserRequirement])self.set_actions([SimpleWriteCode])

与上述相似,对于 SimpleTester,我们:

  1. 使用 set_actionsSimpleTester配备 SimpleWriteTest 动作

  2. 获取来自其他智能体的重要上游消息,这里SimpleTester将从 SimpleCoder 中获取主代码。

    一个扩展的问题:想一想如果我们使用 self._watch([SimpleWriteCode, SimpleWriteReview]) 会意味着什么,可以尝试这样做

  3. 重写 _act 函数,就像我们在智能体入门中的单智能体设置中所做的那样。在这里,我们希望SimpleTester将所有记忆用作编写测试用例的上下文,并希望有5个测试用例,因此我们需要多个输入。

class SimpleTester(Role):name: str = "Bob"profile: str = "SimpleTester"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([SimpleWriteTest])self._watch([SimpleWriteCode])# self._watch([SimpleWriteCode, SimpleWriteReview])  # feel free to try this tooasync def _act(self) -> Message:logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")todo = self.rc.todo# context = self.get_memories(k=1)[0].content # use the most recent memory as contextcontext = self.get_memories()  # use all memories as contextcode_text = await todo.run(context, k=5)  # specify argumentsmsg = Message(content=code_text, role=self.profile, cause_by=type(todo))return msg

  在这里,我们调用get_memories() 函数为SimpleTester提供完整的历史记录。通过这种方式,如果 SimpleReviewer提供反馈,SimpleTester可以参考其先前版本修改测试用例。

  下面按照相同的过程定义 SimpleReviewer

class SimpleReviewer(Role):name: str = "Charlie"profile: str = "SimpleReviewer"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([SimpleWriteReview])self._watch([SimpleWriteTest])

1.3 定义团队

  定义完三个 Role之后,我们需要初始化所有角色,设置一个 Team,并hire 它们。运行 Team,我们应该会看到它们之间的协作!

import asyncio
import typer
from metagpt.logs import logger
from metagpt.team import Team
app = typer.Typer()@app.command()
def main(idea: str = typer.Argument(..., help="write a function that calculates the product of a list"),investment: float = typer.Option(default=3.0, help="Dollar amount to invest in the AI company."),n_round: int = typer.Option(default=5, help="Number of rounds for the simulation."),
):logger.info(idea)team = Team()team.hire([SimpleCoder(),SimpleTester(),SimpleReviewer(),])team.invest(investment=investment)team.run_project(idea)await team.run(n_round=n_round)if __name__ == '__main__':app()

运行结果:
在这里插入图片描述
在这里插入图片描述

  1. SimpleCoder角色(Alice)根据用户需求编写了一个Python函数product_of_list,该函数的作用是计算列表中所有数字的乘积。

  2. SimpleTester角色(Bob)编写了一系列测试用例,用于测试product_of_list函数的正确性。测试用例包括:

    • 测试空列表的情况
    • 测试只有一个数字的列表
    • 测试全为正数的列表
    • 测试全为负数的列表
    • 测试正负数混合的列表
  3. SimpleReviewer角色(Charlie)审阅了测试用例,并发现测试用例缺少一种重要情况:列表中包含0。他指出乘积运算中如果有0,不论其他数字是什么,结果都应该是0。因此,他建议增加一个新的测试用例,用于测试列表包含0的情况。

  4. SimpleTester(Bob)根据SimpleReviewer的反馈,补充了一个新的测试用例test_product_of_list_zero_present,用于测试列表中包含0的情况。

  5. SimpleReviewer(Charlie)再次审阅测试用例,发现缺失的测试用例已经被补充

完整代码见examples/build_customized_multi_agents.py,你也可以在直接使用以下代码运行:

python3 examples/build_customized_multi_agents.py --idea "write a function that calculates the product of a list"

二、多智能体交互机制

源码:Environment、Role、Team

在这里插入图片描述

2.1 Environment

  MetaGPT提供了一个标准的环境组件Environment,来管理agent的活动与信息交流,agents 必须按照环境中的规则进行活动。正如源码所说,Environment承载一批角色,角色可以向环境发布消息,可以被其他角色观察到。

class Environment(ExtEnv):"""环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles"""model_config = ConfigDict(arbitrary_types_allowed=True)desc: str = Field(default="")  # 环境描述roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True)member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True)history: str = ""  # For debugcontext: Context = Field(default_factory=Context, exclude=True)
  • desc :描述当前的环境信息
  • role :指定当前环境中的角色
  • member_addrs:表示当前环境中的角色以及他们对应的状态
  • history:用于记录环境中发生的消息记录

  下面是env的run方法,由此可见,当env运行时,会依次读取环境中的role信息,默认按照声明 role 的顺序依次执行 rolerun 方法。

async def run(self, k=1):"""Process all Role runs at once(处理一次所有信息的运行)       """for _ in range(k):futures = []for role in self.roles.values():future = role.run()# 将role的运行缓存至 future list 中,在后续的 gather 方法中依次调用futures.append(future)await asyncio.gather(*futures)logger.debug(f"is idle: {self.is_idle}")

其它env方法:

def add_role(self, role: Role):"""增加一个在当前环境的角色Add a role in the current environment"""self.roles[role.profile] = rolerole.set_env(self)def add_roles(self, roles: Iterable[Role]):"""增加一批在当前环境的角色Add a batch of characters in the current environment"""for role in roles:self.roles[role.profile] = rolefor role in roles:  # setup system message with rolesrole.set_env(self)def get_roles(self) -> dict[str, Role]:"""获得环境内的所有角色Process all Role runs at once"""return self.rolesdef get_role(self, name: str) -> Role:"""获得环境内的指定角色get all the environment roles"""return self.roles.get(name, None)def role_names(self) -> list[str]:return [i.name for i in self.roles.values()]

2.2 role

  在 role 的run方法中,role 首先将会根据运行时是否传入信息(if with_message:,部分行动前可能需要前置知识消息),如果有,则将信息存入 rolecontext的 msg_buffer 中。

@role_raise_decorator
async def run(self, with_message=None) -> Message | None:"""Observe, and think and act based on the results of the observation"""if with_message:msg = Noneif isinstance(with_message, str):msg = Message(content=with_message)elif isinstance(with_message, Message):msg = with_messageelif isinstance(with_message, list):msg = Message(content="\n".join(with_message))if not msg.cause_by:msg.cause_by = UserRequirementself.put_message(msg)if not await self._observe():# If there is no new information, suspend and waitlogger.debug(f"{self._setting}: no news. waiting.")returnrsp = await self.react()# Reset the next action to be taken.self.set_todo(None)# Send the response message to the Environment object to have it relay the message to the subscribers.self.publish_message(rsp)return rsp
# rc: RoleContext = Field(default_factory=RoleContext)def put_message(self, message):"""Place the message into the Role object's private message buffer."""if not message:returnself.rc.msg_buffer.push(message)

  在多智能体环境运行中,Role的每次行动将从Environment中先_observe Message_observe的目的是从message buffer等消息源获取新的消息,过滤掉不相关的消息,记录最新状态,以供后续处理。

async def _observe(self, ignore_memory=False) -> int:"""Prepare new messages for processing from the message buffer and other sources."""# 从消息缓冲区(msg buffer)读取未处理的消息news = []if self.recovered:news = [self.latest_observed_msg] if self.latest_observed_msg else []if not news:news = self.rc.msg_buffer.pop_all()# 在内存在存储已读的 messages,防止重复处理old_messages = [] if ignore_memory else self.rc.memory.get()self.rc.memory.add_batch(news)# 过滤掉不感兴趣的messagesself.rc.news = [n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages]self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg# Design Rules:# If you need to further categorize Message objects, you can do so using the Message.set_meta function.# msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]if news_text:logger.debug(f"{self._setting} observed: {news_text}")return len(self.rc.news)
  1. 如果是从故障恢复状态 (self.recovered),则从 latest_observed_msg 获取上次观测到的最新消息(没有就是空)。
  2. 将新读取到的消息添加到内存 (self.rc.memory) 中,以防止重复处理。
  3. 过滤掉不感兴趣的消息,即消息的 cause_by 不在 self.rc.watch 列表中,且消息的 send_to 也不包含 self.name。同时也过滤掉已经处理过的消息 (old_messages)。
  4. 记录最新观测到的消息 self.latest_observed_msg
  5. 打印 debug 日志,显示观测到的新消息摘要。
  6. 最终返回新消息的数量 len(self.rc.news)

该函数还强调了一些设计规则:

  • 可以使用 Message.set_meta 函数对消息进一步分类(如果需要的话)。
  • msg_buffer 是接收缓冲区,应避免在其中添加或操作消息。

观察完毕后,采取行动:

# role.pyasync def run(self, with_message=None) -> Message | None:......rsp = await self.react()# Reset the next action to be taken.self.set_todo(None)# 将消息发送到Environment对象,让它将消息中继给所有订阅者self.publish_message(rsp)return rsp...
...
def publish_message(self, msg):"""If the role belongs to env, then the role's messages will be broadcast to env"""if not msg:returnif not self.rc.env:# 如果env不存在,就不发布messagereturnself.rc.env.publish_message(msg) 

  env.publish_message方法会遍历环境中所有角色,检查它们是否订阅了这条消息。如果订阅,则调用put_message方法将这条消息存入该角色的 msg_buffer中。

2.3 Team

  Team是基于Environment之上二次封装的结果,它提供了比Environment更多的组件。比如investment用于管理团队成本(tokens花费),idea给出团队目标。

class Team(BaseModel):"""Team: Possesses one or more roles (agents), SOP (Standard Operating Procedures), and a env for instant messaging,dedicated to env any multi-agent activity, such as collaboratively writing executable code."""model_config = ConfigDict(arbitrary_types_allowed=True)env: Optional[Environment] = None				# 一个可选的Environment对象,用于团队即时通讯investment: float = Field(default=10.0)			# 投资金额,默认为10.0idea: str = Field(default="")					# 团队的想法或主题,默认为空字符串def __init__(self, context: Context = None, **data: Any):super(Team, self).__init__(**data)ctx = context or Context()if not self.env:self.env = Environment(context=ctx)else:self.env.context = ctx  # The `env` object is allocated by deserializationif "roles" in data:self.hire(data["roles"])if "env_desc" in data:self.env.desc = data["env_desc"]

__init__函数中:

  • 调用父类BaseModel的构造函数,将传入的数据(**data)作为属性初始化
  • 检查并初始化env属性
  • 如果传入的数据中包含roles,它会调用hire方法添加这些角色。如果传入的数据中包含env_desc,它会将描述设置为环境的描述。

  另外hire方法用于添加员工(roles),invest方法控制预算,_check_balance检查是否超过预算

def hire(self, roles: list[Role]):"""Hire roles to cooperate"""self.env.add_roles(roles)def invest(self, investment: float):"""Invest company. raise NoMoneyException when exceed max_budget."""self.investment = investmentself.cost_manager.max_budget = investmentlogger.info(f"Investment: ${investment}.")

  在Team运行时,如果有idea,则先发布用户需求,然后重复n_round轮,每轮循环调用 self.env.run() 来运行env,最后返回env中角色的历史对话。

@serialize_decorator
async def run(self, n_round=3, idea="", send_to="", auto_archive=True):"""Run company until target round or no money"""if idea:self.run_project(idea=idea, send_to=send_to)while n_round > 0:n_round -= 1self._check_balance()await self.env.run()logger.debug(f"max {n_round=} left.")self.env.archive(auto_archive)return self.env.history
def run_project(self, idea, send_to: str = ""):"""Run a project from publishing user requirement."""self.idea = idea# Human requirement.self.env.publish_message(Message(role="Human", content=idea, cause_by=UserRequirement, send_to=send_to or MESSAGE_ROUTE_TO_ALL),peekable=False,)

_check_balance 函数用于检查公司的预算是否足够,超过则引发 NoMoneyException 异常。

def _check_balance(self):if self.cost_manager.total_cost >= self.cost_manager.max_budget:raise NoMoneyException(self.cost_manager.total_cost, f"Insufficient funds: {self.cost_manager.max_budget}")

另外Team还包含一些其它方法:

  • serialize:用于将Team对象序列化为JSON文件
  • deserialize:用于从JSON文件中反序列化Team对象

  尽管 Team 只是在 Env上的简单封装,但它向我们展示了,我们该如何向多智能体系统发布启动消息以及引入可能的人类反馈,进而开发属于自己的智能体团队。

三、示例

3.1 智能写诗

  下面以智能写诗为设定场景。我们需要一位student,根据我们要求的主题来写诗的;还需要一位精通诗文的teacher评价诗文,给出修改意见。之后学生根据此意见修改作品,直至循环结束。

  1. 设定环境
import asynciofrom metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environmentfrom metagpt.const import MESSAGE_ROUTE_TO_ALL# 声明一个名为classroom的Env,所有的role都在classroom里活动
classroom = Environment()
  1. 编写WritePoemReviewPoem
class WritePoem(Action):name: str = "WritePoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.your poem:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass ReviewPoem(Action):name: str = "ReviewPoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.Return only your comments with NO other texts.your comments:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rsp
  1. 定义StudentTeacher
    与单智能体不同的是,定义每个角色时,需要声明其关注的动作(self._watch),只有当关注的动作发生时,角色才开始行动。
class Student(Role):name: str = "xiaoming"profile: str = "Student"def __init__(self, **kwargs):super().__init__(**kwargs)self._init_actions([WritePoem])self._watch([UserRequirement, ReviewPoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories()  # 获取所有记忆# logger.info(msg)poem_text = await WritePoem().run(msg)logger.info(f'student : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msgclass Teacher(Role):name: str = "laowang"profile: str = "Teacher"def __init__(self, **kwargs):super().__init__(**kwargs)self._init_actions([ReviewPoem])self._watch([WritePoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories()  # 获取所有记忆poem_text = await ReviewPoem().run(msg)logger.info(f'teacher : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msg
  1. 定义主函数
    将主题发布在classroom中并运行,env就开始工作了。你可以修改n_round数,直到达到你想要的效果。
async def main(topic: str, n_round=3):classroom.add_roles([Student(), Teacher()])classroom.publish_message(Message(role="Human", content=topic, cause_by=UserRequirement,send_to='' or MESSAGE_ROUTE_TO_ALL),peekable=False,)while n_round > 0:# self._save()n_round -= 1logger.debug(f"max {n_round=} left.")await classroom.run()return classroom.historyasyncio.run(main(topic='wirte a poem about moon'))

3.2 软件公司(略)

3.3 智能体辩论

  下面这个场景,模拟两位辩手互相辩论,这是一个展示如何设计多个智能体并促进它们之间的互动的理想例子。总体上,我们需要3个步骤来实现:

  1. 定义一个具有发言行为的辩手角色,我们建议参考智能体入门
  2. 处理辩手之间的通信,也就是让双方互听对方的发言
  3. 初始化两个辩手实例,创建一个带有环境的团队,并使它们能够相互交互。
3.3.1 定义动作
class SpeakAloud(Action):"""动作:在辩论中大声说话(争吵)"""PROMPT_TEMPLATE = """## BACKGROUNDSuppose you are {name}, you are in a debate with {opponent_name}.## DEBATE HISTORYPrevious rounds:{context}## YOUR TURNNow it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:"""def __init__(self, name="SpeakAloud", context=None, llm=None):super().__init__(name, context, llm)async def run(self, context: str, name: str, opponent_name: str):prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)rsp = await self._aask(prompt)return rsp
3.3.2 定义角色

  我们将定义一个通用的 Role,称为 Debator。我们设定其动作为SpeakAloud,还使用 _watch 监视了 SpeakAloudUserRequirement,因为我们希望每个辩手关注来自对手的 SpeakAloud 消息,以及来自用户的 UserRequirement(人类指令)。

class Debator(Role):def __init__(self,name: str,profile: str,opponent_name: str,**kwargs,):super().__init__(name, profile, **kwargs)self.set_actions([SpeakAloud])self._watch([UserRequirement, SpeakAloud])self.name = nameself.opponent_name = opponent_name

  接下来,我们使每个辩手听取对手的论点,通过重写 _observe 函数可以完成这一点。这点很重要,因为在环境中将会有来自双方 “SpeakAloud 消息”(由 SpeakAloud 触发的 Message)。 我们不希望一方处理自己上一轮的 “SpeakAloud 消息”,而是处理来自对方的消息。

async def _observe(self) -> int:await super()._observe()# accept messages sent (from opponent) to self, disregard own messages from the last roundself.rc.news = [msg for msg in self.rc.news if msg.send_to == self.name]return len(self.rc.news)

  最后,我们使每个辩手能够向对手发送反驳的论点。在这里,我们从消息历史中构建一个上下文,使 Debator 运行他拥有的 SpeakAloud 动作,并使用反驳论点内容创建一个新的 Message。请注意,我们定义每个 Debator 将把 Message 发送给他的对手。

async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todo # 一个 SpeakAloud 的实例memories = self.get_memories()context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)msg = Message(content=rsp,role=self.profile,cause_by=todo,sent_from=self.name,send_to=self.opponent_name,)return msg

  cause_by,sent_from,send_to分别表示产生Message的动作、角色以及要发生的角色。通过这种机制可以实现比watch更灵活的订阅机制。

3.3.3 创建团队并添加角色

  建立一个 Team 并hire两个角色。在这个例子中,我们将通过将我们的指令( UserRequirement)发送给Biden,通过run_project函数的send_to参数,指定Biden先发言。

async def debate(idea: str, investment: float = 3.0, n_round: int = 5):"""运行拜登-特朗普辩论,观看他们之间的友好对话 :) """Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")team = Team()team.hire([Biden, Trump])team.invest(investment)team.run_project(idea, send_to="Biden")  # 将辩论主题发送给拜登,让他先说话await team.run(n_round=n_round)import asyncio
import platform
import typer
from metagpt.team import Team
app = typer.Typer()@app.command()
def main(idea: str = typer.Argument(..., help="Economic Policy: Discuss strategies and plans related to taxation, employment, fiscal budgeting, and economic growth."),investment: float = typer.Option(default=3.0, help="Dollar amount to invest in the AI company."),n_round: int = typer.Option(default=5, help="Number of rounds for the simulation."),
):""":param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting"or "Trump: Climate change is a hoax":param investment: contribute a certain dollar amount to watch the debate:param n_round: maximum rounds of the debate:return:"""if platform.system() == "Windows":asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())asyncio.run(debate(idea, investment, n_round))if __name__ == '__main__':app()

以上完整代码见debate.py,也可运行以下命令:

python3 examples/debate.py --idea "Talk about how the U.S. should respond to climate change"

运行结果如下:
在这里插入图片描述

四、作业

4.1 基础作业

  基于 env 或 team 设计一个你的多智能体团队,尝试让他们完成 你画我猜文字版 。你需要要定义两个agent,一个负责接收来自用户提供的物体描述,并转告另一个agent;另一个agent将猜测用户给出的物体名称,两个agent将不断交互直到另一个给出正确的答案,只要完成基础作业就视为学习完成。

你也可以在系统之上继续扩展,比如引入一个agent来生成词语,而人类参与你画我猜的过程中

4.2 进阶作业:重写babyagi

4.2.1 babyagi简介

  衡量Agent的学习效果,关键在于能否将传统人工处理的问题SOP转换为Metaget框架下的Role和Action,并通过多智能体协作完成。如果能够做到这一点,则说明学习目标已经实现。BabyAGI的重写是一个合适的任务,因为它涉及到将人类决策过程和知识编码进智能体中,这是我们想要掌握的关键技能。

  babyagi是其作者yoheinakajima日常任务规划任务优先级的一套SOP,以下是babyagi的实现流程及代码,任务为三个agent进行协同组织。
在这里插入图片描述

  • 执行第一个任务并返回结果。
  • 从存储库中检索上下文并将其存储在向量数据库中。
  • 创建新的任务并将它们添加到待办事项列表中。
  • 对任务进行重新排序并优先级调整。
import openai
import pinecone
import time
from collections import deque
from typing import Dict, List# 设置API密钥和环境变量
OPENAI_API_KEY = ""
PINECONE_API_KEY = ""
PINECONE_ENVIRONMENT = "us-east1-gcp" #Pinecone Environment (eg. "us-east1-gcp")# 定义变量
YOUR_TABLE_NAME = "test-table"
OBJECTIVE = "Solve world hunger."  			# 解决世界饥饿问题
YOUR_FIRST_TASK = "Develop a task list."    # 制定任务清单# 打印目标
print("\033[96m\033[1m"+"\n*****OBJECTIVE*****\n"+"\033[0m\033[0m")
print(OBJECTIVE)# 配置OpenAI和Pinecone
openai.api_key = OPENAI_API_KEY
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)# 创建Pinecone索引
table_name = YOUR_TABLE_NAME
dimension = 1536
metric = "cosine"
pod_type = "p1"# 检查在Pinecone中是否已经存在指定的索引(表名为table_name)
if table_name not in pinecone.list_indexes():pinecone.create_index(table_name, dimension=dimension, metric=metric, pod_type=pod_type)# 连接到索引
index = pinecone.Index(table_name)# 初始化任务列表:
task_list = deque([])# 添加任务
def add_task(task: Dict):task_list.append(task)# 获取文本嵌入
def get_ada_embedding(text):text = text.replace("\n", " ")return openai.Embedding.create(input=[text], model="text-embedding-ada-002")["data"][0]["embedding"]
  • pinecone.list_indexes():返回当前在Pinecone中存在的所有索引的列表。
  • pinecone.create_index(...):创建一个新的索引,名称为table_name,索引的维度为dimension(每个向量的长度),比较向量相似度的度量方法为“cosine”(余弦相似度),指定Pinecone的计算资源类型为“p1”
def task_creation_agent(objective: str, result: Dict, task_description: str, task_list: List[str]):prompt = f"You are an task creation AI that uses the result of an execution agent to create new tasks with the following objective: {objective}, The last completed task has the result: {result}. This result was based on this task description: {task_description}. These are incomplete tasks: {', '.join(task_list)}. Based on the result, create new tasks to be completed by the AI system that do not overlap with incomplete tasks. Return the tasks as an array."response = openai.Completion.create(engine="text-davinci-003",prompt=prompt,temperature=0.5,max_tokens=100,top_p=1,frequency_penalty=0,presence_penalty=0)new_tasks = response.choices[0].text.strip().split('\n')return [{"task_name": task_name} for task_name in new_tasks]def prioritization_agent(this_task_id:int):global task_list# 从全局任务列表task_list中提取所有任务的名称,生成一个任务名称列表。task_names = [t["task_name"] for t in task_list]next_task_id = int(this_task_id)+1  # 计算下一个任务的ID"""prompt用于指导OpenAI的语言模型进行任务的格式化和重新排序,提示包括:1. 当前任务列表(task_names)。2. 团队的最终目标(OBJECTIVE)。要求最终返回一个编号列表,从next_task_id开始。"""prompt = f"""You are an task prioritization AI tasked with cleaning the formatting of and reprioritizing the following tasks: {task_names}. Consider the ultimate objective of your team:{OBJECTIVE}. Do not remove any tasks. Return the result as a numbered list, like:#. First task#. Second taskStart the task list with number {next_task_id}."""response = openai.Completion.create(engine="text-davinci-003",prompt=prompt,temperature=0.5,max_tokens=1000,top_p=1,frequency_penalty=0,presence_penalty=0)# 将生成的任务列表按行分割成单独的任务new_tasks = response.choices[0].text.strip().split('\n')task_list = deque()				# 初始化一个新的任务队列task_listfor task_string in new_tasks:   # 遍历生成的任务列表,将每个任务拆分为任务ID和任务名称,并重新添加到任务队列中task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = task_parts[0].strip()task_name = task_parts[1].strip()task_list.append({"task_id": task_id, "task_name": task_name})
  • task_creation_agent:根据执行结果生成新任务
  • prioritization_agent:对任务列表进行格式化和重新排序,然后更新全局任务队列。
def execution_agent(objective:str,task: str) -> str:#context = context_agent(index="quickstart", query="my_search_query", n=5)context=context_agent(index=YOUR_TABLE_NAME, query=objective, n=5)#print("\n*******RELEVANT CONTEXT******\n")#print(context)response = openai.Completion.create(engine="text-davinci-003",prompt=f"You are an AI who performs one task based on the following objective: {objective}. Your task: {task}\nResponse:",temperature=0.7,max_tokens=2000,top_p=1,frequency_penalty=0,presence_penalty=0)# 提取生成的文本结果,并去除前后的空白字符return response.choices[0].text.strip()def context_agent(query: str, index: str, n: int):query_embedding = get_ada_embedding(query)index = pinecone.Index(index_name=index)results = index.query(query_embedding, top_k=n,include_metadata=True)#print("***** RESULTS *****")#print(results)sorted_results = sorted(results.matches, key=lambda x: x.score, reverse=True)    return [(str(item.metadata['task'])) for item in sorted_results]
  • context_agent:从Pinecone索引中检索与给定查询最相关的上下文信息。
    • get_ada_embedding:将查询文本转换为嵌入向量
    • pinecone.Index:连接到指定的Pinecone索引
    • index.query:在Pinecone索引中进行相似性搜索,返回与query_embedding最相关的前n个结果,include_metadata=True表示在结果中包含元数据
    • return语句:从排序后的结果中提取每个匹配项的元数据中的任务信息,最后返回一个包含任务信息的列表。
  • execution_agent:根据给定的目标和任务执行具体的操作,并生成任务的结果。它首先检索与目标相关的上下文信息,然后生成执行任务的响应。
    • context_agent...:从指定的Pinecone索引中检索与目标(objective)相关的前5个上下文信息。
    • prompt包含执行任务的总体目标objective和任务描述task。
# 添加第一个任务
first_task = {"task_id": 1,"task_name": YOUR_FIRST_TASK
}add_task(first_task)
# 主循环
task_id_counter = 1
while True:if task_list:# Print the task listprint("\033[95m\033[1m"+"\n*****TASK LIST*****\n"+"\033[0m\033[0m")for t in task_list:print(str(t['task_id'])+": "+t['task_name'])# Step 1: 拉取第一个任务task = task_list.popleft()print("\033[92m\033[1m"+"\n*****NEXT TASK*****\n"+"\033[0m\033[0m")print(str(task['task_id'])+": "+task['task_name'])# 根据目标(OBJECTIVE)和任务名称执行任务,获取当前任务的task_id并打印结果。result = execution_agent(OBJECTIVE,task["task_name"])this_task_id = int(task["task_id"])print("\033[93m\033[1m"+"\n*****TASK RESULT*****\n"+"\033[0m\033[0m")print(result)# Step 2: 丰富结果并存储到Pineconeenriched_result = {'data': result}  result_id = f"result_{task['task_id']}"vector = enriched_result['data']  # extract the actual result from the dictionaryindex.upsert([(result_id, get_ada_embedding(vector),{"task":task['task_name'],"result":result})])# Step 3: 创建新任务并重新调整任务列表优先级new_tasks = task_creation_agent(OBJECTIVE,enriched_result, task["task_name"], [t["task_name"] for t in task_list])for new_task in new_tasks:task_id_counter += 1new_task.update({"task_id": task_id_counter})add_task(new_task)prioritization_agent(this_task_id)time.sleep(1)  # Sleep before checking the task list again
  • Step 1:拉取并执行第一个任务

  • Step 2

    • 将任务执行的结果封装在一个字典中(如有必要,可以在这里对结果进行丰富处理)
    • 生成result_id ,并获取结果的嵌入向量vector
    • 将结果及其元数据(任务名称和结果)存储到Pinecone索引中。
  • Step 3

    • 调用task_creation_agent函数,根据目标、任务结果和当前任务名称创建新任务。
    • 为每个新任务分配一个新的任务ID,并将其添加到任务列表中。
    • 调用prioritization_agent函数,对任务列表进行重新排序。

  babyagi的效果演示见babyagi-ui,可以先体验一下了解一下babyagi的输入输出工作流,然后结合上图,用MetaGPT进行重写(MG已经抽象好了许多上层类,以及react的规划模式和actions列表)。你不一定要完全依据源码的逻辑进行重写,尝试找到更优秀的SOP.

4.2.2 babyagi问答
  1. 什么是enrich

  在代码和任务管理系统中,“enrich”通常指的是对数据或结果进行补充和改进,以增加其价值和有用性。在本示例中,enrich义可以总结为:

  • 提取和整理结果
    将执行代理生成的结果封装在一个字典enriched_result中,方便后续处理和存储
  • 准备和处理数据
    准备好需要存储的结果数据,如果需要,可以对结果进行进一步处理或补充,以提高数据的完整性和质量。
  • 存储到数据库
    生成结果的嵌入表示并将其与任务信息一起存储到Pinecone索引中,以便后续检索和使用
  1. 何时应该creat new task,何时应该排序任务优先级?

  每当一个任务执行完毕并获得结果后(即在调用execution_agent函数并处理结果之后),就会调用task_creation_agent函数,根据当前任务的结果生成新任务。

  任务优先级的排序发生在创建新任务之后(prioritization_agent(this_task_id)),对任务列表进行重新排序,以确保任务按照优先级顺序执行。

  1. 新的new task应该观察什么作为创建的依据(当前任务列表/目标/已完成的任务结果)

在创建新的任务时,系统需要观察和考虑以下几个关键因素(在prompt中体现):

  • Objective:任务的总体目标。新任务应该始终与总体目标(在本例中是“解决世界饥饿”)保持一致
  • Result:当前任务的结果。新任务的创建应基于当前任务的结果,当前任务的结果可以揭示接下来的步骤需要做什么,以及下一步的具体内容
  • Incomplete Tasks:未完成的任务列表。创建新任务时,必须避免与当前未完成的任务重复
  • Task Description:任务描述。当前任务的具体描述有助于决定新任务的内容和方向
  1. 人类是否可以介入这个流程,比如新任务的合入审核,任务执行时的拆解.

人类可以在这个SOP流程中介入,以下是一些可能的介入点:

  1. 新任务合入审核
    task_creation_agent函数中生成新任务列表后,可以引入人工审核环节,人工审核新生成的任务是否合理、是否重复、是否需要调整等。审核通过后再将新任务加入task_list

  2. 任务执行前的拆解
    在执行代理execution_agent执行任务前,可以让人工介入,对当前待执行的任务进行审核和拆解。如果任务过于复杂,可以由人工将其拆解为多个子任务,再将这些子任务加入task_list中。

  3. 任务执行结果审核
    execution_agent完成任务后,可以让人工审核执行结果的合理性和质量,并决定是否需要重新执行该任务或调整后续生成的新任务。

  4. 优先级调整
    prioritization_agent重新确定任务优先级后,可以让人工介入审核和调整新的优先级排序。

  5. 知识库维护
    定期由人工审核和更新Pinecone索引index中存储的知识库数据,确保其准确性和时效性。

  要实现人工介入,可以在相应的函数中添加人工审核和调整的接口,例如在Web UI上提供审核入口等。根据具体需求,可以对流程进行定制化调整,以最大程度发挥人机协作的效能。

  以下是一个示例,review_task_result函数,它会创建一个GUI窗口,显示任务描述和执行结果。用户可以选择"通过"或"拒绝"来审核结果。如果结果被拒绝,可以在reject_result函数中添加重新执行任务的逻辑(以下代码是claude AI生成,未审核)。

import tkinter as tk
from tkinter import scrolledtext# 任务执行结果审核函数
def review_task_result(result, task):# 创建审核窗口review_window = tk.Toplevel()review_window.title(f"审核任务 {task['task_id']}: {task['task_name']}")# 显示任务描述task_label = tk.Label(review_window, text=f"任务: {task['task_name']}", font=('Arial', 12, 'bold'))task_label.pack(pady=10)# 显示执行结果result_label = tk.Label(review_window, text="执行结果:", font=('Arial', 12, 'bold'))result_label.pack(pady=5)result_text = scrolledtext.ScrolledText(review_window, width=60, height=10)result_text.insert('end', result)result_text.configure(state='disabled')result_text.pack()# 审核选项def approve_result():review_window.destroy()print(f"任务 {task['task_id']} 执行结果已审核通过")def reject_result():review_window.destroy()print(f"任务 {task['task_id']} 执行结果已拒绝,需重新执行")# 在这里可以添加重新执行任务的逻辑approve_button = tk.Button(review_window, text="通过", command=approve_result)reject_button = tk.Button(review_window, text="拒绝", command=reject_result)approve_button.pack(side='left', padx=10, pady=10)reject_button.pack(side='left', padx=10, pady=10)# 在执行代理中调用审核函数
def execution_agent(objective: str, task: str) -> str:# ... (执行任务的代码)result = "这是任务的执行结果"# 调用审核函数review_task_result(result, task)return result# 启动GUI
root = tk.Tk()
root.withdraw()  # 隐藏主窗口

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/16112.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ModelBuilder之GDP空间化——批量值提取

一、前言 前面明确说到对于空间化过程中其实只有两个过程可以进行批量操作,一个是我们灯光指数提取过程和批量进行值提取,这里补充一点,对于灯光指数计算可以实现批量计算总灯光指数和平均灯光指数,综合灯光指数需要用平均灯光指数乘以面积占比求得,面积比就是(DN大于0的…

VS2022通过C++网络库Boost.asio搭建一个简单TCP异步服务器和客户端

基本介绍 上一篇博客我们介绍了通过Boost.asio搭建一个TCP同步服务器和客户端,这次我们再通过asio搭建一个异步通信的服务器和客户端系统,由于这是一个简单异步服务器,所以我们的异步特指异步服务器而不是异步客户端,同步服务器在…

BGP选路规则

配置地址,AS123使用ospf保证通讯,修改接口类型保证ospf学习环回20.0,30.0,100.0 地址时,是以24位掩码学习,R1,R2,R3都处于BGP边界,各自都需要宣告三者的私网环回 1, [R4]ip ip-prefi…

【Qnx 】Qnx IPC通信PPS

Qnx IPC通信PPS Qnx自带PPS服务,PPS全称Persistent Publish/Subscribe Service,就是常见的P/S通信模式。 Qnx PPS的通信模式是异步的,Publisher和Subscriber也无需关心对方是否存在。 利用Qnx提供的PPS服务,Publisher可以通知多…

嵌入式进阶——LED呼吸灯(PWM)

🎬 秋野酱:《个人主页》 🔥 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 PWM基础概念STC8H芯片PWMA应用PWM配置详解占空比 PWM基础概念 PWM全称是脉宽调制(Pulse Width Modulation&#xff09…

Arduino下载与安装(Windows 10)

Arduino下载与安装(Windows 10) 官网 下载安装 打开官网,点击SOFTWARE,进入到软件下载界面,选择Windows 选择JUST DOWNLOAD 在弹出的界面中,填入电子邮件地址,勾选Privacy Policy,点击JUST DOWNLOAD即可 …

【脚本篇】---spyglass lint脚本

目录结构 sg_lint.tcl (顶层) #1.source env #date set WORK_HOME . set REPORT_PATH ${WORK_HOME}/reports puts [clock format [clock second] -format "%Y-%m-%d %H:%M:%S"] #2.generate source filelist #3.set top module puts "##…

qt-C++笔记之QThread使用

qt-C笔记之QThread使用 ——2024-05-26 下午 code review! 参考博文: qt-C笔记之使用QtConcurrent异步地执行槽函数中的内容,使其不阻塞主界面 qt-C笔记之QThread使用 文章目录 qt-C笔记之QThread使用一:Qt中几种多线程方法1.1. 使用 QThread 和 Lambda…

ubuntu server 24.04 网络 SSH等基础配置

1 安装参考上一篇: VMware Workstation 虚拟机安装 ubuntu 24.04 server 详细教程 服务器安装图形化界面-CSDN博客 2 网络配置 #安装 sudo apt install net-tools#查看 ifconfig #修改网络配置 sudo vim /etc/netplan/50-cloud-init.yaml network:version: 2ethernets:en…

飞鸡:从小训练飞行的鸡能飞行吗?为什么野鸡能飞吗?是同一品种吗?今天自由思考

鸡的飞行能力在很大程度上受到其生理结构的限制。尽管鸡有翅膀,但与能够长时间飞行的鸟类相比,鸡的翅膀相对较小,且胸部肌肉较弱。再加上鸡的身体较重,这些因素共同限制了鸡的飞行能力。通常,鸡只能进行短暂的、低空的…

【wiki知识库】01.wiki知识库前后端项目搭建(SpringBoot+Vue3)

📝个人主页:哈__ 期待您的关注 🌼环境准备 想要搭建自己的wiki知识库,要提前搭建好自己的开发环境,后端我使用的是SpringBoot,前端使用的是Vue3,采用前后端分离的技术实现。同时使用了Mysql数…

单工无线发射接收系统

1 绪论 随着无线电技术的发展,通讯方式也从传统的有线通讯逐渐转向无线通讯。由于传统的有线传输系统有配线的问题,较不便利,而无线通讯具有成本廉价、建设工程周期短、适应性好、扩展性好、设备维护容易实现等特点,故未来通讯方式将向无线传输系统方向发展。同时,实现系…

mfc140.dll丢失原因和mfc140.dll丢失修复办法分享

mfc140.dll是与微软基础类库(Microsoft Foundation Classes, MFC)紧密相关的动态链接库(DLL)文件。MFC是微软为C开发者设计的一个应用程序框架,用于简化Windows应用程序的开发工作。以下是mfc140.dll文件的一些关键属性…

栈的实现(C语言)

文章目录 前言1.栈的概念及结构2.栈的实现3.具体操作3.1.初始化栈(StackInit)和销毁栈(StackDestory)3.2.入栈(StackPush)和出栈(StackPop)3.3.获得栈的个数(StackSize)、获得栈顶元素(StackTop)以及判空(StackEmpty) 前言 前段时间我们学习过了链表和顺序表等相关操作&#x…

go-zero 实战(4)

中间件 在 userapi 项目中引入中间件。go项目中的中间可以处理请求之前和之后的逻辑。 1. 在 userapi/internal目录先创建 middlewares目录,并创建 user.go文件 package middlewaresimport ("github.com/zeromicro/go-zero/core/logx""net/http&q…

经济寒冬下的黄金跳板:方案、活动、竞标一手掌握

推荐策划人必备的宝藏地产策划资源平台, 订阅浩叫:地产营销策划圈。这个平台简直是地产策划人的百宝箱,里面藏着无数的策划秘籍,等着你来挖掘。 这个平台就像是一个大型的方案库,里面收录了众多知名地产企业的内部资料…

leetcode:计数质数

class Solution { public:// 如果 x 是质数&#xff0c;那么大于 x 的 x 的倍数 2x,3x… 一定不是质数int countPrimes(int n) {vector<int> isPrime(n, 1);int ans 0;for (int i 2; i < n; i) {if (isPrime[i]) {ans 1;if ((long long)i * i < n) {for (int j …

leetcode-55 跳跃游戏

leetcode Problem: 55. 跳跃游戏 思路 假设我们是一个小人&#xff0c;从第一个下标开始&#xff0c;每次经过一个位置&#xff0c;我们就可以根据当前位置的数值nums[i]和位置下标i计算出该位置所能到达的后续位置的最大值rnums[i]i。而这个r之前的区域一定都是可以经过的。…

AI 谈“浔川AI翻译机”

在天工AI&#xff0c;天工AI在全网搜索“浔川AI翻译机”。 1 创作助手谈“浔川AI翻译机”&#xff1a; “浔川AI翻译机”是一个利用人工智能技术进行语言翻译的设备或应用程序。它可以将一种语言的文字或口语翻译成另一种语言&#xff0c;以实现不同语言之间的沟通和理解。浔…