Netty 入门学习

前言

学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。

Spark 通信历史

最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

Netty

Server 主要代码:

// 创建ServerBootstrap实例,服务器启动对象
ServerBootstrap bootstrap = new ServerBootstrap();ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服务器关闭
channelFuture.channel().closeFuture().sync();

主要是启动 ServerBootstrap、绑定端口、等待关闭。

Client 主要代码:

// 创建Bootstrap实例,客户端启动对象
Bootstrap bootstrap = new Bootstrap();
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();

Server 添加 Handler

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ServerHandler());}
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}
});

这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。
如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。

完整代码

地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

NettyServer

package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {try {bind();} catch (InterruptedException e) {throw new RuntimeException(e);}}public static void bind() throws InterruptedException {// 创建boss线程组,用于接收连接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建ServerBootstrap实例,服务器启动对象ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程配置参数// 将boss线程组和worker线程组暂存到ServerBootstrapbootstrap.group(bossGroup, workerGroup);// 设置服务端Channel类型为NioServerSocketChannel作为通道实现bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器socketChannel.pipeline().addLast(new ServerHandler());}});// 设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求// 当有多个客户端同时来请求时,未处理的请求先放入队列中bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象ChannelFuture channelFuture = bootstrap.bind(8888).sync();// 等待服务器关闭channelFuture.channel().closeFuture().sync();} finally {// 优雅地关闭boss线程组bossGroup.shutdownGracefully();// 优雅地关闭worker线程组workerGroup.shutdownGracefully();}}
}

ServerHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用** @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelRegistered");}/*** 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调* 用** @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelUnregistered");}/*** 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelActive");}/*** 当 Channel 离开活动状态并且不再连接它的远程节点时被调用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelInactive");}/*** 当从 Channel 读取数据时被调用** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("执行 channelRead");// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("Server端收到客户消息: " + message);// 发送响应消息给客户端ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8));} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}/*** 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelReadComplete");}/*** 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被* 调用** @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("执行 userEventTriggered");}/*** 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法* * 来检测 Channel 的可写性。与可写性相关的阈值可以通过* * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来* * 设置** @param ctx* @throws Exception*/@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelWritabilityChanged");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("执行 exceptionCaught");// 异常处理cause.printStackTrace();ctx.close();}
}

NettyClient

package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {start();}public static void start() {// 创建EventLoopGroup,用于处理客户端的I/O操作EventLoopGroup groupThread = new NioEventLoopGroup();try {// 创建Bootstrap实例,客户端启动对象Bootstrap bootstrap = new Bootstrap();bootstrap.group(groupThread);// 设置服务端Channel类型为NioSocketChannel作为通道实现bootstrap.channel(NioSocketChannel.class);// 设置客户端处理bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}});// 绑定端口ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 优雅地关闭线程groupThread.shutdownGracefully();}}
}

ClientHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 连接建立时的处理,发送请求消息给服务器ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("受到服务端响应的消息: " + message);// TODO: 对数据进行业务处理} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理cause.printStackTrace();ctx.close();}
}

运行截图


handler 执行顺序

Server 端

连接时:执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete断开连接时:执行 channelReadComplete
(强制中断 Client 连接
执行 exceptionCaught
执行 userEventTriggered (exceptionCaught 中 ctx.close()) 触发
)
执行 channelInactive
执行 channelUnregisteredchannelReadComplete 中 ctx.close(); 触发:
执行 channelInactive
执行 channelUnregistered

Client 端

执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete

Spark 对应位置

  • Spark版本:3.2.3
  • Server: org.apache.spark.network.server.TransportServer.init
  • Client: org.apache.spark.network.client.TransportClientFactory.createClient


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68064.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Microsoft Sql Server 2019 数据类型

数据类型 bigint、int、smallint、tinyint 使用整数数据的精确数字数据类型。 若要节省数据库空间,请使用能够可靠包含所有可能值的最小数 据类型。 例如,对于一个人的年龄,tinyint 就足够了,因为没人活到 255 岁以上。 但对于建筑物的 年龄,tinyint 就不再适应,因为建…

