RisingWave 用户定义函数 (二): Rust x WebAssembly

RisingWave 支持用户使用 Rust 语言编写自定义函数(UDF)。Rust UDF 会被编译到 WebAssembly,一种最初诞生于浏览器之中的虚拟机汇编语言。随后,这些 UDF 会在 RisingWave 进程中内嵌的 WebAssembly 虚拟机上被即时编译(JIT)执行。这种执行方式效率很高,相比原生指令只有少量性能损失,同时避免了远程通信带来的高延迟。因此,Rust UDF 非常适合编写计算密集型逻辑。在这篇文章中,我们将介绍 RisingWave Rust UDF 的设计与实现。

1. 应用场景

为什么要开发 Rust UDF 呢?事实上,RisingWave 首先支持的是 Python 语言的 UDF Server。Python 语言具有最广泛的用户群体,开发效率也很高。但是,随着越来越多的用户开始使用 Python UDF,我们也意识到它存在一些难以解决的痛点:

  • 最主要的是性能问题。因为 Python 本身的动态性和解释执行的特性,它是所有主流编程语言中运行速度最慢的一个。稍微复杂一点的处理逻辑都会运行很长时间。
  • 其次,数据库和 UDF Server 通过远程调用(RPC)的方式传递数据,一来一回会引入相当大的延迟(通常在毫秒级)。从而拖慢整个数据流的响应速度。
  • UDF Server 虽然非常灵活,但是用户需要自己额外部署和运维这套服务。如果负载比较大,还需要自己做负载均衡。而它不受 RisingWave 的管理,向系统中引入了很多不可控因素。

其实,大部分用户使用 UDF 的目的仅仅是实现一些内置函数不支持的处理逻辑,不值得为此承担额外的运维压力和性能风险。因此,我们需要一个高性能的、在进程中就地执行的 UDF 方案。

为了达到高性能,首先需要筛选编程语言。符合标准的语言有 C/C++/Rust/Go 等。我们选择了 Rust,一方面是因为它是 RisingWave 本身的编程语言,另一方面也因为它在 Arrow 和 WebAssembly 中有着成熟的生态。

在进程中就地执行,还对运行环境的隔离性有很强要求。因为用户可以编写任意代码,但无论它怎么写都不能影响 RisingWave 本身的运行。这就要求运行环境具备一定的沙盒特性,能够限制不可信代码的 CPU 和内存使用,对外部的访问。因此我们不能对 UDF 编译出的指令原生运行,而需要将其编译到一种虚拟机指令,通过 JIT 方式运行。WebAssembly 就是一个非常合适的选择。很多 WebAssembly 解释器都是用 Rust 写的,它们可以非常方便地嵌入到 RisingWave 中。

因此,我们最终选择了 Rust 作为编程语言、WebAssembly 作为执行环境。

2. 用户接口

2.1 内嵌 Rust 代码

要在 RisingWave 中创建 Rust UDF,只需通过 create function 命令定义函数名和数据类型,然后嵌入一段有着相同签名的 Rust 函数即可。

create function gcd(int, int) returns int language rust as $$fn gcd(mut a: i32, mut b: i32) -> i32 {while b != 0 {(a, b) = (b, a % b);}a}
$$;

对于表函数(Table Function),由于 Rust 的生成器(generator)特性尚未稳定,我们要求 Rust 函数返回一个迭代器(iterator),每次返回一行元素:

create function range(n int) returns table (x int) language rust as $$fn range(n: i32) -> impl Iterator<Item = i32> {(0..n).into_iter()}
$$;

这些 Rust 代码会在前端被编译成 WebAssembly 模块,然后在后端的运行时环境中执行。

2.2 上传 WebAssembly 模块

如果函数的实现比较复杂,比如需要依赖第三方库或者有多个文件、几行 SQL 写不下,用户也可以创建一个独立的 Rust 项目。通过我们提供的框架,自行编译出 WebAssembly 模块。最后直接把 WebAssembly 模块上传到 RisingWave 中运行。

比如上一篇中提到的使用 prost 库解析 protobuf 的场景:

// lib.rs
use arrow_udf::{function, types::StructType}; // 这是我们提供的 UDF 框架
use prost::{DecodeError, Message};            // 这是用户依赖的第三方库// 导入从 .proto 生成的 Rust 代码
pub mod proto {include!(concat!(env!("OUT_DIR"), "/proto.rs"));
}// 定义返回结构体
#[derive(StructType)]
struct DataKey {stream: String,pan: String,
}// 定义解析函数
#[function("decode_proto(bytea) -> struct DataKey")]
fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {let data_key = proto::DataKey::decode(data)?;Ok(DataKey {stream: data_key.stream,pan: data_key.pan,})
}

