基于Netty实现WebSocket客户端

本文是基于Netty快速上手WebSocket客户端,不涉及WebSocket的TLS/SSL加密传输。

WebSocket原理参考【WebSocket简介-CSDN博客】,测试用的WebSocket服务端也是用Netty实现的,参考【基于Netty实现WebSocket服务端-CSDN博客】

一、基于Netty快速实现WebSocket客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;import java.net.URI;
import java.util.concurrent.CountDownLatch;/*** https://blog.csdn.net/a1053765496/article/details/130701218* 基于Netty快速实现WebSocket客户端,不手动处理握手*/
@Slf4j
public class SimpleWsClient {final CountDownLatch latch = new CountDownLatch(1);public static void main(String[] args) throws Exception {SimpleWsClient client = new SimpleWsClient();client.test();}public void test() throws Exception {Channel dest = dest();latch.await();dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));}public Channel dest() throws Exception {final URI webSocketURL = new URI("ws://127.0.0.1:7070/helloWs");EventLoopGroup group = new NioEventLoopGroup();Bootstrap boot = new Bootstrap();boot.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).group(group).handler(new LoggingHandler(LogLevel.INFO)).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {ChannelPipeline pipeline = sc.pipeline();pipeline.addLast(new HttpClientCodec());pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(64 * 1024));pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)throws Exception {System.err.println(" 客户端收到消息======== " + msg.text());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE.equals(evt)) {log.info(ctx.channel().id().asShortText() + " 握手完成!");latch.countDown();send(ctx.channel());}super.userEventTriggered(ctx, evt);}});}});ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();return cf.channel();}public static void send(Channel channel) {final String textMsg = "握手完成后直接发送的消息";if (channel != null && channel.isActive()) {TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {if (channelFuture.isDone() && channelFuture.isSuccess()) {log.info("     ================= 发送成功.");} else {channelFuture.channel().close();log.info("     ================= 发送失败. cause = " + channelFuture.cause());channelFuture.cause().printStackTrace();}});} else {log.error("消息发送失败! textMsg = " + textMsg);}}}

这里我们不手动进行握手,由Netty通过WebSocketClientProtocolHandler进行握手,但是我们要知道何时握手完成了。握手完成了我们才能进行正常的消息读写。

握手事件是在自定义的Handler中实现的,这里为了方便使用CountDownLatch,使用了匿名内部类SimpleChannelInboundHandler的方式。userEventTriggered这个方法会接收到所有的事件,其中就包括握手完成事件。

二、基于Netty,手动处理WebSocket握手信息:

客户端启动代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;/*** https://gitcode.com/ddean2009/learn-netty4/tree/master/src/main/java/com/flydean25/socketclient* https://www.flydean.com/25-netty-websocket-client* https://blog.csdn.net/superfjj/article/details/120648434* https://blog.csdn.net/twypx/article/details/84543518*/
public final class NettyWsClient {static final String URL = System.getProperty("url", "ws://127.0.0.1:7070/helloWs");public static void main(String[] args) throws Exception {URI uri = new URI(URL);final int port = uri.getPort();EventLoopGroup group = new NioEventLoopGroup();try {WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);}});Channel ch = b.connect(uri.getHost(), port).sync().channel();handler.handshakeFuture().sync();BufferedReader console = new BufferedReader(new InputStreamReader(System.in));while (true) {String msg = console.readLine();if (msg == null) {break;} else if ("再见".equalsIgnoreCase(msg)) {ch.writeAndFlush(new CloseWebSocketFrame());ch.closeFuture().sync();break;} else if ("ping".equalsIgnoreCase(msg)) {WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));ch.writeAndFlush(frame);} else {WebSocketFrame frame = new TextWebSocketFrame(msg);ch.writeAndFlush(frame);}}} finally {group.shutdownGracefully();}}
}