爬虫学习心得

爬虫学习心得 下面给新学者分享一些爬虫的技巧1 xpath的学习a xpath的基本用法b xpath案例 2.正则表达式学习a 基本使用b 12306提取省份成json格式 3.Bs的学习3.1基础3.3基础案例3.5基本案例 4.拿取数据技巧4.1通过标签获取4.2通过lamda表达式获取数据 下面给新学者分享一些爬虫…

[笔记] 使用 Jenkins 实现 CI/CD :从 GitLab 拉取 Java 项目并部署至 Windows Server

随着软件开发节奏的加快&#xff0c;持续集成&#xff08;CI&#xff09;和持续部署&#xff08;CD&#xff09;已经成为确保软件质量和加速产品发布的不可或缺的部分。Jenkins作为一款广泛使用的开源自动化服务器&#xff0c;为开发者提供了一个强大的平台来实施这些实践。然而…

qt QPainter setViewport setWindow viewport window

使用qt版本5.15.2 引入viewport和window目的是用于实现QPainter画出来的内容随着窗体伸缩与不伸缩两种情况&#xff0c;以及让QPainter在widget上指定的区域(viewport)进行绘制/渲染&#xff08;分别对应下方demo1&#xff0c;demo2&#xff0c;demo3&#xff09;。 setViewpo…

likeshop同城跑腿系统likeshop回收租赁系统likeshop多商户商城安装及小程序对接方法

前言&#xff1a;首先likeshop是一个开发平台&#xff0c;是一个独创的平台就像TP内核平台一样&#xff0c;你可以在这个平台上开发和衍生出很多伟大的产品&#xff0c;以likeshop为例&#xff0c;他们开发出商城系统&#xff0c;团购系统&#xff0c;外卖点餐系统&#xff0c;…

C# 与 Windows API 交互的“秘密武器”:结构体和联合体

一、引言 在 C# 的编程世界里&#xff0c;当我们想要深入挖掘 Windows 系统的底层功能&#xff0c;与 Windows API 打交道时&#xff0c;结构体和联合体就像是两把神奇的钥匙&#x1f511; 它们能够帮助我们精准地操控数据&#xff0c;实现一些高级且强大的功能。就好比搭建一…

C++实现设计模式---状态模式 (State)

状态模式 (State) 状态模式 是一种行为型设计模式&#xff0c;它允许对象在运行时根据内部状态的改变来动态改变其行为。通过将状态相关的行为封装到独立的类中&#xff0c;状态模式使得状态的切换更加清晰和灵活。 意图 将对象的行为和状态分离&#xff0c;随着状态的改变动…

202312 青少年软件编程等级考试C/C++ 一级真题答案及解析(电子学会)

第 1 题 数的输入和输出 输入一个整数和双精度浮点数,先将浮点数保留2位小数输出,然后输出整数。 时间限制:1000 内存限制:65536 输入 一行两个数,分别为整数N(不超过整型范围),双精度浮点数F,以一个空格分开。 输出 一行两个数,分别为保留2位小数输出的F,以…

信息系统项目管理-采购管理-采购清单示例

序号类别产品/服务名称规格/功能描述数量备注1硬件服务器高性能处理器&#xff0c;大容量存储10HP、DELL2网络设备高速路由器和交换机10华为3工作站多核处理器&#xff0c;高分辨率显示器25国产设备4移动检查设备手持式移动检查仪&#xff0c;可连接云平台30国产设备5打印机和扫…

继续以“实用”指导Pythonic编码(re通配表达式)(2024年终总结②)

弃现成工具手剥任务&#x1f9d0;&#xff0c;我哈哈滴就像笨笨的傻大个儿&#x1f60b;。 (笔记模板由python脚本于2025年01月12日 23:29:33创建&#xff0c;本篇笔记适合熟悉正则表达式的coder翻阅) 【学习的细节是欢悦的历程】 Python官网&#xff1a;https://www.python.or…

Mysql学习笔记之函数