最终以 base64 编码的方式将编译出的 WebAssembly 模块导入 RisingWave。

\set wasm_binary `base64 -i target/release/decode.wasm`
create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
language wasm using base64 :'wasm_binary';

注意这里的 language 已经变成了 wasm 而不是 rust。因为实际输入的是 wasm 指令。未来我们可能还会支持更多语言编译到 WebAssembly,而这些原始的编程语言是不会被 RisingWave 感知到的。

3. 内部实现

介绍完了用户接口,让我们来看看 RisingWave 背后都做了哪些事情。

3.1 前端编译

当接收到一条 create function ... language rust ... 语句时,RisingWave 前端会在本地临时文件夹中生成一个 Rust 项目。语句中内嵌的 Rust 代码片段会被提取出来,并补充上必要的辅助代码。

例如,对于如下函数:

create function gcd(int, int) returns int language rust as $$fn gcd(mut a: i32, mut b: i32) -> i32 {while b != 0 {(a, b) = (b, a % b);}a}
$$;

前端会为它补充 use 语句和 SQL 签名,生成这样的 lib.rs 文件:

use arrow_udf::{function, types::StructType}; // import prelude#[function("gcd(int, int) -> int")]
fn gcd(mut a: i32, mut b: i32) -> i32 {while b != 0 {(a, b) = (b, a % b);}a
}

之后,前端会用 cargo release 模式编译出 .wasm 文件,并尝试通过 wasm-strip 去除其中的符号信息,降低二进制大小(通常可以降低到 1MB 左右)。

后面的工作和处理 language wasm 的情形一样。wasm 文件会被压缩后作为函数定义的一部分存储到 catalog 元数据中,等实际执行时再读取出来解压缩。

3.2 编译用户函数

那么这个函数到底是怎样被编译成可以实际执行的 WebAssembly 模块呢?

如果你曾看过我们之前介绍内置函数框架的文章,就会发现它们之间存在非常相似的设计。因为它们都是用 Rust 实现 SQL 函数,没有本质区别。因此我们复用了同一套基于过程宏的代码生成框架,最终将用户函数包起来,生成基于 Apache Arrow 的列式求值函数:

fn gcd_eval(input: &RecordBatch) -> Result<RecordBatch> {...}

此时,如果它是内置函数的话,就已经能被 Rust 调用执行了。但是,由于我们要编译到 WebAssembly,而 WebAssembly 解释器只能通过标准的 C ABI 来调用其中的函数(Rust 至今没有稳定的 ABI)。因此我们还需要做一些额外的工作,将其包装成可以被 FFI 调用的 C 函数。最终生成的代码长这个样子:

#[export_name = "arrowudf_Base64EncodedSignature"]
unsafe extern "C" fn gcd_int4_int4_int4_ffi(ptr: *const u8,len: usize,out: *mut arrow_udf::ffi::CSlice,
) -> i32 {// decode input RecordBatch from the buffer specified by `ptr` and `len`let input: RecordBatch = ...;// call Rust functionlet result = gcd_eval(&input);// encode output RecordBatch or error message to `out`match result {Ok(o) => { ...; 0 }Err(e) => { ...; -1 }}
}

其中 ptrlen 表示一个输入 buffer,buffer 中包含了以文件格式编码的输入 RecordBatch。这一编码是有标准格式的,因此可以作为 ABI 的一部分。 相应地,out 是这个函数的返回值,表示一个由内部分配内存的 buffer,其中包含以同样格式编码的输出 RecordBatch。这个 buffer 需要由调用者在读取完成后手动回收。因此 WebAssembly 模块还需要暴露自己的 allocdealloc 函数。最后函数返回值是错误码,0 表示成功,-1 表示出错。如果出错,那么 out 指向的 buffer 中包含了错误信息以供读取。

比较有意思的一点设计是这个函数的符号名,即 #[export_name] 中的字符串。当 UDF 加载器拿到这个 .wasm 文件时,需要首先从某处读取所有 UDF 的元数据信息。元数据会告诉它这个模块包含哪些函数,每个函数的签名是什么,以及该从哪个位置调用。第一个和第三个问题很简单。因为每个 extern "C" fn 都会在模块的导出符号表(symbol table)中出现。至于第二个问题,我们可以将函数签名(即 "gcd(int4, int4) -> int4")以字符串的形式编码在符号名中。但由于符号名中不能出现括号和空格等字符,我们还需要将字符串通过 base64 重新编码到合法字符,并加上 arrowudf_ 的固定前缀。这样 UDF 加载器只需要从符号表中过滤出包含这个前缀的符号,然后 base64 解码后面的签名即可。(注:这里用到的 base64 并非标准 base64,原因可参考代码)

