socketioxide的axum集成
- 启动socketio依靠examle里的layer
- 使用可变State依靠axum里的example
- 提取client,IP
- 非代理,tcp,socket对方地址
- 代理SocketRef里socket.req_parts.
- axum的get,or,post,的handle中请求处理中使用emit发送消息.
- 演示几个自己用的 消息处理
- 1 ,消息mess, 通知签到. null ,签到空提醒,update_online 上下线更新.
- 2.web 中shell命令的异步输出.
启动socketio依靠examle里的layer
https://github.com/Totodore/socketioxide
原版echo代码
use axum::routing::get;
use serde_json::Value;
use socketioxide::{extract::{AckSender, Bin, Data, SocketRef},SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;fn on_connect(socket: SocketRef, Data(data): Data<Value>) {info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);socket.emit("auth", data).ok();socket.on("message",|socket: SocketRef, Data::<Value>(data), Bin(bin)| {info!("Received event: {:?} {:?}", data, bin);socket.bin(bin).emit("message-back", data).ok();},);socket.on("message-with-ack",|Data::<Value>(data), ack: AckSender, Bin(bin)| {info!("Received event: {:?} {:?}", data, bin);ack.bin(bin).send(data).ok();},);
}#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {tracing::subscriber::set_global_default(FmtSubscriber::default())?;let (layer, io) = SocketIo::new_layer();io.ns("/", on_connect);io.ns("/custom", on_connect);let app = axum::Router::new().route("/", get(|| async { "Hello, World!" })).layer(layer);info!("Starting server");let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();axum::serve(listener, app).await.unwrap();Ok(())
}
通过文档
https://docs.rs/socketioxide/latest/socketioxide/index.html
和一些搜素出来代码,需要
- 允许跨域
use tower::ServiceBuilder;
use tower_http::{cors::CorsLayer,cors::Any, services::ServeDir,add_extension::AddExtensionLayer };let app = axum::Router::new().route("/", get( list_keys)).route("/postmsg", post( handl_emit)).route("/ioonline", get( list_keys)).layer( ServiceBuilder::new().layer(CorsLayer::permissive())// Enable CORS policy.layer(layer))
使用可变State依靠axum里的example
all example axum axum github example readme
我要用的:key_vaule_store.rs axum Arc<RWloc>example
使用了,Arc,这个synce机制,有加锁的办法.
主要逻辑代码
声明结构
type SharedState = Arc<RwLock<State>>;#[derive(Default)]
struct State {db: HashMap<String, Bytes>,
}
引用实现初始化
.layer(ServiceBuilder::new().load_shed().concurrency_limit(1024).timeout(Duration::from_secs(10)).layer(TraceLayer::new_for_http()).layer(AddExtensionLayer::new(SharedState::default())).into_inner(),)
在hangdle里使用
async fn list_keys(Extension(state): Extension<SharedState>) -> String {let db = &state.read().unwrap().db;db.keys().map(|key| key.to_string()).collect::<Vec<String>>().join("\n")
}
axum 的handle参数传递是各种组合,据说可以获得客户IP我也没找到特别合适的,下面介绍.
提取client,IP
为了了解每个socketio客户的在线状态,需要提取其IP.作为一个身份标识,最后发现ip不能做id,每个socket client有.自己的id.根据id的connect,disconnect,记录到上面的state.db. id做key, ip@time做value.一个IP可以在刷新时,造成,socket.id的相关的state快速变动,而disconect事件,往往有推后到,新id上线. 如果IP做id, 新的socket,会被旧socket的disconnect搞下线
非代理,tcp,socket对方地址
在建立服务时获得引用,依靠HTTPExtension,这是原始tcp套接字的获取,
日常的app:::server
axum::serve(listener, app).await.unwrap();
获取地址信息的app::server
use axum::{extract::ConnectInfo,routing::get,Router,
};
use std::net::SocketAddr;let app = Router::new().route("/", get(handler));async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {format!("Hello {addr}")
}let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
信息来自,socketioxide的作者,他提供了很及时的帮助.
https://github.com/Totodore/socketioxide/issues/101
这种办法可行,随axum主版本更新,好过,第三方的extractor
比如这个,现在很新,但是我没有验证
https://github.com/imbolc/axum-client-ip
代理SocketRef里socket.req_parts.
适应docket容器,非HOST模式,和反向代理的复杂情况…
在代理模式下
在handle里的缺省参数,socketRef的函数socket.req_parts().headers.get(IPKEY)提取到请求信息其中的headers,中的,x-forwarded-for,代理补充的remote-ip数据 ,具体名字可以在调试后确定下来.这是一个通用协议
static IPKEY:&str="x-forwarded-for";
//static IPKEY:&str="host";
static UP_ON:&str="update_online";
fn on_connect(socket: SocketRef, Data(data): Data<Value>,HttpExtension(state): HttpExtension<SharedState>) {info!(ns = socket.ns(), ?socket.id, "Socket.IO connected");let clientip= match socket.req_parts().headers.get(IPKEY){Some(ipaddr)=> ipaddr.to_str().unwrap(),None=> "127.0.0.1"};let mut stalock =state.write().unwrap();stalock.db.insert(socket.id.to_string(), format!("{}@{}",clientip,day()));
axum的get,or,post,的handle中请求处理中使用emit发送消息.
在连接建立时,把socketRef.clone()存入,共享State.然后在get,orpost的axum route handle获取并使用.
主要用于通过url发送广播消息, 不同服务器间的消息传递.
flask–> rust,socketio->socketio client.
因为要完成flask的socketio的解耦.目前只想到了这个办法.
上面的方法,可能造成socketRef的过早释放.不适合做正常的用法.
下面是最终,使用, io,存入State解决.
#[derive(Default)]
struct State {db: HashMap<String, String>,socket:Option<SocketIo>,}
type SharedState = Arc<RwLock<State>>;io.ns("/", on_connect);io.ns("/chat", on_connect);let mut newstate= SharedState::default();newstate.write().unwrap().socket=Some(io);let app = axum::Router::new().route("/", get( list_keys)).route("/postmsg", post( handl_emit)).route("/ioonline", get( list_keys)).layer( ServiceBuilder::new().layer(AddExtensionLayer::new(newstate)).layer(CorsLayer::permissive())// Enable CORS policy.layer(layer));
得到io以后, 在handle使用,注意io.of是设定namespace, 就是在io.ns里,第一个参数.锁定某个空间.
async fn handl_emit(Extension(state): Extension<SharedState>,extract::Json(payload): extract::Json<emit_body>) {if let Some(io)= &state.read().unwrap().socket{io.of("/chat").unwrap().emit(&payload.room,&payload.msg).ok();}println!("/r.n /postmsg to room:{},msg:{}",&payload.room,&payload.msg);
}
async fn list_keys(Extension(state): Extension<SharedState>) -> Json<HashMap<String, String>> {let db = &state.read().unwrap().db;// let d:emit_body=emit_body{room:String::from("ddd"),msg:String::from("msg") };// db.keys()// .map(|key| format!("{}@{}|",key,db.get(key).unwrap()))// .collect::<Vec<String>>()// .join("\n");Json(db.clone())
}
这样在另一个web服务,flask里调用
requests.post('http://127.0.0.1:3002/postmsg', json={'room':'backmsg','msg':"asdfsdf"}).text
演示几个自己用的 消息处理
1 ,消息mess, 通知签到. null ,签到空提醒,update_online 上下线更新.
离线的处理
socket.on_disconnect(|socket: SocketRef, reason: DisconnectReason,HttpExtension(state): HttpExtension<SharedState>| async move {let clientip= match socket.req_parts().headers.get(IPKEY){Some(ipaddr)=> ipaddr.to_str().unwrap(),None=> "127.0.0.1"};state.write().unwrap().db.remove(&socket.id.to_string());if let Some((key, value)) =state.read().unwrap().db.iter().find(|(_, v)| (**v).contains(&clientip)){println!("找到了第一个符合条件的元素: key={}, value={}", key, value);} else {println!("没有找到符合条件的元素");socket.broadcast().emit(UP_ON,&make_stamap(&clientip,false)).ok();}// println!("Socket {:?} on ns {} disconnected, reason: {:?}","", socket.ns(), reason);});
2.web 中shell命令的异步输出.
- flask用线程调用了shell命令,获得标准输出,
- 调用回调函数,request post .
requests.post('http://127.0.0.1:3002/postmsg', json={'room':'backmsg','msg':"asdfsdf"}).text
- axum handle,调用state中的io,emit
if let Some(io)= &state.read().unwrap().socket{io.of("/chat").unwrap().emit(&payload.room,&payload.msg).ok();}println!("/r.n /postmsg to room:{},msg:{}",&payload.room,&payload.msg);
以上
更新到这里,主要注重 axum的socketio结合.而前期的是flask-socketio实现签到系统状态更新的前端后端的介绍.这里不再重复.
在或变实现过程中,为了省事,修改了html中的js代码. 精简了流程,梳理了过程.发现了一个在线汇总列表的漏洞.就是上面说的从ip为标识,变更为socket id为标识.
别的应该是性能和灵活度的提升. 二进制不依赖环境, 随地可运行. 算是一个进步.
再见.