大模型系列11-ray
- Plasma
- PlasmaStore
- 启动监听
- 处理请求 ProcessMessage
- PlasmaCreateRequest请求
- PlasmaCreateRetryRequest请求
- PlasmaGetRequest请求
- PlasmaReleaseRequest
- PlasmaDeleteRequest
- PlasmaSealRequest
- ObjectLifecycleManager
- GetObject
- SealObject
- ObjectStoreRunner
- PlasmaStoreRunner
- ObjectStatsCollector
- OnObjectRefIncreased
- LocalObject
- ObjectManager
- PlasmaClient
- Plasma编译依赖项
- plasma_store_server_lib
- 代码文件
- 依赖库
- PlasmaClient
- 代码文件
- 依赖库
- ray_common
- 依赖库
- plasma_fbs
- 代码文件
- 无依赖库
- 其它
- Ray编译运行demo
- 编译
- 安装
- 如何查看当前目录的ray包
- bazel build基础教程
- 生成wheel并测试
- 编译问题
- 安装 bazel
- 安装bazel 6.5.0
- 报错 pack不是class template
- 报错:error: unused variable 'tag_key' [-Werror=unused-variable]
- 报错:main/test-filesystem.cc:2:10: fatal error: filesystem: No such file or directory #include <filesystem>
- 报错:sed: cannot rename python/ray/serve/generated/sedEFwvtV: Permission denied
- Ray
- 设计哲学
Plasma
PlasmaStore
启动监听
acceptor启动异步接收新的连接请求,
void PlasmaStore::DoAccept() {acceptor_.async_accept(socket_,boost::bind(&PlasmaStore::ConnectClient, this, boost::asio::placeholders::error));
}
在 Boost.Asio 中,异步操作(如 async_accept)不会自动重新启动,这是由其设计理念决定的。Boost.Asio 的异步操作通常是一次性的,即使一个操作完成了,如果你希望继续处理更多类似的操作,你必须显式地再次调用相应的函数。因此,需要在ConnectClient中再次 DoAccept。
void PlasmaStore::ConnectClient(const boost::system::error_code &error) {if (!error) {// Accept a new local client and dispatch it to the node manager.auto new_connection = Client::Create(// NOLINTNEXTLINE : handler must be of boost::AcceptHandler type.boost::bind(&PlasmaStore::ProcessMessage, this, ph::_1, ph::_2, ph::_3),std::move(socket_));}if (error != boost::asio::error::operation_aborted) {// We're ready to accept another client.DoAccept();}
}
下面是一个使用boost::asio构建tcp异步服务器监听的例子
#include <boost/asio.hpp>
#include <iostream>using boost::asio::ip::tcp;void handle_accept(tcp::socket socket, const boost::system::error_code& error) {if (!error) {std::string message = "Hello from server!\n";boost::asio::write(socket, boost::asio::buffer(message));std::cout << "Sent message to client." << std::endl;} else {std::cerr << "Error: " << error.message() << std::endl;}
}int main() {try {boost::asio::io_context io_context;tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 12345));std::cout << "Server is listening on port 12345..." << std::endl;while (true) {tcp::socket socket(io_context);acceptor.async_accept(socket, [&socket](const boost::system::error_code& error) {handle_accept(std::move(socket), error);});io_context.run();}} catch (std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;}return 0;
}
处理请求 ProcessMessage
Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,fb::MessageType type,const std::vector<uint8_t> &message)
PlasmaCreateRequest请求
从请求的 message 字符串中解析成 Request类型,获取相应的请求的属性,如object_id,object_size等
const auto &object_id = GetCreateRequestObjectId(message);const auto &request = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(input);const size_t object_size = request->data_size() + request->metadata_size();
如果是 try_immediately == true,则原地执行 TryRequestImmediately,并将结果通过 client->SendFd 返回;否则,将其加入到 create_request_queue_ 队列中,并立刻尝试原地处理,如果本次请求处理有错误,则会触发retry with timeout,该重试请求之后的请求会因为该请求在等待retry中而被自动排队到后面(某个请求被retry,则create_timer_为true),对于排队的请求,会向客户端发送SendUnfinishedCreateReply的回复。
PlasmaCreateRetryRequest请求
客户端收到Unfinished的回复后,可以继续重试,当服务器端收到重试请求后,直接从当前的回复中查找该object_id是否已经完成,如果完成则回复object的location信息,如果未完成,继续回复 SendUnfinishedCreateReply。
PlasmaGetRequest请求
ReadGetRequest 目标是将收到的input字符串解析为PlasmaGetRequest,会拿到其属性,并将请求入队到 get_request_queue_
PlasmaReleaseRequest
调用 ReleaseObject 将 object_id 从client中移除。它会检查 object_lifecycle_mgr_ 中是否有该object_id,如果没有,则直接返回false;如果有,则调用 RemoveFromClientObjectIds 将该object_id移除,这会通过 client->MarkObjectAsUnused(object_id)将object从client的object_ids中移除,同时会将 object_lifecycle_mgr_ 中的object_id的引用计数减1。
PlasmaDeleteRequest
逐个遍历收到的object_id,调用 object_lifecycle_mgr_->DeleteObject将它们移除。 object_lifecycle_mgr_ 内部管理所有的 object_id以及object entry对象。
PlasmaSealRequest
逐个遍历收到的object,调用 object_lifecycle_mgr_.SealObject(object_ids[i]) 将它们seal掉,同时通过 add_object_callback_ 来告知。另外,对pending的get请求标识sealed:get_request_queue_.MarkObjectSealed
MarkObjectSealed
- 从 object_get_requests_ 获取指定object_id 对应的 get_requests
- 遍历get_requests,对每个request,将其object_id对应的plasma_object填充(通过object_lifecycle_mgr_.GetObject拿到的entry来填充),调用 object_satisfied_callback_ 来通知外部。如果对于这个get_request,它的所有的object_ids都满足之后,则通过 all_objects_satisfied_callback_ 来通知该get_request。
ObjectLifecycleManager
GetObject
从 object_store_ 获取对象 LocalObject
SealObject
向 object_store_->SealObject(object_id) 发起Seal对象,如果成功,则返回对应的entry。有个状态收集器来通过 stats_collector_->OnObjectSealed(*entry) 来感知各种对象的变化
ObjectStoreRunner
隶属于object_manager.h文件,它创建一个 store_thread_ 线程,启动PlasmaStoreRunner,注意完全单线程逻辑
PlasmaStoreRunner
检查 系统内存, plasma_directory, fallback_directory等信息,然后启动PlasmaStore的run loop,等待外部客户端连接。
构造函数和参数
class PlasmaStoreRunner {public:PlasmaStoreRunner(std::string socket_name,int64_t system_memory,bool hugepages_enabled,std::string plasma_directory,std::string fallback_directory);void Start(ray::SpillObjectsCallback spill_objects_callback,std::function<void()> object_store_full_callback,ray::AddObjectCallback add_object_callback,ray::DeleteObjectCallback delete_object_callback);
主要启动调用
store_.reset(new PlasmaStore(main_service_,*allocator_,*fs_monitor_,socket_name_,RayConfig::instance().object_store_full_delay_ms(),spill_objects_callback,object_store_full_callback,add_object_callback,delete_object_callback));
store_->Start();
main_service_.run();
ObjectStatsCollector
ObjectStatsCollector 是分布式系统中一个 轻量级但非常重要的组件,专注于对象生命周期的跟踪,空间的使用情况等。
OnObjectRefIncreased
Ray 的设计在引用计数变化的关键节点(0→1 和 1→2)进行了状态更新,主要是为了在性能与资源使用之间取得平衡:
-
引用计数从 0 到 1:对象引用计数为 0 时,通常意味着它是空闲的,可能被系统标记为可逐出(evictable)或不活跃。当引用计数变为 1 时,说明它开始被使用。标志对象从空闲变为活跃。
增加正在使用的统计,减少可逐出的统计。
如果对象封存且由 Worker 创建,标记为可溢出对象,以支持后续的溢出策略(例如,将这些对象移到磁盘以节省内存)
这是对象从“潜在可清理状态”变为“不可清理状态”的分界点,系统需要在此刻更新相关状态。 -
引用计数从 1 到 2
标志对象的重要性提升,进入多任务共享状态。
溢出操作会涉及磁盘 I/O,代价较高。引用计数增加意味着对象可能正在高频使用,因此通过及时更新“可溢出对象”统计,系统可以避免在不适当的时机溢出对象。
if (obj.GetRefCount() == 1) {num_objects_in_use_++;num_bytes_in_use_ += kObjectSize;if (kSource == plasma::flatbuf::ObjectSource::CreatedByWorker && kSealed) {num_objects_spillable_++;num_bytes_spillable_ += kObjectSize;}if (kSealed) {num_objects_evictable_--;num_bytes_evictable_ -= kObjectSize;}}// object ref count bump from 1 to 2if (obj.GetRefCount() == 2 &&kSource == plasma::flatbuf::ObjectSource::CreatedByWorker && kSealed) {num_objects_spillable_--;num_bytes_spillable_ -= kObjectSize;}
LocalObject
唯一标识一个对象,可定位该对象,以及
- object_info: 对象的信息如数据长度,owner地址信息
- allocation: 对象所隶属的mmaped chunk的起始位置,包括fd,map_size, offset等信息
- ref_count: 这个对象的引用计数
- state: 对象状态,open or sealed
- source: object来源,用于debug
- create_time & construct_duration: 创建时间戳以及耗时
ObjectManager
ObjectManager 是 Ray 分布式系统中用于管理 对象的存储、传输和生命周期 的核心组件。它在节点之间协调对象的流动,并与对象存储(如 Plasma Store)交互,确保对象的创建、删除、拉取和推送操作得以高效进行。
- 支持将对象从一个节点推送(push)到另一个节点(Push 和 PushLocalObject 方法)。
当节点请求对象时,从其他节点或磁盘拉取(pull)所需的对象(Pull 和 SendPullRequest 方法)。 - 支持溢出和逐出策略,通过溢出和逐出策略管理内存使用,确保内存资源不会耗尽。通过与 Plasma Store 的交互实现对象溢出(spillable)到磁盘,或者逐出(evictable)以释放内存资源。提供 IsPlasmaObjectSpillable 方法检查对象是否可以溢出。
- 使用异步 I/O 和多线程(如 rpc_threads_ 和 buffer_pool_),保证高吞吐量和低延迟。通过 buffer_pool_ 管理对象的内存分配和释放,动态调整拉取和推送操作的频率以适应节点的内存状况(UpdatePullsBasedOnAvailableMemory 方法)。对大对象进行分片,支持多线程传输以提升效率(PushObjectInternal 和 SendObjectChunk 方法)。在接收端,处理分片的写入和拼接(ReceiveObjectChunk 方法)。
- 当对象被创建时,调用 HandleObjectAdded,将对象信息注册到本地记录(如 local_objects_),并通知其他模块。
对象的推送(Push)
如果节点上的某个对象需要传输到其他节点,调用 Push 方法:检查对象是本地的(PushLocalObject)还是在磁盘上(PushFromFilesystem)。对对象进行分片,并通过 RPC 将分片逐一发送(PushObjectInternal 和 SendObjectChunk)。
对象的拉取(Pull)
如果本节点需要一个远程对象,调用 Pull 方法:向目标节点发送拉取请求(SendPullRequest)。
监听对象的位置信息更新,并根据最新信息决定拉取策略。
ObjectManager 和 PlasmaStore的核心区别
ObjectManager 是全局管理者,负责在分布式系统中调度和协调对象的使用。PlasmaStore 是本地存储引擎,专注于高效地管理单节点的共享内存对象。任务请求一个对象,ObjectManager 检查对象是否在本地 PlasmaStore 中。如果存在,直接通过共享内存访问;如果么有,则ObjectManager 向其他节点发送拉取请求,远程节点通过 PlasmaStore 提供数据。
PlasmaClient
Plasma编译依赖项
plasma_store_server_lib
代码文件
srcs = ["src/ray/object_manager/plasma/create_request_queue.cc","src/ray/object_manager/plasma/dlmalloc.cc","src/ray/object_manager/plasma/eviction_policy.cc","src/ray/object_manager/plasma/get_request_queue.cc","src/ray/object_manager/plasma/object_lifecycle_manager.cc","src/ray/object_manager/plasma/object_store.cc","src/ray/object_manager/plasma/plasma_allocator.cc","src/ray/object_manager/plasma/stats_collector.cc","src/ray/object_manager/plasma/store.cc","src/ray/object_manager/plasma/store_runner.cc",],hdrs = ["src/ray/object_manager/common.h","src/ray/object_manager/plasma/allocator.h","src/ray/object_manager/plasma/create_request_queue.h","src/ray/object_manager/plasma/eviction_policy.h","src/ray/object_manager/plasma/get_request_queue.h","src/ray/object_manager/plasma/object_lifecycle_manager.h","src/ray/object_manager/plasma/object_store.h","src/ray/object_manager/plasma/plasma_allocator.h","src/ray/object_manager/plasma/stats_collector.h","src/ray/object_manager/plasma/store.h","src/ray/object_manager/plasma/store_runner.h","src/ray/thirdparty/dlmalloc.c",],
依赖库
":plasma_client",":stats_lib","//src/ray/common:network",
PlasmaClient
代码文件
srcs = ["src/ray/object_manager/common.cc","src/ray/object_manager/plasma/client.cc","src/ray/object_manager/plasma/connection.cc","src/ray/object_manager/plasma/malloc.cc","src/ray/object_manager/plasma/plasma.cc","src/ray/object_manager/plasma/protocol.cc","src/ray/object_manager/plasma/shared_memory.cc",] + select({"@platforms//os:windows": [],"//conditions:default": ["src/ray/object_manager/plasma/fling.cc",],}),hdrs = ["src/ray/object_manager/common.h","src/ray/object_manager/plasma/client.h","src/ray/object_manager/plasma/common.h","src/ray/object_manager/plasma/compat.h","src/ray/object_manager/plasma/connection.h","src/ray/object_manager/plasma/malloc.h","src/ray/object_manager/plasma/plasma.h","src/ray/object_manager/plasma/plasma_generated.h","src/ray/object_manager/plasma/protocol.h","src/ray/object_manager/plasma/shared_memory.h",] + select({"@platforms//os:windows": [],"//conditions:default": ["src/ray/object_manager/plasma/fling.h",],}),
依赖库
":plasma_fbs",":ray_common","//src/ray/protobuf:common_cc_proto","//src/ray/util","@msgpack",
ray_common
依赖库
ray_cc_library(name = "ray_common",deps = [":stats_metric","//src/ray/common:asio","//src/ray/common:constants","//src/ray/common:event_stats","//src/ray/common:file_system_monitor","//src/ray/common:grpc_util","//src/ray/common:id","//src/ray/common:memory_monitor","//src/ray/common:network","//src/ray/common:ray_config","//src/ray/common:ray_syncer","//src/ray/common:status","//src/ray/common:task_common","//src/ray/common:test_util","//src/ray/protobuf:gcs_cc_proto","@com_google_googletest//:gtest",],
)
plasma_fbs
代码文件
src/ray/object_manager/plasma/plasma.fbs
无依赖库
其它
"//src/ray/protobuf:common_cc_proto","//src/ray/util",
Ray编译运行demo
编译
经过6个多小时的奋战,./build.sh
终于编译成功,其实回过头来只有两件事情:
- 安装bazel6.5
- 安装gcc-9
官网参考:https://docs.ray.io/en/latest/ray-contribute/development.html
下载源代码,然后在根目录下执行 ./build.sh
(pytorch_gpu) ➜ /mnt/c/workspace/llm/ray ./build.sh
- 报错说,找不到’/root/bin/bazel’,于是参照下面的编译问题来安装 bazel,要求安装bazel6.5版本。
- 进一步执行
./build.sh
报错说 pack 不是class template,需要升级到c++17,升级g++到9 - 切换思路,决策使用小模块编译方案,只编译特定模块,找到根目录的BUILD.bazel文件
/mnt/c/workspace/llm/ray/cpp/BUILD.bazel
,执行编译grpc lib- 编译grpc:在ray源代码的根目录下执行,
bazel build //:grpc_common_lib
报错:error: unused variable ‘tag_key’ [-Werror=unused-variable]
- 编译grpc:在ray源代码的根目录下执行,
- 重新编译,启用copt参数:
bazel build //:grpc_common_lib --copt=-Wno-error=unused-variable
。报错:main/test-filesystem.cc:2:10: fatal error: filesystem: No such file or directory #include <filesystem>
。 同样需要安装gcc-9
, - 安装g+±9
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
apt install gcc-9 ## 自动安装gcc-9和g++-9
- 替换默认的g++ 7.5.0
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 50
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-9 50然后输入g++ --version 可以看到变成9.4.0版本其它命令
update-alternatives --query g++
update-alternatives --list gcc
sudo update-alternatives --config gccbazel info cxx # 查看当前bazel是否使用的g++-9
运行单个目录,更容易缩小问题范围
bazel build //cpp:ray_cpp_pkg --verbose_failures
./build.sh还失败,程序被莫名杀死,dmesg
查看内存不足导致,调整机器提供给WSL的内存,直接搜索 wsl 即可进行设置内存,内存设置完毕最终编译成功
安装
安装nvm
curl https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash
nvm install 14
nvm use 14
编译dashboard
cd python/ray/dashboard/client
npm ci
npm run build
cd ../..
安装ray
# Install Ray.
cd python/
# Install required dependencies.
pip install -r requirements.txt
# You may need to set the following two env vars if you have a macOS ARM64(M1) platform.
# See https://github.com/grpc/grpc/issues/25082 for more details.
# export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1
# export GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1
pip install -e . --verbose # Add --user if you see a permission denied error.
命令 pip install -e . --verbose
是一个用于安装当前目录下 Python 包的命令
- -e (可编辑模式):以“可编辑模式”安装当前目录下的 Python 包。这意味着,包的源代码不会被复制到 Python 的 site-packages 目录,而是创建一个指向当前目录的符号链接。
适用于开发环境,当你对代码进行修改时,这些修改会立即反映在安装的包中。 - . : 表示安装当前目录下的包,当前目录应该包含 setup.py 文件或 pyproject.toml 文件,以指定包的构建和安装配置
验证ray
python3 -c "import ray"
如何查看当前目录的ray包
如果你已经以开发模式安装了当前目录的包,可以用 pip list
查看安装的包:
pip list | grep ray
,还有一个是 pip show ray
如果当前目录包含 setup.py,可以执行以下命令来查看定义的包名称:
python setup.py --name
site-packages中有指向ray包的软链接
cat /root/anaconda3/envs/pytorch_gpu/lib/python3.11/site-packages/ray.egg-link
bazel build基础教程
bazel build //main:hello-world
-
在当前目录下从项目根目录的 main 子目录开始查找。
:hello-world
表示 main 目录下的一个构建目标,在main目录下有对应的 BUILD 文件定义。 -
bazel build:
命令告诉 Bazel 构建指定目标。
构建结果通常存放在 bazel-out/ 目录下。
教程链接:https://bazel.build/reference/be/c-cpp?hl=zh-cn
生成wheel并测试
组装轮子
cd ~/ray/python
python3 setup.py bdist_wheel
那么最终的轮子就在~/ray/python/dist里了
安装轮子:
cd dist
pip3 install *.whl
测试模块是否工作正常:
cd ~/ray
python3 -m pytest -v python/ray/tests/test_mini.py
编译问题
执行编译期间遇到了很多问题,一一解决
安装 bazel
问题1: FileNotFoundError: [Errno 2] No such file or directory: ‘/root/bin/bazel’
查看bazel的安装文档: https://bazel.build/install/ubuntu?hl=zh-cn
安装bazel
sudo apt install apt-transport-https curl gnupg -y
curl -fsSL https://bazel.build/bazel-release.pub.gpg | gpg --dearmor >bazel-archive-keyring.gpg
sudo mv bazel-archive-keyring.gpg /usr/share/keyrings
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/bazel-archive-keyring.gpg] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.listsudo apt update && sudo apt install bazel -y
安装bazel 6.5.0
ERROR: The project you’re trying to build requires Bazel 6.5.0 (specified in /mnt/c/workspace/llm/ray/.bazelversion), but it wasn’t found in /usr/bin.
You can install the required Bazel version via apt:
sudo apt update && sudo apt install bazel-6.5.0
报错 pack不是class template
需要使用c++17编译
ray_cpp_lib/ray/api/msgpack_adaptor.h:27:8: error: 'pack' is not a class templatestruct pack<std::any> {
报错:error: unused variable ‘tag_key’ [-Werror=unused-variable]
重新编译,启用copt参数: bazel build //:grpc_common_lib --copt=-Wno-error=unused-variable
报错:main/test-filesystem.cc:2:10: fatal error: filesystem: No such file or directory #include
安装gcc 9
报错:sed: cannot rename python/ray/serve/generated/sedEFwvtV: Permission denied
sudo chmod -R u+w python/ray/serve/generated/
Ray
设计哲学
不需要将o1以及o2的数据返回给Driver层,只提前设计好框架,数据在workers之间自动按需流转