babyAGI中有cooperative模式,其核心是调用ray库,实现分布式多进程执行任务。
从BabyAGI的源码中,我们可以学习ray core的使用。
1. Ray核心概念
1.1 Tasks
Ray 允许在单独的 Python 工作线程上异步执行任意函数。 这些异步执行的函数被称为“tasks”。 Ray 使任务能够根据 CPU、GPU 或自定义其资源需求。 Ray的集群调度程序根据资源请求在集群中分配任务实现并行执行。
1.2 Actors
一个actor本质上是一个有状态的worker或者服务。当一个新的 Actor 被实例化时,就会创建一个新的 Worker,并且 Actor 的方法会调度到该特定的 Worker 上,并且可以访问和改变该 Worker 的状态。 与任务一样,参与者支持 CPU、GPU 和自定义资源要求。
1.3 Objects
在 Ray 中,Tasks和 Actors在Object上创建和计算。 我们将这些对象称为远程对象,因为它们可以存储在 Ray 集群中的任何位置,并且我们使用对象引用来引用它们。
远程对象缓存在 Ray 的分布式共享内存对象存储中,集群中的每个节点都有一个对象存储。
在集群设置中,远程对象可以存在于一个或多个节点上,与持有对象引用的人无关。
1.4 Placement Group
组允许用户跨多个节点自动保留资源组(即组调度)。 然后,它们可用于安排 Ray 任务和 actor,使其尽可能的靠近本地(PACK)或分散(SPREAD)。 组通常用于调度actor,但也用于支持task。
1.5 环境依赖
当 Ray 在远程计算机上执行任务和 Actor 时,它们的环境依赖项(例如 Python 包、本地文件、环境变量)必须可供代码运行。 为了解决这个问题,可以
- 使用 Ray Cluster Launcher 提前准备集群上的依赖项
- 使用 Ray 的运行时环境即时安装它们
2. BabyAGI中的Ray代码
ray.init这个函数作用主要是连接到一个已经存在ray集群或者创建一个集群然后连接到它
使用ray.remote装饰python类,会将此类变为一个actors。每一个actor都会运行在自有的python进程中。
下面的代码定义一个cooperative模式的多个目标的存储actor。
使用队列存储多个目标,然后将目标调度到不同的远程对象上执行。
ACTOR_NAME="BabyAGI Objectives"try:ray.init(address="auto", namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
except:ray.init(namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)@ray.remote
class CooperativeObjectivesListStorageActor:def __init__(self):self.objectives = deque([])def append(self, objective: str):if not objective in self.objectives:self.objectives.append(objective)def is_empty(self):return False if self.objectives else Truedef get_objective_names(self):return [t for t in self.objectives]class CooperativeObjectivesListStorage:def __init__(self):try:self.actor = ray.get_actor(name=ACTOR_NAME, namespace="babyagi")except ValueError:self.actor = CooperativeObjectivesListStorageActor.options(name=ACTOR_NAME, namespace="babyagi", lifetime="detached").remote()def append(self, objective: str):self.actor.append.remote(objective)def is_empty(self):return ray.get(self.actor.is_empty.remote())def get_objective_names(self):return ray.get(self.actor.get_objective_names.remote())
但是,这段代码并没有放到主体的程序当中,估计是作者没有实现多目标的调度执行。
接下来我们看看,用到的协同的任务清单代码。
import sys
import logging
import ray
from collections import deque
from typing import Dict, Listfrom pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
from extensions.ray_objectives import CooperativeObjectivesListStoragetry:ray.init(address="auto", namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
except:ray.init(namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)@ray.remote
class CooperativeTaskListStorageActor:def __init__(self):self.tasks = deque([])self.task_id_counter = 0def append(self, task: Dict):self.tasks.append(task)def replace(self, tasks: List[Dict]):self.tasks = deque(tasks)def popleft(self):return self.tasks.popleft()def is_empty(self):return False if self.tasks else Truedef next_task_id(self):self.task_id_counter += 1return self.task_id_counterdef get_task_names(self):return [t["task_name"] for t in self.tasks]class CooperativeTaskListStorage:def __init__(self, name: str):self.name = nametry:self.actor = ray.get_actor(name=self.name, namespace="babyagi")except ValueError:self.actor = CooperativeTaskListStorageActor.options(name=self.name, namespace="babyagi", lifetime="detached").remote()objectives = CooperativeObjectivesListStorage()objectives.append(self.name)def append(self, task: Dict):self.actor.append.remote(task)def replace(self, tasks: List[Dict]):self.actor.replace.remote(tasks)def popleft(self):return ray.get(self.actor.popleft.remote())def is_empty(self):return ray.get(self.actor.is_empty.remote())def next_task_id(self):return ray.get(self.actor.next_task_id.remote())def get_task_names(self):return ray.get(self.actor.get_task_names.remote())
基本就是SingleTaskListStorage的翻版,不过多增加远程调度的功能,而且任务不再是以dict形式存储,而是存放在集群当中。
因此,需要每次都进行优先级的排序,否则任务会被重复执行。
以上,就是cooperation模式的代码,整个看起来用处不大,可能任务比较多的时候有效,但瓶颈还是会在优先级排列的任务上。
下一篇文章,我们可以看看作者的另一段代码,babycoder实现的coder的agi