Java通过Netty,实现Websocket消息推送只需要简单几步

前言

曾几何时,不知道大家有没有在项目里遇到过需要服务端给客户端推送消息的需求,是否曾经苦恼过、纠结过,我们知道要想实现这样的需求肯定离不开websocket长连接方式,那么到底是该选原生的websocket还是更加高级的netty框架呢?

在此我极力推荐netty,因为一款好的框架一般都是在原生的基础上进行包装成更好、更方便、更实用的东西,很多我们需要自己考虑的问题都基本可以不用去考虑,不过此文不会去讲netty有多么的高深莫测,因为这些概念性的东西随处可见,而是通过实战来达到推送消息的目的。

实战

一、逻辑架构图

从图中可以看出本次实战的基本流程是客户端A请求服务端核心模块,核心模块生产一条消息到消息队列,然后服务端消息模块消费消息,消费完之后就将消息推送给客户端B,流程很简单,没有太多技巧,唯一的巧妙之处就在消息模块这边的处理上,本文的重点也主要讲解消息模块这一块,主要包括netty server、netty client、channel的存储等等。

二、代码

1、添加依赖

<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.6.Finalversion>
dependency>

2、NettyServer类

@Service
public class NettyServer {public void run(int port){new Thread(){public void run(){runServer(port);}}.start();}private void runServer(int port){Print.info("===============Message服务端启动===============");EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new ChannelInitializer() {protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("codec-http", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(65536));pipeline.addLast("handler", new MyWebSocketServerHandler());}});Channel ch = b.bind(port).sync().channel();Print.info("Message服务器启动成功:" + ch.toString());ch.closeFuture().sync();} catch (Exception e){Print.error("Message服务运行异常:" + e.getMessage());e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();Print.info("Message服务已关闭");}}
}

3、MyWebSocketServerHandler类

public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{private static final String WEBSOCKET_PATH = "";private WebSocketServerHandshaker handshaker;@Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest){//以http请求形式接入,但是走的是websockethandleHttpRequest(ctx, (FullHttpRequest) msg);}else if (msg instanceof  WebSocketFrame){//处理websocket客户端的消息handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {//要求Upgrade为websocket,过滤掉get/Postif (!req.decoderResult().isSuccess()|| (!"websocket".equals(req.headers().get("Upgrade")))) {//若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:9502/websocket", null, false);handshaker = wsFactory.newHandshaker(req);if (handshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else {handshaker.handshake(ctx.channel(), req); }}private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {// Check for closing frame if (frame instanceof CloseWebSocketFrame) {handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; }if (frame instanceof PingWebSocketFrame) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; }if (!(frame instanceof TextWebSocketFrame)) {Print.error("数据帧类型不支持!"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服务器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){ctx.channel().write(new TextWebSocketFrame(request)); return; }JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token"); if (Const.FRONT.equals(eventType)){Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else if (Const.BEHIND.equals(eventType)){Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken); if (null == chan){Print.error("目标用户不存在"); }else {JSONObject jsonMsg = new JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken, new TextWebSocketFrame(jsonMsg.toString())); Print.info("向目标用户发送成功"); }}else{Print.error("event type error"); }}private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {// 返回应答给客户端 if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); }ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 if (!isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE); }}@Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace(); ctx.close(); }private static String getWebSocketLocation(FullHttpRequest req) {return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH; }/** * 接收客户端连接事件 */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {Print.info("客户端与服务端连接开启:" + ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }/** * 接收客户端关闭事件 */    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {Print.info("客户端与服务端连接关闭:" + ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }}

4、ChannelSupervise类

public class ChannelSupervise {private   static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private  static ConcurrentMapChannelMap = new ConcurrentHashMap();public  static void addChannel(String apiToken, Channel channel){GlobalGroup.add(channel);if (null != apiToken) {ChannelMap.put(apiToken, channel.id());}}public static void updateChannel(String apiToken, Channel channel){Channel chan = GlobalGroup.find(channel.id());if (null == chan){addChannel(apiToken, channel);}else {ChannelMap.put(apiToken, channel.id());}}public static void removeChannel(Channel channel){GlobalGroup.remove(channel);Collectionvalues = ChannelMap.values();values.remove(channel.id());}public static Channel findChannel(String apiToken){ChannelId chanId = ChannelMap.get(apiToken);if (null == chanId){return null;}return GlobalGroup.find(ChannelMap.get(apiToken));}public static void sendToAll(TextWebSocketFrame tws){GlobalGroup.writeAndFlush(tws);}public static void sendToSimple(String apiToken, TextWebSocketFrame tws){GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws);}
}