1.简介 SQL提供了很多函数&#xff0c;便于在查询时能够快速的进行计算或者计数等操作&#xff0c;下文介绍一些实际场景中常用的函数。 2.字符串函数 常用的字符串函数如下表所示&#xff1a; 函数名描述LENGTH(str)返回字符串的长度&#xff0c;以字节为单位。例如LENGTH…

pytest 参数介绍

命令行参数描述常见使用案例-v / --verbose显示每个测试用例的详细信息&#xff0c;包括测试名称和状态pytest -v-s / --captureno禁用输出捕获&#xff0c;允许 print() 输出显示pytest -s-q / --quiet安静模式&#xff0c;减少输出&#xff0c;仅显示每个测试的通过/失败结果…

10步打造完美ASP.NET、Web API和控制台应用程序文件夹结构

一、前言 在大型项目中&#xff0c;合理的文件夹结构是项目成功的关键之一。一个好的文件夹结构就像是一座井然有序的图书馆&#xff0c;每一本书&#xff08;代码文件&#xff09;都有其固定的位置&#xff0c;让人能迅速找到所需。它可以让团队成员更容易理解和维护代码&…

conntrack iptables 安全组

centos 安装yum install conntrack-tools 1. conntrack状态 NEW: 新建连接&#xff08;第一次包&#xff09;。 ESTABLISHED: 已建立连接&#xff0c;正在传输数据。 RELATED: 与已有连接相关的连接&#xff0c;如 FTP 数据连接。 INVALID: 无效连接&#xff0c;无法识别或不…

相机和激光雷达的外参标定 - 无标定板版本

1. 实现的效果 通过本软件实现求解相机和LiDAR的外参&#xff0c;即2个传感器之间的三维平移[x, y, z]和三维旋转[roll, pitch, yaw]。完成标定后&#xff0c;可将点云投影到图像&#xff0c;效果图如下&#xff1a; 本软件的优势&#xff1a;&#xff08;1&#xff09;无需特…

WPF系列九:图形控件EllipseGeometry

简介 EllipseGeometry用于绘制一个椭圆的形状。它通常与其他图形元素结合使用&#xff0c;比如 Path 或者作为剪切区域来定义其他元素的外形。 定义椭圆&#xff1a;EllipseGeometry 用来定义一个椭圆或者圆的几何形状。参与绘制&#xff1a;可以被用作 Path 元素的数据&…

EFCore HasDefaultValueSql (续2 HasComputedColumnSql)

前情&#xff1a;EFCore HasDefaultValueSql EFCore HasDefaultValueSql (续1 ValueGeneratedOnAdd)-CSDN博客 小伙伴在使用 HasDefaultValueSql 时&#xff0c;对相关的 ValueGeneratedOnAdd, HasComputedColumnSql 也有了疑问&#xff1a; HasComputedColumnSql 对于计算…

qt设置qwidget背景色无效

最近在做一个界面&#xff0c;有三个子窗体&#xff0c;于是就把各个子窗体分别做成了三个UI&#xff0c;再将3个UI&#xff0c;放到1个UI&#xff0c;再将那一个UI在其他窗体上进行提升。 最后就发现怎么设置qwidget的背景都没有效果。 在Qt中&#xff0c;如果是给Qwidget的…

【Rust学习笔记】Rust 的所有权介绍

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 博客内容主要围绕&#xff1a; 5G/6G协议讲解 高级C语言讲解 Rust语言讲解 文章目录 Rust中的所有权介绍1.1 一个简单的例子1.2 一个稍微复杂的例…

Centos9-SSH免密登录配置-修改22端口-关闭密码登录-提高安全性

Centos9-SSH免密登录配置-修改22端口-关闭密码登录 生成秘钥对将公钥信息存进authorized_keys测试登录查询访问记录、比对指纹更换22访问端口关闭账号密码登录生成秘钥对 生成密钥对,指定 备注 和 文件目录命令执行后,默认两次回车,不设置秘钥使用密码ssh-keygen -t rsa -b …