基于Netty实现TCP通信

创建一个Maven项目添加下面依赖

    <dependencies><!-- 日志依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><optional>true</optional></dependency><dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.84.Final</version></dependency></dependencies>

编码解码器

package com.example.nettydemo.coder;import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;import java.nio.charset.StandardCharsets;public class NettyEncoder extends MessageToByteEncoder<String> {public NettyEncoder() {}@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {byte[] byteMsg = msg.getBytes(StandardCharsets.UTF_8);int msgLength = byteMsg.length;ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 + byteMsg.length);buf.writeInt(msgLength);buf.writeBytes(byteMsg, 0, msgLength);out.writeBytes(buf);buf.release();}
}
package com.example.nettydemo.coder;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int beginReader = in.readerIndex();int dataLength = in.readInt();if (in.readableBytes() < dataLength) {in.readerIndex(beginReader);} else {byte[] data = new byte[dataLength];in.readBytes(data);String str = new String(data, 0, dataLength, StandardCharsets.UTF_8);out.add(str);}}
}

服务端

package com.example.nettydemo.server;import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.ObjectMapper;import java.io.IOException;
import java.util.Map;@Slf4j
public class TcpServer {private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ServerBootstrap server;private ChannelFuture channelFuture;private Integer port;public TcpServer(Integer port) {this.port = port;// nio连接处理池this.bossGroup = new NioEventLoopGroup();// 处理事件池this.workerGroup = new NioEventLoopGroup();server = new ServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 自定义处理类ch.pipeline().addLast(new NettyDecoder());ch.pipeline().addLast(new NettyEncoder());ch.pipeline().addLast(new TcpServerHandler());}});server.option(ChannelOption.SO_BACKLOG, 128);server.childOption(ChannelOption.SO_KEEPALIVE, true);}public synchronized void startListen() {try {// 绑定到指定端口channelFuture = server.bind(port).sync();log.info("netty服务器在[{}]端口启动监听", port);} catch (Exception e) {log.error("netty服务器在[{}]端口启动监听失败", port);e.printStackTrace();}}public void sendMessageToClient(String clientIp, Object msg) {Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);Channel channel = channelMap.get(clientIp);String sendStr;try {sendStr = OBJECT_MAPPER.writeValueAsString(msg);} catch (JsonGenerationException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);}try {log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);channel.writeAndFlush(sendStr);} catch (Exception var4) {log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);throw new RuntimeException(var4);}}public void pushMessageToClients(Object msg) {Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);if (channelMap != null && !channelMap.isEmpty()) {channelMap.forEach((k, v) -> sendMessageToClient(k, msg));}}
}
package com.example.nettydemo.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {/*** 用跳表存储连接channel*/public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>();@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("应用程序的监听通道异常!");cause.printStackTrace();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();// 获取每个用户端连接的ipInetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();// 本地端口做键int localPort = localSocket.getPort();Map<String, Channel> channelMap = channelSkipMap.get(localPort);if (channelMap == null || channelMap.isEmpty()) {channelMap = new HashMap<>(4);}channelMap.put(clientIp, channel);channelSkipMap.put(localPort, channelMap);log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// 获取每个用户端连接的ipChannel channel = ctx.channel();InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();int localPort = localSocket.getPort();InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();Map<String, Channel> channelMap = channelSkipMap.get(localPort);channelMap.remove(clientIp);log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {Channel channel = channelHandlerContext.channel();// 获取每个用户端连接的ipInetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg);}
}
package com.example.nettydemo.server;public class ServerTest {public static void main(String[] args) {TcpServer tcpServer = new TcpServer(40001);tcpServer.startListen();while (true) {try {// 每5秒向客户端发送一次 "test-朱上林123"Thread.sleep(5000);tcpServer.pushMessageToClients("test-朱上林123");} catch (Exception e) {e.printStackTrace();}}}
}

客户端

package com.example.nettydemo.client;import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;import java.io.IOException;@Slf4j
public class TcpClient {private EventLoopGroup group;private ChannelFuture channelFuture;private final String ip;private final Integer port;private final ObjectMapper objectMapper = new ObjectMapper();public TcpClient(String ip, Integer port) {this.ip = ip;this.port = port;}/*** 建立连接**/public synchronized void connectServer() {log.info("开始建立连接,ip:{}, port:{}", ip, port);// 生命nio连接池this.group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();// 配置解码器以及消息处理类b.group(this.group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyEncoder());pipeline.addLast(new NettyDecoder());pipeline.addLast(new TcpClientHandler());}});// 开始连接this.channelFuture = b.connect(ip, port).sync();} catch (Exception var4) {log.error("连接建立失败,ip:{}, port:{}", ip, port);this.group.shutdownGracefully();var4.printStackTrace();}}/*** 关闭连接*/public void close() {this.group.shutdownGracefully();}/*** 发送消息** @param msg*/public synchronized void sendCommonMsg(Object msg) {String sendStr;if (!getConnectStatus()) {connectServer();}try {sendStr = objectMapper.writeValueAsString(msg);} catch (JsonMappingException e) {throw new RuntimeException(e);} catch (JsonGenerationException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);}try {log.info("发送消息内容:{}", sendStr);this.channelFuture.channel().writeAndFlush(sendStr);} catch (Exception var4) {log.error("发送消息失败,消息内容:{}", sendStr);throw new RuntimeException(var4);}}/*** 获取当前连接状态*/public Boolean getConnectStatus() {return group != null && !group.isShutdown() && !group.isShuttingDown();}
}
package com.example.nettydemo.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {/*** 读取事件** @param channelHandlerContext* @param msg*/@Overridepublic void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {log.info("服务返回消息 :{}", msg);}/*** 发生异常** @param channelHandlerContext* @param throwable*/@Overridepublic void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {log.error("通信发生异常:" + throwable.getMessage());channelHandlerContext.close();}
}
package com.example.nettydemo.client;public class TcpClientTest {public static void main(String[] args) {TcpClient tcpClient = new TcpClient("127.0.0.1", 40001);// 客户端连接到服务器后,向服务器发送一条消息:tcpClient.connectServer();tcpClient.sendCommonMsg("我是Client,刚刚是我连接到你的!");}
}

启动服务端和客户端实现通信

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下课!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/184439.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP 连接断开

1&#xff1a;TCP 四次挥手过程是怎样的&#xff1f; 客户端打算关闭连接&#xff0c;此时会发送一个 TCP 首部 FIN 标志位被置为 1 的报文&#xff0c;也即 FIN 报文&#xff0c;之后客户端进入 FIN_WAIT_1 状态。 服务端收到该报文后&#xff0c;就向客户端发送 ACK 应答报文…

【微信小程序】保存多张图片到本地相册 wx.saveImageToPhotosAlbum

这里写目录标题 微信小程序检测是否有存储权限wx.getSetting 图片上传从HTML中提取img标签的src属性多图片下载 微信小程序检测是否有存储权限 wx.getSetting 上传前判断是否开启存储权限&#xff0c;如果不检测直接上传会出现fail的情况 var _this this wx.getSetting({su…

阿里云崩溃了,为什么你没有收到补偿?【补偿领取方式放文末】

事情经过 北京时间11月27日&#xff0c;阿里云部分地域云数据库控制台访问出现异常。据悉&#xff0c;从当日09:16起&#xff0c;阿里云监控发现北京、上海、杭州、深圳、青岛、香港以及美东、美西地域的数据库产品(RDS、PolarDB、Redis等)的控制台和OpenAPI访问出现异常&…

PHP在线日语学习平台

有需要请加文章底部Q哦 可远程调试 PHP在线日语学习平台 一 介绍 此日语学习平台基于原生PHP开发&#xff0c;数据库mysql。系统角色分为用户和管理员。(附带参考设计文档) 技术栈&#xff1a;phpmysqlphpstudyvscode 二 功能 学生 1 注册/登录/注销 2 个人中心 3 查看课程…

C#开发的OpenRA游戏之属性SelectionDecorations(14)

C#开发的OpenRA游戏之属性SelectionDecorations(14) 前面分析选择类时,还有一个功能,就是把选中物品的状态和生命值显示出来。 它是通过下面的函数来实现: protected override IEnumerable<IRenderable> RenderSelectionBars(Actor self, WorldRenderer wr, bool …

this.$nextTick与this.$set,解决拖拽表格没有刷新问题!

一&#xff1a;this.$nextTick的用法 将回调延迟到下次 DOM 更新循环之后执行。在修改数据之后立即使用它&#xff0c;然后等待 DOM 更新。它跟全局方法 Vue.nextTick 一样&#xff0c;不同的是回调的 this 自动绑定到调用它的实例上。 原因是&#xff0c;Vue是异步执行DOM更…

kubernetes(k8s)容器内无法连接同所绑定的Service ClusterIP问题记录

kubernetes(k8s)容器内无法连接同所绑定的Service ClusterIP问题记录 1. k8s环境 k8s使用kubernetes-server-linux-amd64_1.19.10.tar.gz 二进制bin 的方式手动部署 k8s 版本: [rootmaster ~]# kubectl version Client Version: version.Info{Major:"1", Minor:&…

CentOS7安装RabbitMQ

服务器系统版本&#xff1a;CentOS7 安装RabbitMq版本&#xff1a;3.7.18 将此安装包目录下的两个文件上传到服务/usr/local/rabbitmq中备用。 安装Erlang依赖包 rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm安装RabbitMQ安装包(需要联网) yum install -y rabbitmq-server-3.7.1…

yml转properties工具

目前搜索到的大部分代码都存在以下问题&#xff1a; 复杂结构解析丢失解析后顺序错乱 所以自己写了一个&#xff0c;经过不充分测试&#xff0c;基本满足使用。可以直接在线使用 在线地址 除了yml和properties互转之外&#xff0c;还可以生成代码、sql转json等&#xff0c;可…

深入解析进程

在现代计算机系统中&#xff0c;进程是一个核心概念&#xff0c;它代表了程序的执行实例。通过并发执行多个进程&#xff0c;计算机能够提高效率和资源利用率。 1. 进程的概念 进程是指在计算机系统中正在执行的程序的实例。每个进程都有自己的地址空间、寄存器集合、堆栈和文…

【Linux】OpenSSH 命令注入漏洞(CVE-2020-15778)(iptables屏蔽22端口方式)

背景 漏洞名称&#xff1a;OpenSSH 命令注入漏洞(CVE-2020-15778) 详细描述&#xff1a;OpenSSH&#xff08;OpenBSD Secure Shell&#xff09;是OpenBSD计划组的一套用于安全访问远程计算机的连接工具。该工具是SSH协议的开源实现&#xff0c;支持对所有的传输进行加密&#…

java:lombok库方便的使用@Getter和@Setter

背景 Lombok是一个Java库&#xff0c;它通过注解减少了大量常规代码&#xff0c;例如getter和setter方法&#xff0c;equals&#xff0c;hashCode&#xff0c;toString等方法。使用Lombok可以减少模板代码的编写&#xff0c;使代码更简洁&#xff0c;更易于阅读和维护。 常用…

DaVinci Resolve Studio达芬奇软件 18.6.3

DaVinci Resolve Studio 18是一款专业的视频编辑和调色软件&#xff0c;适用于电影、电视节目、广告等各种视觉媒体的制作。它具有完整的后期制作功能&#xff0c;包括剪辑、调色、特效、音频处理等。 以下是DaVinci Resolve Studio 18的主要特点&#xff1a; - 提供了全面的视…

Go map类型

一、map介绍 1、map说明 map是一种无须的基于key-value的数据结构&#xff0c;Go语言中的map是引用类型&#xff0c;必须初始化才能使用Go语言中map的定义语法如下&#xff1a;map[KeyType]ValueType其中 KeyType&#xff1a;表示键的类型ValueType&#xff1a;表示键对应的值…

CAN网络出现错误帧从哪些方面去分析解决

标题&#xff1a;CAN网络出现错误帧从哪些方面去分析 实例1&#xff1a; 断电重启后&#xff0c;会有错误帧产生。 检查方案&#xff1a; 查看收发模块的初始化、使能是否在发送CAN报文之前完成&#xff1f; 实例2&#xff1a; 周期性报文&#xff0c;有时会冒出一帧错误帧&…

[Java][单列集合具体操作]以ArrayList为例讲解“增”“删”“查”“改”操作

我们来研究List系列的集合 List相较于set 优势在于&#xff1a; 1.有序的 2.有索引的 3.可重复的 这里最关键的是2.有索引的&#xff1a;因为这点我们可以做更多拓展性的操作 List是Collection中的一种 Collection中的方法List都继承了 索引拓展出的方法 是add(索引位置&#x…

【接口自动化】selenium库也有大用场(获取cookie)

相信有些童鞋在做接口、或者说接口自动化测试的过程中会遇到这样的场景&#xff1a;测试的接口&#xff0c;必须是需要登录后才能发起请求成功的。 那么怎么解决呢&#xff1f; 本着团队协作的精神&#xff0c;我们就去让开发同学开个后门&#xff0c;给你个“万能”值&#x…

基层管理人员的薪酬结构设计及分析

人力资源经理们经常面对这样的难题&#xff1a;怎么用相同的工资水平更好的保留和发展基层管理人员&#xff1f;要解决这个难题&#xff0c;第一要明确企业的付薪理念&#xff0c;选择相应的基层管理人员薪酬结构类型&#xff1b;第二要确定合理的针对基层管理人员的薪酬结构比…

Condition 源码解析

Condition 源码解析 文章目录 Condition 源码解析一、Condition二、Condition 源码解读2.1. lock.newCondition() 获取 Condition 对象2.2. condition.await() 阻塞过程2.3. condition.signal() 唤醒过程2.4. condition.await() 被唤醒后 三、总结 一、Condition 在并发情况下…

【网络奇遇之旅】:那年我与计算机网络的初相遇

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; 计算机网络 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 一. 前言二. 计算机网络的定义三. 计算机网络的功能3.1 资源共享3.2 通信功能3.3 其他功能 四. 计算机网络…