如果用户函数返回 struct 类型,那么 struct 本身的定义将会编码到独立的符号中。例如这个函数:

#[derive(StructType)]
struct KeyValue<'a> {key: &'a str,value: &'a str,
}
#[function("key_value(string) -> struct KeyValue")]
fn key_value(kv: &str) -> Option<KeyValue<'_>> {let (key, value) = kv.split_once('=')?;Some(KeyValue { key, value })
}

过程宏会分别导出两个符号:(中括号内的部分会被 base64 编码)

arrowudf_[key_value(string)->struct KeyValue]
arrowudt_[KeyValue=key:string,value:string]

其中定义 struct 的符号会以 arrowudt_ 作为前缀。这个符号不指向任何实体,仅通过名称编码其 schema。加载器找到所有函数和类型符号后,即可恢复出完整的函数签名。

如果你已经编译出一个 WebAssembly UDF 模块,不妨通过以下命令来查看其中的导出符号:

wasm-objdump -x udf.wasm | grep arrowud

3.3 加载执行

最后到了数据库加载 WebAssembly 模块和执行其中函数的环节。我们使用 wasmtime 作为运行时。它首先加载解压后的 WebAssembly 模块,然后依据上面约定好的协议扫描所有函数和类型。用户调用函数时,它根据函数名找到对应函数的入口点。通过模块提供的 alloc 函数在其中动态分配一段内存保存输入数据。然后调用函数本身,获取输出。最后通过相反的步骤解析输出,释放内存。

由于每个 WebAssembly 实例只能单线程运行,为了支持多线程并行执行,我们还额外维护了一个实例池。每个线程每次从池子中取出一个空闲实例调用函数,如果没有就创建一个新的,用完后再归还回来。这样即可保证按需实例化容器,避免占用过多内存。(每个实例最少要消耗几 MB 的内存)

4. 相关话题讨论

最后,我们来探讨一下上述基于 WebAssembly 实现 Rust UDF 方案中,大家可能关心的一些技术问题。

4.1 性能

Rust UDF 是否能实现真正的高性能?如果不套一层 WebAssembly 提供隔离性的话,理论上可以通过动态链接库加载,直接运行原生指令。这样的 UDF 和内置函数性能是完全一样的。如果用户拥有对 RisingWave 的完全控制权,并追求极致性能,这可能是最适合的方案。但代价是 UDF 可能直接导致 RisingWave 阻塞甚至崩溃。用户需自行承担风险。

如果使用 WebAssembly JIT 运行,预热后纯函数的执行时间大约是原生的 1.5-2x 左右。但是考虑到数据传输的额外开销(外部 Arrow 编码 - 复制到 WASM 内部内存 - WASM 内部解码),实际端到端执行时间可能会更长。根据我们的 benchmark 结果,对于 gcd 这种简单计算,WebAssembly 运行时间是原生的 10x 左右。

我认为数据传输这块还有优化空间。未来可以尝试通过将 host 内存映射到 wasm 内部的方式,直接零拷贝地传递 Arrow 数组。这样可以避免很多编解码和内存拷贝开销。

4.2 外部访问

目前,WebAssembly 模块仅支持纯计算操作,几乎无法进行任何外部访问,包括访问网络、文件系统等。因为 wasmtime 默认就是沙箱模式。但为了支持模块使用 std,必须为它接入 WASI(可以理解为 wasm 的系统调用)。这样即使 UDF panic,至少可以通过 stderr 打印错误信息。不过,我们依然限制了 stdout 和 stderr 的缓冲区容量,以避免攻击者耗尽主机内存。

理论上说,我们可以逐步开放各种接口,让模块受控地访问网络(例如指定白名单地址)。这样我们甚至可以用 UDF 来实现自定义 source(参考社区用户的尝试)。但是目前这些都没有实现,有待未来进一步探索。

5. 总结

本文讨论了 RisingWave 中 Rust UDF 的设计与实现。Rust UDF 通过编译到 WebAssembly 实现隔离性,同时保持高性能。我们复用了内部函数框架中的 #[function] 过程宏,向用户提供了非常简洁的实现接口。用户函数会在编译期被包装成 C 接口,通过编码后的 Arrow RecordBatch 与调用者交换数据。用户函数签名和类型定义被编码在符号中,加载器通过扫描符号表即可找到所有函数和类型。最终通过 wasmtime 完成对函数的 JIT 执行。

