用过android的同学对于handler应该都很了解,用起来比较方便。这里用rust设计了一个简单的rust。
1.处理接口
pub trait ProcessMessage {fn handleMessage(&self,msg:Message);
}
2.Message结构体
pub struct Message {pub what:u32,pub arg1:i32,pub arg2:i32,pub next_time:u64,pub next:Option<Rc<RefCell<Message>>>, //1pub target:Option<Arc<Box<dyn ProcessMessage>>>//2
}
1.指向下一个message,整个消息队列采用链表结构,使用option主要是为了确认是否还有message
2.target是一个回调实现,每个message都需要带有target,这样message就能由指定的handler处理了。
关键函数
impl Message {....fn execute(&self) {match &self.target {None=> {},Some(handler) => {handler.handleMessage(Message{what:self.what,arg1:self.arg1,arg2:self.arg2,next_time:self.next_time,next:None,target:None})} //1}}
}
1.message处理的时候调用了ProcessMessage,这里主要考虑到使用的方便(不要再用什么unwrap),所以传出去的数据重新构造了一个新数据,性能上会有损耗
3.消息队列
pub struct MessageQueue {head:Mutex<Message>,//1cond:Condvar//2
}
1.消息队列头,为了减少option,这里规定了第一个消息一直不使用,所有消息从head的next开始排队
2.cond:如果没有消息,或者消息需要延迟发送,则使用这个做等待处理。每次有新消息加入的时候,重新针对消息队列排序,然后cond.notify一下,唤醒等待线程
4.Looper
struct Looper {queue:Arc<RefCell<MessageQueue>>,
}
Looper主要就是一个queue,
关键函数:
fn loop_self(&self) {loop {println!("loop self start");let msg = self.queue.clone().try_borrow_mut().unwrap().dequeue_message();//1println!("loop self trace1");match msg {None=>{},Some(m) => {let _msg = m.clone();_msg.try_borrow_mut().unwrap().execute();//2}}}}
这里loop_self主要是给一个线程做死循环用的。
1.从queue中获取message,如果没有则会在这里卡死
2.message执行
5.HandlerThread
struct HandlerThread {looper:Arc<Looper>
}
主要函数:
fn start(&self) {loop {self.looper.loop_self();//1}}
1.循环处理messagequeue的数据,这个start函数最后会在一个线程中调用
6.Handler
pub struct Handler {queue:Arc<RefCell<MessageQueue>>,processor:Arc<Box<dyn ProcessMessage>>,
}
关键函数:
pub fn new(processor:Box<dyn ProcessMessage>)->Self {println!("handler construct trace1", );let handler_th = HandlerThread::new();let handler = Handler {queue:handler_th.get_looper().queue.clone(),processor:Arc::new(processor)};println!("handler construct trace2", );thread::spawn(move||{handler_th.start();//1});handler}
1.上面数据结构的消息队列是在这里被驱动的。
测试代码:
struct MyProcessor {}impl ProcessMessage for MyProcessor {fn handleMessage(&self,msg:Message) {println!("msg is {}",msg.what);//thread::sleep(Duration::from_secs(10));}
}pub fn test_handler_send_empty_message() {println!("test_handler_send_empty_message trace1");let h = handler::Handler::new(Box::new(MyProcessor{}));println!("test_handler_send_empty_message trace2");h.send_empty_message(1);h.send_empty_message_delayed(2,1000);thread::sleep(Duration::from_secs(100));
}
全代码:
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::cell::Cell;
use std::ptr::NonNull;use std::rc::Rc;
use std::sync::Arc;
use std::sync::{Mutex,Condvar};
use std::thread;
use std::time::Duration;use tokio::time::Interval;use crate::system;pub trait ProcessMessage {fn handleMessage(&self,msg:Message);
}//---- Message ----
pub struct Message {pub what:u32,pub arg1:i32,pub arg2:i32,pub next_time:u64,pub next:Option<Rc<RefCell<Message>>>,pub target:Option<Arc<Box<dyn ProcessMessage>>>
}impl Message {pub fn new(what:u32)->Self {Message {what:what,arg1:0,arg2:0,next_time:0,next:None,target:None}}fn set_next(&mut self,next:Message)->&mut Self {self.next = Some(Rc::new(RefCell::new(next)));//Some(RefCell::new(Rc::new(Box::new(next))));self}// pub fn set_target(&mut self,target:Box<dyn ProcessMessage>)->&mut Self {// self.target = Some(target);// self// }fn execute(&self) {match &self.target {None=> {},Some(handler) => {handler.handleMessage(Message{what:self.what,arg1:self.arg1,arg2:self.arg2,next_time:self.next_time,next:None,target:None})}}}
}//---- MessageQueue ----
pub struct MessageQueue {head:Mutex<Message>,cond:Condvar
}impl MessageQueue {pub fn new()->Self {MessageQueue {head:Mutex::new(Message::new(0)),cond:Condvar::new()}}pub fn dequeue_message(&mut self)->Option<Rc<RefCell<Message>>> {println!("dequeue_message start");loop {let mut head = self.head.lock().unwrap();println!("dequeue_message trace1");match head.next.clone() {None => {println!("dequeue_message trace2");self.cond.wait(head);continue;},Some(msg)=>{let next_time = msg.try_borrow().unwrap().next_time;println!("msg next time is {},current is {}", next_time,system::currentMillis());if next_time <= system::currentMillis() {head.next = msg.try_borrow().unwrap().next.clone();return Some(msg.clone());}self.cond.wait_timeout(head,Duration::from_millis(next_time - system::currentMillis())); }}}}pub fn enqueue_message(&mut self,mut msg:Message) {let mut head = self.head.lock().unwrap();println!("enqueue message trace1", );if let Some(v) = &head.next {let mut prev = v.clone();let mut current = v.clone();let mut pos:u32 = 0;println!("enqueue message trace2", );loop {if current.try_borrow().unwrap().next_time > msg.next_time {println!("enqueue message trace3", );if pos == 0 {println!("enqueue message trace4", );msg.next = Some(current);head.next = Some(Rc::new(RefCell::new(msg)));} else {println!("enqueue message trace5", );msg.next = Some(current);prev.try_borrow_mut().unwrap().next = Some(Rc::new(RefCell::new(msg)));}self.cond.notify_one();return;} else {println!("enqueue message trace6", );pos += 1;let mut is_none = false;match current.try_borrow().unwrap().next {None => {is_none = true;},Some(_) => {}}if is_none {current.try_borrow_mut().unwrap().next = Some(Rc::new(RefCell::new(msg)));self.cond.notify_one();return;}prev = current.clone();let tmp: Rc<RefCell<Message>> = current.try_borrow_mut().unwrap().next.clone().unwrap().clone();current = tmp;}}} else {println!("enqueue message trace8", );head.next = Some(Rc::new(RefCell::new(msg)));self.cond.notify_one();return}}
}//
struct Looper {queue:Arc<RefCell<MessageQueue>>,
}impl Looper {fn new()->Self {Looper {queue:Arc::new(RefCell::new(MessageQueue::new())),}}fn get_queue(&self)->Arc<RefCell<MessageQueue>> {self.queue.clone()}fn loop_self(&self) {loop {println!("loop self start");let msg = self.queue.clone().try_borrow_mut().unwrap().dequeue_message();println!("loop self trace1");match msg {None=>{},Some(m) => {let _msg = m.clone();_msg.try_borrow_mut().unwrap().execute();}}}}
}
struct HandlerThread {looper:Arc<Looper>
}impl HandlerThread {fn new()->Self {HandlerThread {looper:Arc::new(Looper::new()),}}fn new_with_looper(looper:Arc<Looper>)->Self {HandlerThread {looper:looper.clone(),}}fn start(&self) {loop {self.looper.loop_self();}}pub fn get_looper(&self)->Arc<Looper> {self.looper.clone()}
}pub struct Handler {queue:Arc<RefCell<MessageQueue>>,processor:Arc<Box<dyn ProcessMessage>>,
}unsafe impl Send for HandlerThread{}impl Handler {pub fn new(processor:Box<dyn ProcessMessage>)->Self {println!("handler construct trace1", );let handler_th = HandlerThread::new();let handler = Handler {queue:handler_th.get_looper().queue.clone(),processor:Arc::new(processor)};println!("handler construct trace2", );thread::spawn(move||{handler_th.start();});handler}pub fn send_empty_message(&self,what:u32) {println!("send_empty_message start");let mut msg = Message::new(what);msg.target = Some(self.processor.clone());self.queue.try_borrow_mut().unwrap().enqueue_message(msg);}pub fn send_empty_message_delayed(&self,what:u32,interval:u64) {let mut msg = Message::new(what);msg.next_time = system::currentMillis() + interval;msg.target = Some(self.processor.clone());self.queue.try_borrow_mut().unwrap().enqueue_message(msg);}
}