【socketioxide和axum集成-实现websocket实时通信-Rust点滴】

socketioxide的axum集成

  • 启动socketio依靠examle里的layer
  • 使用可变State依靠axum里的example
  • 提取client,IP
    • 非代理,tcp,socket对方地址
    • 代理SocketRef里socket.req_parts.
  • axum的get,or,post,的handle中请求处理中使用emit发送消息.
  • 演示几个自己用的 消息处理
    • 1 ,消息mess, 通知签到. null ,签到空提醒,update_online 上下线更新.
    • 2.web 中shell命令的异步输出.

启动socketio依靠examle里的layer

https://github.com/Totodore/socketioxide
原版echo代码

use axum::routing::get;
use serde_json::Value;
use socketioxide::{extract::{AckSender, Bin, Data, SocketRef},SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;fn on_connect(socket: SocketRef, Data(data): Data<Value>) {info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);socket.emit("auth", data).ok();socket.on("message",|socket: SocketRef, Data::<Value>(data), Bin(bin)| {info!("Received event: {:?} {:?}", data, bin);socket.bin(bin).emit("message-back", data).ok();},);socket.on("message-with-ack",|Data::<Value>(data), ack: AckSender, Bin(bin)| {info!("Received event: {:?} {:?}", data, bin);ack.bin(bin).send(data).ok();},);
}#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {tracing::subscriber::set_global_default(FmtSubscriber::default())?;let (layer, io) = SocketIo::new_layer();io.ns("/", on_connect);io.ns("/custom", on_connect);let app = axum::Router::new().route("/", get(|| async { "Hello, World!" })).layer(layer);info!("Starting server");let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();axum::serve(listener, app).await.unwrap();Ok(())
}

通过文档
https://docs.rs/socketioxide/latest/socketioxide/index.html
和一些搜素出来代码,需要

  • 允许跨域