客户端Handler

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {private final WebSocketClientHandshaker handshaker;private ChannelPromise handshakeFuture;public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {this.handshaker = handshaker;}public ChannelFuture handshakeFuture() {return handshakeFuture;}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {handshakeFuture = ctx.newPromise();}@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("channelActive, 进行handshake");handshaker.handshake(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.info("channelInactive!");}@Overridepublic void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {Channel ch = ctx.channel();if (!handshaker.isHandshakeComplete()) {try {handshaker.finishHandshake(ch, (FullHttpResponse) msg);log.info("websocket Handshake 完成!");handshakeFuture.setSuccess();} catch (WebSocketHandshakeException e) {log.info("websocket连接失败!");handshakeFuture.setFailure(e);}return;}if (msg instanceof FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="+ response.content().toString(CharsetUtil.UTF_8) + ')');}WebSocketFrame frame = (WebSocketFrame) msg;if (frame instanceof TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;log.info("接收到TXT消息: " + textFrame.text());} else if (frame instanceof PongWebSocketFrame) {log.info("接收到pong消息");} else if (frame instanceof CloseWebSocketFrame) {log.info("接收到closing消息");ch.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理log.error("出现异常", cause);if (!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}
}

可以看到,我们这里也要手动标记握手完成,是在自定义的Handler的channelRead0方法中标记的。

测试的时候,我们可以看到,当客户端发送ping的时候,服务端会自动回pong,这个是有Netty实现的服务端自带的心跳机制。

2024-05-25 16:35:10 INFO [WebSocketClientHandler] channelActive, 进行handshake
2024-05-25 16:35:10 INFO [WebSocketClientHandler] websocket Handshake 完成!
123
2024-05-25 16:35:26 INFO [WebSocketClientHandler] 接收到TXT消息: 2024-35-25 04:05:26: 123
ping
2024-05-25 16:35:52 INFO [WebSocketClientHandler] 接收到pong消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/15712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker image上传至docker hub

要将 Docker 镜像上传到 Docker Hub&#xff0c;你需要遵循以下步骤&#xff1a; 登录到 Docker Hub&#xff1a; 确保你在 Docker Hub 上有一个账号。如果没有&#xff0c;请前往 Docker Hub (https://hub.docker.com/) 注册一个账号。 在终端中使用以下命令登录到 Docker …

Java中的弱引用与强引用

引用是Java中内存管理和垃圾回收机制的重要组成部分&#xff0c;Java 提供了多种类型的引用来允许开发者以不同的方式管理内存&#xff0c;其中最常用的是强引用&#xff08;strong reference&#xff09;和弱引用&#xff08;weak reference&#xff09;。以下是对这些引用类型…

【C++】牛客——BC157 素数回文

✨题目链接&#xff1a; BC157 素数回文 ✨题目描述 现在给出一个素数&#xff0c;这个素数满足两点&#xff1a; 只由1-9组成&#xff0c;并且每个数只出现一次&#xff0c;如13,23,1289。 位数从高到低为递减或递增&#xff0c;如2459&#xff0c;87631。 请你判断一下&…

python多进程multiprocessing卡住问题

一&#xff1a;背景 在使用多进程时&#xff0c;process.join()后面的代码并不会执行&#xff0c;一直卡在了第一个进程join()哪里不动。 环境&#xff1a;python3.8&#xff0c;centos7&#xff0c;multiprocessing库&#xff0c;使用mp.Queue() 二&#xff1a;调试过程 打…

从零开始搭建Springboot项目脚手架4:保存操作日志

目的&#xff1a;通过AOP切面&#xff0c;统一记录接口的访问日志 1、加maven依赖 2、 增加日志类RequestLog 3、 配置AOP切面&#xff0c;把请求前的request、返回的response一起记录 package com.template.common.config;import cn.hutool.core.util.ArrayUtil; import cn.hu…

单选或者多选的知识问题调研系统,怎么使用Neo4j的图数据库来实现

为了使用Neo4j的图数据库实现单选或多选的知识问题调研系统&#xff0c;你需要设计和实现以下几个步骤&#xff1a; 设计节点和关系插入数据定义查询和更新逻辑开发前端和后端应用来与Neo4j进行交互 1. 设计节点和关系 节点类型 Question&#xff1a;表示一个问题&#xff…

最新文章合集

GitHub宝藏项目&#xff1a;每天一个&#xff0c;让你的技术库增值不停&#xff01; STORM、SuperMemory、Awesome Chinese LLM、AI写作助手、资料搜集、文章生成、视角问题引导、模拟对话策略、内容导入、浏览器插件、资源库、开源微调模型 开发者必看&#xff1a;Linux终端…

Vue 3指令与事件处理

title: Vue 3指令与事件处理 date: 2024/5/25 18:53:37 updated: 2024/5/25 18:53:37 categories: 前端开发 tags: Vue3基础指令详解事件处理高级事件实战案例最佳实践性能优化 第1章 Vue 3基础 1.1 Vue 3简介 Vue 3 是一个由尤雨溪&#xff08;尤大&#xff09;领导的开源…

方言和大语言模型

方言多样性及其对语言模型的影响 语言的演变是不可避免的&#xff0c;反映并推动了重大的社会变革和传统。语言接触往往会推动我们说话方式的创新&#xff0c;在美国全球文化的影响下&#xff0c;一种新的叙事正在其语言织锦中展开。 例如&#xff0c;在佛罗里达州南部&#…

Qt 在windows下显示中文

Qt在windows平台上显示中文&#xff0c;简直是一门玄学&#xff0c;经过测试&#xff0c;有如下发现&#xff1a; 1&#xff0c; 环境&#xff1a;Qt 5.15.2 vs2019 64位 win11系统 默认用Qt 创建的文件使用utf-8编码格式&#xff0c;此环境下 中文没有问题 ui->textE…

Nginx实现负载均衡与故障检查自动切换

创作灵感来源于个人项目的一个稳定性规划&#xff0c;单节点的项目稳定性方面可能有很大的缺漏&#xff0c;因此需要升级为多节点&#xff0c;保证服务故障后&#xff0c;依然有其他服务可用&#xff0c;不会给前端用户造成影响。 &#xff08;前面讲选型&#xff0c;想直接看…

Kubernetes Service 之 ExternalName

Kubernetes Service 之 ExternalName ExternalName 定义 ExternalName 用来定义在不同的命名空间想要引用其它命名空间 Service 的别名&#xff0c;使得本空间 Service 名字在本空间更有区分度。 ExternalName 的使用 apiVersion: v1 kind: Service metadata:name: service…

IPv6 地址创建 EUI-64 格式接口 ID 的过程

IPv6 接口标识符 IPv6 地址中的接口标识符&#xff08;ID&#xff09;用于识别链路上的唯一接口&#xff0c;有时被称为 IPv6 地址的 “主机部分”。接口 ID 在链路上必须是唯一的&#xff0c;始终为 64 位长&#xff0c;并且可以根据数据链路层地址动态创建。 MAC 地址 中的…

Jenkins安装 :Aws EC2下Docker镜像安装

1 安装docker # 安装docker $ sudo yum install -y docker# 启动docker daemon $ sudo systemctl start docker# 用户加入docker组 $ sudo usermod -aG docker username 2 docker安装jenkins $ docker pull jenkins/jenkins:lts# 安装成功 $ docker images REPOSITORY …

逻辑这回事(一)----编码规范

说明&#xff1a;优先级是M的规则为强制项&#xff0c;优先级为R的规则为建议项。 通用约束 应有全局观念。 优先级&#xff1a;M 说明&#xff1a;你所编写的代码在成为最终硅片上的一部分之前&#xff0c;需要经过许多设计者利用各种各样的工具进行各种各样的处理。有时&…

解决vue3项目vite打包忽略.vue扩展名

项目打包时报could not relolve “...”&#xff0c;因为vite已不再默认忽略.vue扩展名。 解决方法如下&#xff1a; 在vite.config.js中配置vite使其忽略 .vue 扩展名&#xff08;不建议忽略&#xff09; 注意&#xff1a;即使忽略了.vue文件&#xff0c;在实际写的时候也要加…

达梦8 RLOG_COMPRESS_LEVEL参数对系统的影响

测试环境是一套主备达梦数据库。下面在主备库分别设置参数进行测试 测试一、 主库设置RLOG_COMPRESS_LEVEL9&#xff0c;备库设置为0。 分别删除主备库的归档日志后执行测试脚本 #当前时间 date disql SYSDBA/SYSDBA:1807 <<EOF #显示归档大小 select sum(free)/1024…

【独家揭秘!玩转ChatGPT?一文带你解锁秘籍!】

&#x1f680;【独家揭秘&#xff01;玩转ChatGPT&#xff1f;一文带你解锁秘籍&#xff01;】&#x1f680; &#x1f449; 【直达ChatGPT体验站】 ChatGPT&#xff0c;全称“Chat Generative Pre-trained Transformer”&#xff0c;是人工智能研究实验室OpenAI于2022年底推出…

HTTP 错误 404.15 - Not Found 请求筛选模块被配置为拒绝包含的查询字符串过长的请求。...

在做MVC站点时(使用IIS版本为7.5)&#xff0c;使用Get请求&#xff0c;当Url里查询字符串过长时&#xff0c;会出现如下错误&#xff1a; 出现该错误的原因为&#xff1a;IIS7.5对于Query String有长度限制&#xff0c;默认为2048。 按照图中可尝试的操作提示&#xff0c;可以…