不过依然要说明的是,这个工作其实和 RisingWave 并没有强绑定关系。任何 Rust 编写的数据处理系统都可以引入 arrow_udf_wasm 这个库来瞬间获得执行 Rust UDF 的能力。

6. 关于 RisingWave

RisingWave 是一款开源的分布式流处理数据库,旨在帮助用户降低实时应用的开发成本。RisingWave 采用存算分离架构,提供 Postgres-style 使用体验,具备比 Flink 高出 10 倍的性能以及更低的成本。

👨‍🔬加入 RW 社区,欢迎关注公众号:RisingWave 中文开源社区

🧑‍💻快速上手 RisingWave,欢迎体验入门教程:github.com/risingwave

💻深入使用 RisingWave,欢迎阅读用户文档:zh-cn.risingwave.com/docs

🔍更多常见问题及答案,欢迎搜索留言: risingwavelabs/discussions

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/35287.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【QT】Widget

目录 widget常用属性及其作用 enabled geomtry window frame window frame的影响 相关API windowTitle windowIcon qrc机制 qrc使用方式 自定义鼠标图片 设置字体样式 设置鼠标悬停提示 toolTip 控件获取焦点 styleSheet widget常用属性及其作用 属性作用…

手把手教你打造高精度STM32数字时钟,超详细步骤解析

STM32数字时钟项目详解 1. 项目概述 STM32数字时钟是一个集成了时间显示、闹钟功能、温湿度检测等多功能于一体的小型电子设备。它利用STM32的实时时钟(RTC)功能作为核心,配合LCD显示屏、按键输入、温湿度传感器等外设,实现了一个功能丰富的数字时钟系统。 2. 硬件组成 STM…

关于FreeRTOS在MCU(微控制器)和PC(个人计算机)上的源代码的区别

关于FreeRTOS在MCU&#xff08;微控制器&#xff09;和PC&#xff08;个人计算机&#xff09;上的源代码是否相同&#xff0c;我们可以从以下几个方面来详细分析和总结&#xff1a; 核心源代码&#xff1a; FreeRTOS的核心源代码&#xff08;如tasks.c和list.c等&#xff09;在…

IND83081芯片介绍(二)

七、典型应用 上面显示了独立的CAN收发器&#xff0c;而下面则显示了多个iND83081可以共享同一个CAN收发器的应用场景。通过这些连接&#xff0c;iND83081可以实现对多个LED的驱动和控制&#xff0c;同时与外部MCU进行通信 。 八、ELINS接口 1.ELINS简介 ELINS是一种从接口&a…

resume不严格加载model、避免某些层维度不一致导致错误

默认的&#xff0c;我们最常用的resume方式&#xff1a; if args.resume:checkpoint torch.load(resume_path, map_locationcpu)model_without_ddp.load_state_dict(checkpoint[model])print("Resume checkpoint %s" % resume_path)if optimizer in checkpoint and…

Oracle 19C19.3 rac安装并RU升级到19.14

19C支持RU补丁安装。 下载好19.14的RU补丁 [rootrac1 soft]# ll total 9830404 -rw-r--r-- 1 grid oinstall 3059705302 Jun 18 15:26 LINUX.X64_193000_db_home.zip -rw-r--r-- 1 grid oinstall 2889184573 Jun 18 15:27 LINUX.X64_193000_grid_home.zip -rw-r--r-- 1 grid …

C语言—文件IO相关操作

注&#xff1a;若有需要请查看官方文档&#xff1a;头文件#include<stdio.h> 注&#xff1a;要想学会&#xff0c;自己的练几遍&#xff0c;并且详细查看官方文档&#xff1b;一通百通&#xff1b; 1.fopen()函数 FILE * fopen ( const char * filename, const char * …

Antivirus Zap Pro :苹果 mac 电脑全面的系统安全解决方案

Antivirus Zap Pro 是一个全面的系统安全解决方案&#xff0c;它可以扫描和删除 Mac 中的恶意软件和其他恶意软件&#xff0c;还可以检测 Mac 上已经存在的威胁或可疑文件&#xff0c;并保护您的 Mac 免遭日后的威胁&#xff0c;满足用户不同的 Mac 电脑杀毒需求&#xff0c;有…

Firewalld防火墙基础

Firewalld 支持网络区域所定义的网络连接以及接口安全等级的动态防火墙管理工具 支持IPv4、IPv6防火墙设置以及以太网桥 支持服务或应用程序直接添加防火墙规则接口 拥有两种配置模式 运行时配置&#xff1a;临时生效&#xff0c;一旦重启或者重载即不生效 永久配置&#xff1a…