5、NettyClient类

@Servicepublic class NettyClient {private Channel channel;public void run(String strUri){new Thread(){public void run(){runClient(strUri);}}.start();private void runClient(String strUri) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();URI uri = new URI(strUri);String protocol = uri.getScheme();if (!"ws".equals(protocol)) {throw new IllegalArgumentException("Unsupported protocol: " + protocol);}HttpHeaders customHeaders = new DefaultHttpHeaders();customHeaders.add("MyHeader", "MyValue");// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.// If you change it to V00, ping is not supported and remember to change// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.final MyWebSocketClientHandler handler =new MyWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new ChannelInitializer() {@Overpublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); }}); Print.info("===============Message客户端启动==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); } catch (Exception e){Print.error(e.getMessage()); } finally {group.shutdownGracefully(); }}

6、MyWebSocketClientHandler类

public class MyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {private final WebSocketClientHandshaker handshaker;private ChannelPromise handshakeFuture;public MyWebSocketClientHandler(WebSocketClientHandshaker handshaker) {this.handshaker = handshaker;}public ChannelFuture handshakeFuture() {return handshakeFuture;}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {handshakeFuture = ctx.newPromise();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {handshaker.handshake(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Print.info("webSocket client disconnected!");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Channel ch = ctx.channel();if (!handshaker.isHandshakeComplete()) {handshaker.finishHandshake(ch, (FullHttpResponse) msg);Print.info("websocket client connected!");handshakeFuture.setSuccess();return;}if (msg instanceof FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');}WebSocketFrame frame = (WebSocketFrame) msg;if (frame instanceof TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;Print.info("客户端收到消息: " + textFrame.text());} else if (frame instanceof PongWebSocketFrame) {Print.info("websocket client received pong");} else if (frame instanceof CloseWebSocketFrame) {Print.info("websocket client received closing");ch.close();}}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();if (!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}}

7、启动类

@SpringBootApplication
@Servicepublic
class MessageApplication {@Autowiredprivate NettyServer server;@Autowiredprivate NettyClient client;public static void main(String[] args) {SpringApplication.run(MessageApplication.class, args);}@PostConstructpublic void initMessage(){server.run(9502);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}client.run("ws://localhost:" + 9502);}

8、客户端B测试页面

<html><head><meta charset="UTF-8"><title>WebSocket Chattitle>head><body><script type="text/javascript">var socket;if (!window.WebSocket) {window.WebSocket = window.MozWebSocket;}if (window.WebSocket) {socket = new WebSocket("ws://localhost:9502");socket.onmessage = function(event) {var ta = document.getElementById('responseText');ta.value = ta.value + '\n' + event.data};socket.onopen = function(event) {var ta = document.getElementById('responseText');ta.value = "连接开启!";};socket.onclose = function(event) {var ta = document.getElementById('responseText');ta.value = ta.value + "连接被关闭";};} else {alert("你的浏览器不支持 WebSocket!");}function send(message) {if (!window.WebSocket) {return;}if (socket.readyState == WebSocket.OPEN) {socket.send(message);} else {alert("连接没有开启.");}}script><form onsubmit="return false;"><h3>WebSocket:h3><textarea id="responseText" style="width: 500px; height: 300px;">textarea><br><input type="text" name="message"  style="width: 300px" value="1"><input type="button" value="发送消息" onclick="send(this.form.message.value)"><input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">form><br>body>
html>

三、测试

1、先运行启动类,此时会先启动netty服务器,然后启动一个netty客户端,然后过30s模拟客户端A进行消息发送

2、打开测试页面,在底下的输入框输入:{"event_type":"front", "api_token":"11111"},表示客户端B连接上netty服务器

测试结果如下:

消息模块:

客户端B:

四、结束语

本文只是抛砖引玉,主要启发有类似需求的朋友知道怎么去存储channel,进而怎么给指定客户推送消息,如果想要进行大型项目的高并发、可靠稳定地使用,还需进一步地改进。

作者:都市心声

来源:toutiao.com/i6794445371457143307

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/271822.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

53.Maximum Subarray

/** 53.Maximum Subarray * 2016-5-7 by Mingyang * 如果我们从头遍历这个数组。对于数组中的其中一个元素&#xff0c;它只有两个选择&#xff1a; 1.* 要么加入之前的数组加和之中&#xff08;跟别人一组&#xff09; * 2. 要么自己单立一个数组&#xff08;自己单开一组&…

java 创建者设计模式_Java设计模式之创建者模式分享热爱编程,程序人生

PS:今天的23中设计模式中的创建者方式&#xff0c;至此告一段落。我今天带来的技术分享为创建者模式以及原型模式。当然在Java中这两种方式很常见&#xff0c;只不过我们写的次数确实有点低而已&#xff0c;但是这不是我不学它的借口&#xff01;&#xff01;&#xff01;创建者…

一文读懂电感器的原理、结构、作用及分类

电感器是能够把电能转化为磁能而存储起来的元件。电感器的结构类似于变压器&#xff0c;但只有一个绕组。电感器具有一定的电感&#xff0c;它只阻碍电流的变化。 如果电感器在没有电流通过的状态下&#xff0c;电路接通时它将试图阻碍电流流过它&#xff1b;如果电感器在有电流…

final关键字与static对比

final关键字与static对比 static关键字修饰变量时&#xff0c;会使该变量在类加载时就会被初始化&#xff0c;不会因为对象的创建再次被加载&#xff0c;当变量被static 修饰时就代表该变量只会被初始化一次 例如图中所示&#xff0c;被static修饰的变量j&#xff0c;虽然创建…

juce中的BailOutChecker

界面库中值得注意的一点就是对象响应事件的时候自身被删除了&#xff0c;那么后续的访问自然就会出问题&#xff0c;所以需要在响应事件之后先添加引用&#xff0c;相关处理之后再查看自身是否已经被删除&#xff0c;如果已经被删除那么就直接退出。juce中通过BailOutChecker来…

java quartz 跳过_Java Quartz计划作业-禁止同时执行作业

我正在使用Quartz Job执行特定任务。我也在我的Main应用程序类中安排它的执行&#xff0c;而我试图完成的工作是不允许同时执行此作业的实例。因此&#xff0c;调度程序仅应在其先前实例完成后才执行作业。这是我的工作班级&#xff1a;public class MainJob implements Job {s…

mac USB串口工具配置

安装USB serial 驱动 我的usb serial芯片是 pl2303, 先到官网上下载对应驱动&#xff0c;并安装。安装完成之后会要求重启。 http://www.prolific.com.tw/admin/Technology/GetFile.ashx?fileID238 安装 minicom https://alioth.debian.org/projects/minicom/ 下载源码&…

macpro生成公钥并查看公钥

打开macpro的终端输入以下命令&#xff1a; $ cd ~/.ssh $ ls 此时发现没有那个id_rsa.pub文件&#xff0c;没有&#xff0c;就需要创建公钥 用ssh-keygen创建公钥 此时已经有了

java join 源码_join on 和where 一起使用的细节

left join :左连接&#xff0c;返回左表中所有的记录以及右表中连接字段相等的记录。right join :右连接&#xff0c;返回右表中所有的记录以及左表中连接字段相等的记录。inner join: 内连接&#xff0c;又叫等值连接&#xff0c;只返回两个表中连接字段相等的行。full join:外…

SSIS 学习之旅 FTP访问类

这章把脚本任务访问FTP的方法 全部给大家。 控件的使用大家如果有不懂得可以看下我之前的文章。第一章&#xff1a;SSIS 学习之旅 第一个SSIS 示例&#xff08;一&#xff09;&#xff08;上&#xff09; 第二章&#xff1a;SSIS 学习之旅 第一个SSIS 示例&#xff08;二&#…

Spring Cloud Feign 使用Apache的HTTP Client替换Feign原生httpclient

http 连接池能提升性能 http 的背景原理 a. 两台服务器建立 http 连接的过程是很复杂的一个过程&#xff0c;涉及到多个数据包的交换&#xff0c;并且也很耗时间。 b. Http 连接需要的 3 次握手 4 次分手开销很大&#xff0c;这一开销对于大量的比较小的 http 消息来说更大。…

Java容器坐标起点_Java的屏幕坐标是以像素为单位,容器的左下角被确定为坐标的起点...

【单选题】【单选题】【单选题】class A{ int x1; void func1(int x1){ this.x1 x1; } } 关于上述程序,说法错误的是( )【单选题】浏览器的作用是( )。【判断题】构建大学生心理危机预警及干预工作机制,更好地帮助有严重心理问题的学生度过心理难关,及早预防、及时疏导、有效干…

自媒体工具:文本内容转音频文件实用小工具

目录 ​编辑 1、软件介绍 2、软件技术框架 3、使用说明 4、核心代码文件 5、注意事项 1、软件介绍 文本内容转转音频文件小工具&#xff0c;采用C#编程语言&#xff0c;基于Framework4.5开发&#xff0c;主要采用百度语音识别SDK&#xff0c;实现了在线文本内容转音频文件的功能…

IDEA 创建 SpringCloud项目-多项目方式

SpringCloud 虽然可以用多模块化的方式来创建&#xff0c;但是&#xff0c;SpirngCloud本身就是为分布式而准备的&#xff0c;如果使用多模块的话&#xff0c;那就是一个项目&#xff0c;偏离了分布式的概念。所以工程上还是常用多项目的方式&#xff0c;这样才可以分开布署各个…

php位运算重要吗,PHP位运算的用途

下面为大家带来一篇PHP位运算的用途。现在就分享给大家&#xff0c;也给大家做个参考。一起过来看看吧在实际应用中可以做用户权限的应用我这里说到的权限管理办法是一个普遍采用的方法&#xff0c;主要是使用到”位运行符”操作&#xff0c;& 位与运算符、| 位或运行符。参…

盘点6款实用的文件对比工具,你都用过吗?

❤️作者主页&#xff1a;IT技术分享社区 ❤️作者简介&#xff1a;大家好,我是IT技术分享社区的博主&#xff0c;从事C#、Java开发九年&#xff0c;对数据库、C#、Java、前端、运维、电脑技巧等经验丰富。 ❤️个人荣誉&#xff1a; 数据库领域优质创作者&#x1f3c6;&#x…

aggregations 详解1(概述)

aggregation分类 aggregations —— 聚合&#xff0c;提供了一种基于查询条件来对数据进行分桶、计算的方法。有点类似于 SQL 中的 group by 再加一些函数方法的操作。 聚合可以嵌套&#xff0c;由此可以组成复杂的操作&#xff08;Bucketing聚合可以包含sub-aggregation&#…

IDEA开发中,类的头位置生成作者时间信息

点击 File > Settings > File and Code Templates > Class按照图中步骤添加如下信息 #if (${PACKAGE_NAME} && ${PACKAGE_NAME} ! "")package ${PACKAGE_NAME};#end #parse("File Header.java") /** * Author WangZeyu * Date ${…

提现接口网站 php,API提现接口

>获取提现积分的类型&#xff0c;在后台可以设置某种积分可被提现&#xff0c;此处获取的数据为可提现积分的类型~~~[api]get:/index.php/accounts/Apipoint/member_withdrawal_listint:type 0#是否智能限制提现积分类型&#xff0c;0&#xff1a;不智能&#xff0c;1&#…

数据库:PostgreSQL 和 MySQL对比

比较版本&#xff1a;PostgreSQL 11 VS MySQL5.7&#xff08;innodb引擎&#xff09; Oracle官方社区版版权情况&#xff1a;PostgreSQL 11&#xff08;免费开源&#xff09;、MySQL5.7 Oracle官方社区版&#xff08;免费开源&#xff09; 1. CPU限制 PGSQL没有CPU核心数限制&a…