做纯粹的自己。“你要搞清楚自己人生的剧本——不是父母的续集,不是子女的前传,更不是朋友的外篇。对待生命你不妨再大胆一点,因为你好歹要失去它。如果这世上真有奇迹,那只是努力的另一个名字”。
一、crossbeam_channel
参考 crossbeam_channel - Rust
crossbeam_channel 是一个多生产者多消费者通道,用于消息传递,它是std::sync::mpsc的替代品,具有更多的功能和更好的性能。
二、Channel 类型
通道可以使用两个函数创建:
- bounded 函数创建一个容量有限的信道,即一个信道一次可以容纳的消息数量是有限制的。
- unbounded 函数创建一个容量无界的信道,即它一次可以容纳任意数量的消息。
这两个函数都返回一个发送方 Sender 和一个接收方 Receiver,它们代表通道的相反两端。
创建一个有界 Channel:
use crossbeam_channel::bounded;// Create a channel that can hold at most 5 messages at a time.
let (s, r) = bounded(5);// Can send only 5 messages without blocking.
for i in 0..5 {s.send(i).unwrap();
}// Another call to `send` would block because the channel is full.
// s.send(5).unwrap();
创建一个无界 Channel:
use crossbeam_channel::unbounded;// Create an unbounded channel.
let (s, r) = unbounded();// Can send any number of messages into the channel without blocking.
for i in 0..1000 {s.send(i).unwrap();
}
三、通过 JNI 使用 Channel
Java 端可以通过 JNI 调用 getSender
获取发送端指针,调用 sendMessage
发送消息到 Rust 中的处理线程,由 Rust 负责处理核心逻辑。
1、新建一个 Rust 库项目
cargo new rust_jni_channel_test --lib
添加依赖包,
# Cargo.toml[dependencies]
jni = "0.21.1"
lazy_static = "1.5.0"
crossbeam-channel = "0.5.13"
#log = "0.4"
#env_logger = "0.11"[lib]
crate_type = ["cdylib"]
实现 JNI 模块函数,
// lib.rs#[macro_use]
extern crate lazy_static;use jni::objects::{JClass, JObject};
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::thread;lazy_static! {static ref SENDER: Sender<String> = {let (sender, receiver) = unbounded();// Spawn a thread to handle the receiverthread::spawn(move || {for message in receiver.iter() {println!("Received message: {}", message);}});sender};
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_MyResultHandler_getSender(_env: JNIEnv,_class: JClass,
) -> jlong {let sender_ptr = Box::into_raw(Box::new(SENDER.clone())) as jlong;sender_ptr
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_MyResultHandler_sendMessage(mut env: JNIEnv,_class: JClass,sender_ptr: jlong,message: JObject,
) {let sender = unsafe { &*(sender_ptr as *mut Sender<String>) };let message: String = env.get_string(&message.into()).expect("Couldn't get java string!").into();match sender.send(message) {Ok(_) => println!("Message sent successfully"),Err(e) => eprintln!("Failed to send message: {:?}", e),}
}
以上代码主要做了三件事情:
1、定义静态变量
- 使用
lazy_static!
宏定义一个静态变量SENDER
,它是一个Sender<String>
类型的通道发送端。 - 创建一个无界通道(unbounded channel),返回一个发送端和接收端。
- 启动一个新线程,在该线程中不断从接收端读取消息并打印出来。
- 返回发送端
sender
。
2、实现JNI函数:获取发送端:Java_com_yushanma_MyResultHandler_getSender
- 这个函数名遵循JNI命名规则,表示这是一个供Java调用的本地方法。
- 函数参数包括JNI环境指针
_env
和Java类_class
。 - 将静态变量
SENDER
克隆后转换为原始指针,再将其转换为jlong
类型返回给Java。
3、实现JNI函数:发送消息:Java_com_yushanma_MyResultHandler_sendMessage
- 这个函数同样遵循JNI命名规则。
- 参数包括JNI环境指针
env
、Java类_class
、发送端指针sender_ptr
和Java字符串对象message
。 - 将
sender_ptr
转换回Sender<String>
类型的引用。 - 从Java字符串对象中提取出Rust字符串。
- 尝试通过发送端发送消息,如果成功则打印成功信息,否则打印错误信息。
2、新建一个 Maven 项目
package com.yushanma;import java.io.IOException;/*** @version: V1.0* @author: 余衫马* @description: 主函数* @data: 2024-11-22 19:24**/public class MyResultHandler {private static native long getSender();private static native void sendMessage(long senderPtr, String message);// 用于加载包含本地方法实现的本地库。static {System.loadLibrary("rust_jni_channel_test");}public static void main(String[] args) throws IOException {long senderPtr = getSender();new Thread(() -> {for (int i = 0; i < 100; i++) {sendMessage(senderPtr, String.format("Hello from Java! COUNT * %d", i + 1));}}).start();new Thread(() -> {for (int i = 100; i < 200; i++) {sendMessage(senderPtr, String.format("Hello from Java! COUNT * %d", i + 1));}}).start();System.in.read();}
}
我们声明了两个本地方法getSender
和sendMessage
,它们分别用于获取发送端指针和发送消息,对应 Rust 库中封装的两个 Native 方法。然后在 main 函数中,启动两个新线程,每个线程发送100条消息到Rust端,并且使用System.in.read()
阻塞主线程,以防止程序过早退出。
3、添加外部 libs 与 VM 启动参数
-Djava.library.path=D:\JWorkSpace\ChannelDeomo\libs
通过 VM 参数指定外部库路径,否则会直接使用默认路径,会报错找不到 dll 文件。
4、运行效果
四、封装
1、新建 Callback 类
package com.yushanma.callback;import java.time.LocalDateTime;/*** @version: V1.0* @author: 余衫马* @description: 回调类* @data: 2024-11-22 19:30**/
public class MyCallback {// 用于加载包含本地方法实现的本地库。static {System.loadLibrary("rust_jni_channel_test");}/*** Sender 指针*/private final long senderPtr;/*** 获取 Sender 指针** @return*/private static native long getSender();/*** 发送信息** @param senderPtr Sender 指针* @param message 信息内容*/private static native void sendMessage(long senderPtr, String message);/*** 默认构造方法*/public MyCallback() {senderPtr = getSender();}/*** 回调方法*/public void callback(String out) {sendMessage(senderPtr, String.format("%s callback data: %s, senderPtr %d", LocalDateTime.now(), out, senderPtr));}}
package com.yushanma;import com.yushanma.callback.MyCallback;import java.io.IOException;
import java.util.UUID;/*** @version: V1.0* @author: 余衫马* @description: 主函数* @data: 2024-11-22 19:24**/public class MyResultHandler {public static void main(String[] args) throws IOException {new Thread(() -> {MyCallback myCallback = new MyCallback();for (int i = 0; i < 10; i++) {myCallback.callback(UUID.randomUUID().toString());}}).start();new Thread(() -> {MyCallback myCallback = new MyCallback();for (int i = 0; i < 10; i++) {myCallback.callback(UUID.randomUUID().toString());}}).start();System.in.read();}
}
2、Rust 修改命名路径
#[macro_use]
extern crate lazy_static;use jni::objects::{JClass, JObject};
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::thread;lazy_static! {static ref SENDER: Sender<String> = {let (sender, receiver) = unbounded();// Spawn a thread to handle the receiverthread::spawn(move || {for message in receiver.iter() {println!("Received message: {}", message);}});sender};
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_getSender(_env: JNIEnv,_class: JClass,
) -> jlong {let sender_ptr = Box::into_raw(Box::new(SENDER.clone())) as jlong;sender_ptr
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_sendMessage(mut env: JNIEnv,_class: JClass,sender_ptr: jlong,message: JObject,
) {let sender = unsafe { &*(sender_ptr as *mut Sender<String>) };let message: String = env.get_string(&message.into()).expect("Couldn't get java string!").into();match sender.send(message) {Ok(_) => println!("Message sent successfully"),Err(e) => eprintln!("Failed to send message: {:?}", e),}
}
路径要保持一致,否则调用本地方法时会报错找不到函数。
3、运行效果
五、内存释放
在 Java 中,JVM(Java Virtual Machine)管理着内存的分配和释放。对于纯 Java 对象,JVM 的垃圾回收机制会自动处理对象的生命周期。然而,当涉及到与本地代码(如 Rust 或 C/C++)交互时,需要特别注意资源的管理。
在这个示例中,MyCallback
是一个通过 JNI(Java Native Interface)或类似技术与 Rust 代码交互的类。senderPtr
是一个指向本地(Rust)资源的指针。当 MyCallback
实例 lib
被垃圾回收时,JVM 并不会自动知道如何释放 senderPtr
所指向的本地资源。这意味着我们需要手动管理这些资源,以避免内存泄漏。
1、显式释放资源
在 MyCallback
类中提供一个方法,用于显式释放本地资源。
package com.yushanma.callback;import java.time.LocalDateTime;/*** @version: V1.0* @author: 余衫马* @description: 回调类* @data: 2024-11-22 19:30**/
public class MyCallback {// 用于加载包含本地方法实现的本地库。static {System.loadLibrary("rust_jni_channel_test");}/*** Sender 指针*/private long senderPtr;/*** 获取 Sender 指针** @return*/private static native long getSender();/*** 发送信息** @param senderPtr Sender 指针* @param message 信息内容*/private static native void sendMessage(long senderPtr, String message);/*** 释放资源* @param senderPtr Sender 指针*/private static native void releaseSender(long senderPtr);/*** 默认构造方法*/public MyCallback() {senderPtr = getSender();}/*** 回调方法*/public void callback(String out) {sendMessage(senderPtr, String.format("%s callback data: %s, senderPtr %d", LocalDateTime.now(), out, senderPtr));}/*** 释放资源*/public void release(){releaseSender(senderPtr);senderPtr = 0;}/*** 重载 Object 类的 finalize 方法* 不推荐依赖 finalize 方法,因为它的执行时间是不确定的,但作为最后的防线,可以在 finalize 方法中释放资源。* @throws Throwable*/@Overrideprotected void finalize() throws Throwable {try {if (senderPtr != 0) {releaseSender(senderPtr);}} finally {super.finalize();}}
}
2、使用 AutoCloseable
接口
package com.yushanma.callback;import java.time.LocalDateTime;/*** @version: V1.0* @author: 余衫马* @description: 回调类* @data: 2024-11-22 19:30**/
public class MyCallback implements AutoCloseable {// 用于加载包含本地方法实现的本地库。static {System.loadLibrary("rust_jni_channel_test");}/*** Sender 指针*/private long senderPtr;/*** 获取 Sender 指针** @return*/private static native long getSender();/*** 发送信息** @param senderPtr Sender 指针* @param message 信息内容*/private static native void sendMessage(long senderPtr, String message);/*** 释放资源* @param senderPtr Sender 指针*/private static native void releaseSender(long senderPtr);/*** 默认构造方法*/public MyCallback() {senderPtr = getSender();}/*** 回调方法*/public void callback(String out) {sendMessage(senderPtr, String.format("%s callback data: %s, senderPtr %d", LocalDateTime.now(), out, senderPtr));}// /**
// * 释放资源
// */
// public void release(){
// releaseSender(senderPtr);
// senderPtr = 0;
// }
//
// /**
// * 重载 Object 类的 finalize 方法
// * 不推荐依赖 finalize 方法,因为它的执行时间是不确定的,但作为最后的防线,可以在 finalize 方法中释放资源。
// * @throws Throwable
// */
// @Override
// protected void finalize() throws Throwable {
// try {
// if (senderPtr != 0) {
// releaseSender(senderPtr);
// }
// } finally {
// super.finalize();
// }
// }/*** 释放资源*/@Overridepublic void close() {if (senderPtr != 0) {releaseSender(senderPtr);senderPtr = 0;}}}
try (MyCallback myCallback = new MyCallback()) {for (int i = 0; i < 10; i++) {myCallback.callback(UUID.randomUUID().toString());}
} // 自动调用 myCallback.close() 方法
3、Rust 释放内存逻辑
添加一个新的 JNI 函数来释放 Sender
,并确保在释放时不会发生内存泄漏或其他问题。
#[macro_use]
extern crate lazy_static;use jni::objects::{JClass, JObject};
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::thread;lazy_static! {static ref SENDER: Sender<String> = {let (sender, receiver) = unbounded();// Spawn a thread to handle the receiverthread::spawn(move || {for message in receiver.iter() {println!("Received message: {}", message);}});sender};
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_getSender(_env: JNIEnv,_class: JClass,
) -> jlong {let sender_ptr = Box::into_raw(Box::new(SENDER.clone())) as jlong;sender_ptr
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_sendMessage(mut env: JNIEnv,_class: JClass,sender_ptr: jlong,message: JObject,
) {let sender = unsafe { &*(sender_ptr as *mut Sender<String>) };let message: String = env.get_string(&message.into()).expect("Couldn't get java string!").into();match sender.send(message) {Ok(_) => println!("Message sent successfully"),Err(e) => eprintln!("Failed to send message: {:?}", e),}
}#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_releaseSender(_env: JNIEnv,_class: JClass,sender_ptr: jlong,
) {if sender_ptr != 0 {unsafe {// Convert the raw pointer back to a Box and drop itlet _ = Box::from_raw(sender_ptr as *mut Sender<String>);}println!("Sender released");}
}
释放内存的关键在这里:
let _ = Box::from_raw(sender_ptr as *mut Sender<String>);
这行代码的作用是将一个原始指针(raw pointer)转换为一个 Box
,从而恢复对堆上分配的内存的所有权。在 Rust 中,Box
是一个智能指针类型,用于管理堆上的内存。具体来说,这行代码执行了以下操作:
- 类型转换:
sender_ptr as *mut Sender<String>
将sender_ptr
转换为一个可变的原始指针,指向Sender<String>
类型的数据。 - 从原始指针创建 Box:
Box::from_raw
接受一个原始指针,并返回一个Box
,从而接管该指针所指向的内存的所有权。
完整的解释如下:
sender_ptr
是一个原始指针,通常是通过某种方式获得的,例如通过Box::into_raw
方法将一个Box
转换为原始指针。as *mut Sender<String>
是一个类型转换,将sender_ptr
转换为一个指向Sender<String>
的可变原始指针。Box::from_raw(sender_ptr as *mut Sender<String>)
使用这个原始指针创建一个Box<Sender<String>>
,这样 Rust 的所有权系统就可以正确地管理这块内存。
需要注意的是,使用 Box::from_raw
时要确保:
- 原始指针确实指向有效的堆内存。
- 该内存之前是通过
Box
分配的。 - 在调用
Box::from_raw
后,不再使用原始指针,因为Box
会负责释放这块内存。