2024年【焊工(初级)】实操考试视频及焊工(初级)考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 焊工&#xff08;初级&#xff09;实操考试视频根据新焊工&#xff08;初级&#xff09;考试大纲要求&#xff0c;安全生产模拟考试一点通将焊工&#xff08;初级&#xff09;模拟考试试题进行汇编&#xff0c;组成一…

vue页面中,通过接口获取json返回值,并导出到Excel中;

vue页面中&#xff0c;通过接口获取json返回值&#xff0c;并导出到Excel中&#xff1b; 注意事项&#xff1a; 1、json中的key翻译成对应标题&#xff1b;2、过滤掉json中不需要的字段&#xff1b; 1、接口返回的json&#xff1a; {"errcode": 0,"data&quo…

【人工智能学习之图像操作(二)】

【人工智能学习之图像操作&#xff08;二&#xff09;】 图像上的运算图像混合按位运算 图像的几何变换仿射变换透视变换膨胀操作腐蚀操作开操作闭操作梯度操作礼帽操作黑帽操作 图像上的运算 图像上的算术运算&#xff0c;加法&#xff0c;减法&#xff0c;图像混合等。 加减…

【FPGA项目】System Generator算法板级验证-快速搭建外围测试电路

&#x1f389;欢迎来到FPGA专栏~System Generator算法板级验证-快速搭建外围测试电路 ☆* o(≧▽≦)o *☆嗨~我是小夏与酒&#x1f379; ✨博客主页&#xff1a;小夏与酒的博客 &#x1f388;该系列文章专栏&#xff1a;FPGA学习之旅 文章作者技术和水平有限&#xff0c;如果文…

Unity接入微信小游戏授权

官方授权流程图&#xff1a; https://res.wx.qq.com/wxdoc/dist/assets/img/authorize.4981f7f3.png 这里是一个非自定义隐私授权弹窗模式的流程Demo //获取玩家微信头像等数据 public static void GetWXUserInfo() {//授权Action action null;action new Action(()>{Get…

【微服务网关——中间件实现】

1.中间件的意义 避免成为if狂魔提高复用、隔离业务调用清晰、组合随意 2.实现原理 中间件一般都封装在路由上&#xff0c;路由是URL请求分发的管理器中间件选型 基于链表构建中间件 基于责任链的实现缺点&#xff1a;实现复杂&#xff0c;调用方式不灵活 使用数组构建中间件 控…

大模型笔记1: Longformer环境配置

论文: https://arxiv.org/abs/2004.05150 首先保证电脑上配置了git. git环境配置: https://blog.csdn.net/Andone_hsx/article/details/87937329 3.1、找到git安装路径中bin的位置&#xff0c;如&#xff1a;D:\Program Files\Git\bin 找到git安装路径中git-core的…

PostgreSQL 连接器:在 SeaTunnel 中的应用与优势

在现代企业中&#xff0c;数据已经成为核心资产&#xff0c;基于开源数据集成平台SeaTunnel&#xff0c;工程师如何高效地连接和管理这些数据源&#xff0c;直接关系到企业的竞争力和运营效率。 本文将给大家介绍如何通过 JDBC PostgreSQL 数据源连接器&#xff0c;在 SeaTunne…

第15周:RNN心脏病预测

目录 前言 二、前期准备 2.1 设置GPU 2.2 导入数据 2.2.1 数据介绍 2.2.2 导入代码 2.2.3 检查数据 三、数据预处理 3.1 划分训练集与测试集 3.2 标准化 四、构建RNN模型 4.1 基本概念 4.2 搭建代码 五、编译模型 六、训练模型 七、模型评估 总结 前言 &#…

直播怎么录制视频?直播视频,3种录制方法

“今晚我最喜欢的游戏博主要进行直播&#xff0c;但我可能还要加班。怎么办&#xff0c;不想错过直播的内容&#xff01;电脑怎么才能进行直播录制视频啊&#xff1f;谁能教教我&#xff1f;” 在数字化的今天&#xff0c;直播已经成为人们获取信息和娱乐的重要途径。有时&…

执行yum命令报错Could not resolve host: mirrors.cloud.aliyuncs.com; Unknown error

执行yum命令报错 [Errno 14] curl#6 - "Could not resolve host: mirrors.cloud.aliyuncs.com; Unknown error 修改图中所示两个文件&#xff1a; vim epel.repo vim CentOS-Base.repo 将所有的http://mirrors.cloud.aliyuncs.com 修改为http://mirrors.aliyun.com。 修改…