use tower::ServiceBuilder;
use tower_http::{cors::CorsLayer,cors::Any, services::ServeDir,add_extension::AddExtensionLayer };let app = axum::Router::new().route("/", get( list_keys)).route("/postmsg", post( handl_emit)).route("/ioonline", get( list_keys)).layer(     ServiceBuilder::new().layer(CorsLayer::permissive())// Enable CORS policy.layer(layer))

使用可变State依靠axum里的example

all example axum axum github example readme

我要用的:key_vaule_store.rs axum Arc<RWloc>example
使用了,Arc,这个synce机制,有加锁的办法.
主要逻辑代码
声明结构


type SharedState = Arc<RwLock<State>>;#[derive(Default)]
struct State {db: HashMap<String, Bytes>,
}

引用实现初始化

  .layer(ServiceBuilder::new().load_shed().concurrency_limit(1024).timeout(Duration::from_secs(10)).layer(TraceLayer::new_for_http()).layer(AddExtensionLayer::new(SharedState::default())).into_inner(),)

在hangdle里使用

async fn list_keys(Extension(state): Extension<SharedState>) -> String {let db = &state.read().unwrap().db;db.keys().map(|key| key.to_string()).collect::<Vec<String>>().join("\n")
}

axum 的handle参数传递是各种组合,据说可以获得客户IP我也没找到特别合适的,下面介绍.

提取client,IP

为了了解每个socketio客户的在线状态,需要提取其IP.作为一个身份标识,最后发现ip不能做id,每个socket client有.自己的id.根据id的connect,disconnect,记录到上面的state.db. id做key, ip@time做value.一个IP可以在刷新时,造成,socket.id的相关的state快速变动,而disconect事件,往往有推后到,新id上线. 如果IP做id, 新的socket,会被旧socket的disconnect搞下线

非代理,tcp,socket对方地址

在建立服务时获得引用,依靠HTTPExtension,这是原始tcp套接字的获取,
日常的app:::server

 axum::serve(listener, app).await.unwrap();

获取地址信息的app::server

use axum::{extract::ConnectInfo,routing::get,Router,
};
use std::net::SocketAddr;let app = Router::new().route("/", get(handler));async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {format!("Hello {addr}")
}let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();

信息来自,socketioxide的作者,他提供了很及时的帮助.
https://github.com/Totodore/socketioxide/issues/101
这种办法可行,随axum主版本更新,好过,第三方的extractor
比如这个,现在很新,但是我没有验证
https://github.com/imbolc/axum-client-ip

代理SocketRef里socket.req_parts.

适应docket容器,非HOST模式,和反向代理的复杂情况…

在代理模式下
在handle里的缺省参数,socketRef的函数socket.req_parts().headers.get(IPKEY)提取到请求信息其中的headers,中的,x-forwarded-for,代理补充的remote-ip数据 ,具体名字可以在调试后确定下来.这是一个通用协议

static IPKEY:&str="x-forwarded-for";
//static IPKEY:&str="host";
static UP_ON:&str="update_online";
fn on_connect(socket: SocketRef, Data(data): Data<Value>,HttpExtension(state): HttpExtension<SharedState>) {info!(ns = socket.ns(), ?socket.id,   "Socket.IO connected");let clientip= match socket.req_parts().headers.get(IPKEY){Some(ipaddr)=> ipaddr.to_str().unwrap(),None=> "127.0.0.1"};let mut stalock =state.write().unwrap();stalock.db.insert(socket.id.to_string(), format!("{}@{}",clientip,day()));

axum的get,or,post,的handle中请求处理中使用emit发送消息.

在连接建立时,把socketRef.clone()存入,共享State.然后在get,orpost的axum route handle获取并使用.
主要用于通过url发送广播消息, 不同服务器间的消息传递.
flask–> rust,socketio->socketio client.

因为要完成flask的socketio的解耦.目前只想到了这个办法.
上面的方法,可能造成socketRef的过早释放.不适合做正常的用法.
下面是最终,使用, io,存入State解决.

#[derive(Default)] 
struct State {db: HashMap<String, String>,socket:Option<SocketIo>,}
type SharedState = Arc<RwLock<State>>;io.ns("/", on_connect);io.ns("/chat", on_connect);let mut newstate=  SharedState::default();newstate.write().unwrap().socket=Some(io);let app = axum::Router::new().route("/", get( list_keys)).route("/postmsg", post( handl_emit)).route("/ioonline", get( list_keys)).layer(     ServiceBuilder::new().layer(AddExtensionLayer::new(newstate)).layer(CorsLayer::permissive())// Enable CORS policy.layer(layer));

得到io以后, 在handle使用,注意io.of是设定namespace, 就是在io.ns里,第一个参数.锁定某个空间.

async fn handl_emit(Extension(state): Extension<SharedState>,extract::Json(payload): extract::Json<emit_body>)  {if let  Some(io)= &state.read().unwrap().socket{io.of("/chat").unwrap().emit(&payload.room,&payload.msg).ok();}println!("/r.n /postmsg to room:{},msg:{}",&payload.room,&payload.msg); 
}
async fn list_keys(Extension(state): Extension<SharedState>) -> Json<HashMap<String, String>> {let db = &state.read().unwrap().db;// let d:emit_body=emit_body{room:String::from("ddd"),msg:String::from("msg") };// db.keys()//     .map(|key| format!("{}@{}|",key,db.get(key).unwrap()))//     .collect::<Vec<String>>()//     .join("\n");Json(db.clone())
}

这样在另一个web服务,flask里调用

requests.post('http://127.0.0.1:3002/postmsg', json={'room':'backmsg','msg':"asdfsdf"}).text

演示几个自己用的 消息处理

1 ,消息mess, 通知签到. null ,签到空提醒,update_online 上下线更新.

离线的处理

  socket.on_disconnect(|socket: SocketRef, reason: DisconnectReason,HttpExtension(state): HttpExtension<SharedState>| async move {let clientip= match socket.req_parts().headers.get(IPKEY){Some(ipaddr)=> ipaddr.to_str().unwrap(),None=> "127.0.0.1"};state.write().unwrap().db.remove(&socket.id.to_string());if let Some((key, value)) =state.read().unwrap().db.iter().find(|(_, v)| (**v).contains(&clientip)){println!("找到了第一个符合条件的元素: key={}, value={}", key, value);} else {println!("没有找到符合条件的元素");socket.broadcast().emit(UP_ON,&make_stamap(&clientip,false)).ok();}//  println!("Socket {:?} on ns {} disconnected, reason: {:?}","", socket.ns(), reason);});

2.web 中shell命令的异步输出.

  • flask用线程调用了shell命令,获得标准输出,
  • 调用回调函数,request post .
requests.post('http://127.0.0.1:3002/postmsg', json={'room':'backmsg','msg':"asdfsdf"}).text
  • axum handle,调用state中的io,emit
 if let  Some(io)= &state.read().unwrap().socket{io.of("/chat").unwrap().emit(&payload.room,&payload.msg).ok();}println!("/r.n /postmsg to room:{},msg:{}",&payload.room,&payload.msg); 

以上
更新到这里,主要注重 axum的socketio结合.而前期的是flask-socketio实现签到系统状态更新的前端后端的介绍.这里不再重复.
在或变实现过程中,为了省事,修改了html中的js代码. 精简了流程,梳理了过程.发现了一个在线汇总列表的漏洞.就是上面说的从ip为标识,变更为socket id为标识.
别的应该是性能和灵活度的提升. 二进制不依赖环境, 随地可运行. 算是一个进步.

再见.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/892079.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

湖南引力:低代码技术助力军工企业实现设备管理系统创新

背景介绍 在核工业相关生产领域&#xff0c;随着技术的持续进步&#xff0c;生产活动对设备的依赖性日益增强。随着企业规模的不断扩大&#xff0c;所涉及的设备数量和种类也在急剧增长&#xff0c;这使得传统的设备管理模式逐渐显得力不从心。企业当前的设备管理主要依赖人工…

【701. 二叉搜索树中的插入操作 中等】

题目&#xff1a; 给定二叉搜索树&#xff08;BST&#xff09;的根节点 root 和要插入树中的值 value &#xff0c;将值插入二叉搜索树。 返回插入后二叉搜索树的根节点。 输入数据 保证 &#xff0c;新值和原始二叉搜索树中的任意节点值都不同。 注意&#xff0c;可能存在多…

VR+智慧消防一体化决策平台

随着科技的飞速发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术与智慧城市建设的结合越来越紧密。在消防安全领域&#xff0c;VR技术的应用不仅能够提升消防训练的效率和安全性&#xff0c;还能在智慧消防一体化决策平台中发挥重要作用。本文将探讨“VR智慧消防一体化…

nginx http反向代理

系统&#xff1a;Ubuntu_24.0.4 1、安装nginx sudo apt-get update sudo apt-get install nginx sudo systemctl start nginx 2、配置nginx.conf文件 /etc/nginx/nginx.conf&#xff0c;但可以在 /etc/nginx/sites-available/ 目录下创建一个新的配置文件&#xff0c;并在…

arcgisPro加载CGCS2000天地图后,如何转成米单位

1、导入加载的天地图影像服务&#xff0c;一开始是经纬度显示的。 2、右键地图&#xff0c;选择需要调整的投影坐标&#xff0c;这里选择坐标如下&#xff1a; 3、点击确定后&#xff0c;就可以调整成米单位的了。 4、切换后结果如下&#xff1a; 如有需要&#xff0c;可调整成…

计算机的错误计算(二百零四)

摘要 利用两个大模型判断&#xff1a;在(0, ) 范围内&#xff0c; 和 等价吗&#xff1f;实验表明&#xff0c;两个大模型&#xff08;其中一个是数学大模型&#xff09;均在输出幻觉&#xff0c;均说等价&#xff01; 例1. 在(0, ) 范围内&#xff0c; 和 等价吗&#xf…

简单的jmeter数据请求学习

简单的jmeter数据请求学习 1.需求 我们的流程服务由原来的workflow-server调用wfms进行了优化&#xff0c;将wfms服务操作并入了workflow-server中&#xff0c;去除了原来的webservice服务调用形式&#xff0c;增加了并发处理&#xff0c;现在想测试模拟一下&#xff0c;在一…

Unity3D仿星露谷物语开发17之空库存栏UI

1、目标 将库存栏放在游戏界面中&#xff0c;一般情况下角色居中展示时库存栏在底部&#xff0c;当角色位于界面下方时库存栏展示在顶部避免遮挡。 2、CanvasGroup组件 用于集中控制UI元素的透明度、交互性和射线投射行为。CanvasGroup的Alpha属性允许渐变效果&#xff0c;I…

现代谱估计的原理及MATLAB仿真(二)(AR模型法、MVDR法、MUSIC法)

现代谱估计的原理及MATLAB仿真AR参数模型法&#xff08;参数模型功率谱估计&#xff09;、MVDR法&#xff08;最小方差无失真响应法&#xff09;、MUSIC法&#xff08;多重信号分类法&#xff09; 文章目录 前言一、AR参数模型1 原理2 MATLAB仿真 二、MVDR法1 原理2 MATLAB仿真…

交换机划分Vlan配置

交换机划分Vlan配置 实验目标 理解虚拟LAN(VLAN)基本配置&#xff1b;掌握一般交换机按端口划分VLAN的配置方法&#xff1b;掌握Tag VLAN配置方法。 实验背景 某一公司内财务部、销售部的PC通过2台交换机实现通信&#xff1b;要求财务部和销售部的PC可以互通&#xff0c;但…

《Opencv》信用卡信息识别项目

目录 一、项目介绍 二、数据材料介绍 1、模板图片&#xff08;1张&#xff09; 2、需要处理的信用卡图片&#xff08;5张&#xff09; 三、实现过程 1、导入需要用到的库 2、设置命令行参数 3、模板图像中数字的定位处理 4、信用卡图像处理 5、模板匹配 四、总结 一…

.NET AI 开发人员库 --AI Dev Gallery简单示例--问答机器人

资源及介绍接上篇 nuget引用以下组件 效果展示&#xff1a; 内存和cpu占有&#xff1a; 代码如下&#xff1a;路径换成自己的模型路径 模型请从上篇文尾下载 internal class Program{private static CancellationTokenSource? cts;private static IChatClient? model;privat…

特种设备安全管理人员免费题库限时练习(判断题)

56.(判断题)特别重大事故、重大事故、较大事故和一般事故,负责事故调查的人民政府应当自收到事故调查报告之日起15日内做出批复。 A.正确 B.错误 答案:错误 57.(判断题)每一类事故灾难的应急救援措施可能千差万别,因此其基本应急模式是不一致的。 A.正确 B.错误 答案:错…

在Virtuoso中使用Clisoft SOS

在Virtuoso中使用Clisoft SOS 由于本人也是刚接触&#xff0c;后续用到其他的再进行更新&#xff0c;博客中可能有地方写的不好&#xff0c;欢迎大佬指点。 一、打开virtuoso 创建一个cds.lib&#xff08;不受SOS版本控制&#xff09; [bhlumaster /proj/trinity/work/cds/bh…

Android Audio基础(53)——PCM逻辑设备Write数据

1. 前言 本文,我们将以回放(Playback,播放音频)为例,讲解PCM Data是如何从用户空间到内核空间,最后传递到Codec。 在 ASoC音频框架简介中,我们给出了回放(Playback)PCM数据流示意图。: 对于Linux来说,由于分为 user space 和kernel space,而且两者之间数据不能随便…

算命网站源码PHP框架_附2025新版设计书教程

算命网站源码PHP设计书 1. 项目概述 1.1 项目背景 随着互联网的发展&#xff0c;越来越多的人对命理和占卜产生了兴趣。算命网站可以为用户提供个性化的命理分析、运势预测等服务。本项目旨在设计一个基于PHP的算命网站&#xff0c;方便用户在线获取命理服务。 1.2 项目目标…

【Linux】硬链接和软连接(符号连接)

目录 硬链接 软连接 硬链接和软连接的区别 硬链接 ln根据linux系统分配给文件inode(ls -li)进行建立&#xff0c;没办法跨越文件系统 格式&#xff1a;ln 被链接的文件(源文件) 生成的链接文件(目标文件) 1) 硬链接的属性 - 相当于生成一个副本 起别名 2) 修改内容都变化…

后台管理系统全屏功能实现

后台管理系统中有一个比较常见的功能就是全屏显示&#xff0c;以方便用最大的屏幕查看系统&#xff0c;特别是在小屏模式下。 对于 screenfull 而言&#xff0c;浏览器本身已经提供了对用的 API&#xff0c;点击这里即可查看&#xff0c;这个 API 中&#xff0c;主要提供了两个…

Ensp基础实验---同网段PC以及网关之间的通信

通过安装ENSP&#xff0c;可以模拟搭建网络仿真环境&#xff0c;初步了解ENSP之后&#xff0c;可以进行一些简单的网络拓扑搭建&#xff0c;通过对相关设备的配置&#xff0c;实现网络畅通的目的 此次模拟的是同一个网段内&#xff0c;两台PC之间的通信情况&#xff0c;同时选用…

WinDbg内存泄露追踪

随着win sdk一并安装了&#xff0c;可以在C:\Program Files (x86)\Windows Kits\10\Debuggers\x64 里找到 管理员运行cmd 配置跟踪 cd C:\Program Files (x86)\Windows Kits\10\Debuggers\x64 gflags.exe设置待跟踪的应用程序 gflags.exe /i D:\XXXX.exe ust运行应用程序&am…