4.netty源码分析

1.pipeline调用handler的源码

//pipeline得到双向链表的头,next到尾部,

2.心跳源码 主要分析IdleStateHandler3个定时任务内部类
//考虑了网络传输慢导致出站慢的情况
//超时重新发送,然后关闭

ReadTimeoutHandler(继承IdleStateHandler 直接关闭连接)和WriteTimeoutHandler(继承ChannelOutboundHandlerAdapter 使用定时任务来完成)
//NioEventLoop进行事件循环

 @Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio; //默认 50if (ioRatio == 100) { //如果被占用,就处理事件try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks(); //执行任务}} else {  //如果没有被占用final long ioStartTime = System.nanoTime();try {processSelectedKeys();  //选择一个key} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);   //ioRatio默认是50 ,计算出来是1.所以定时任务执行了 ioTime没有响应的时间(妙啊,如果执行1s或者是其他时间那可能还是检测不到心跳)}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}

//IdleStateHandler

3.eventLoop执行定时任务的源代码(传入并执行队列)

1.select 默认阻塞1秒

4.任务加入异步线程池(处理耗时任务时)(因为执行任务读写 和执行读写后执行Loop任务是同一个线程)
就不会阻塞netty的IO

1.handler加入线程池(自己创建group线程池)
2.context加入线程池

/** Copyright 2012 The Netty Project** The Netty Project licenses this file to you under the Apache License,* version 2.0 (the "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at:**   http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the* License for the specific language governing permissions and limitations* under the License.*/
package source.echo2;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;import java.util.concurrent.Callable;/*** Handler implementation for the echo server.*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// group 就是充当业务线程池,可以将任务提交到该线程池// 这里我们创建了16个线程static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoServer Handler 的线程是=" + Thread.currentThread().getName());//按照原来的方法处理耗时任务//解决方案2 用户程序自定义的普通任务ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);//输出线程名System.out.println("EchoServerHandler execute 线程是=" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));} catch (Exception ex) {System.out.println("发生异常" + ex.getMessage());}}});ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);//输出线程名System.out.println("EchoServerHandler execute 线程2是=" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));} catch (Exception ex) {System.out.println("发生异常" + ex.getMessage());}}});//方式1
//        //将任务提交到 group线程池
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//
//                //接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, "UTF-8");
                //休眠10秒
                Thread.sleep(10 * 1000);
//                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
//                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
//                return null;
//
//            }
//        });
//
//        //将任务提交到 group线程池
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//
//                //接收客户端信息
//                ByteBuf buf = (ByteBuf) msg;
//                byte[] bytes = new byte[buf.readableBytes()];
//                buf.readBytes(bytes);
//                String body = new String(bytes, "UTF-8");
//                //休眠10秒
//                Thread.sleep(10 * 1000);
//                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
//                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
//                return null;
//
//            }
//        });
//
//
//        //将任务提交到 group线程池
//        group.submit(new Callable<Object>() {
//            @Override
//            public Object call() throws Exception {
//
//                //接收客户端信息
//                ByteBuf buf = (ByteBuf) msg;
//                byte[] bytes = new byte[buf.readableBytes()];
//                buf.readBytes(bytes);
//                String body = new String(bytes, "UTF-8");
//                //休眠10秒
//                Thread.sleep(10 * 1000);
//                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
//                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
//                return null;
//
//            }
//        });//普通方式//接收客户端信息ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);String body = new String(bytes, "UTF-8");//休眠10秒Thread.sleep(10 * 1000);System.out.println("普通调用方式的 线程是=" + Thread.currentThread().getName());ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));System.out.println("go on ");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.//cause.printStackTrace();ctx.close();}
}

5.netty源码
//面试

1.连接 ServerBootstrap空的构造器 作用为初始化类的成员
//启动bind

6.RPC(remote procedure call) call function like local machine

1.usual framework , ali Dubbo google gRPC ,Go rpcx
apache thrift ,spring cloud
pic 17.rpc procedure

  1. imitate dubbo RPC 通过jdk反射实现

// server handler规定 msg开头是规定的字符串(或者协议)
msg.toString().startsWith(“helloService#hello”)
//callable可以在服务器和客户端之间使用,wait()然后notify() (在同一机器)
//read调用只一次,然后调用call(netty的pipeline自动调用)
//server和client,只需按照需要的名字来调用

//下面是客户端传递参数给服务端,然后服务端调用被代理的实现的接口

//创建消费者,调用远程服务

public class ClientBootstrap {//这里定义协议头(调用的方法名)public static final String providerName = "HelloService#hello#";public static void main(String[] args) throws  Exception{//创建一个消费者NettyClient customer = new NettyClient();//创建代理对象HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);for (;;) {Thread.sleep(2 * 1000);//通过代理对象调用服务提供者的方法(服务)String res = service.hello("你好 dubbo~");System.out.println("调用的结果 res= " + res);}}
}

//创建客户端创建代理对象,和初始化客户端添加handler

public class NettyClient {//创建线程池private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static NettyClientHandler client;private int count = 0;//编写方法使用代理模式,获取一个代理对象public Object getBean(final Class<?> serivceClass, final String providerName) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serivceClass}, (proxy, method, args) -> {System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");//{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码if (client == null) {initClient();}//设置要发给服务器端的信息//providerName 协议头 args[0] 就是客户端调用api hello(???), 参数client.setPara(providerName + args[0]);return executor.submit(client).get();});}//初始化客户端private static void initClient() {client = new NettyClientHandler();//创建EventLoopGroupNioEventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(client);}});try {bootstrap.connect("127.0.0.1", 7000).sync();} catch (Exception e) {e.printStackTrace();}}
}
//客户端handler,处理来自服务器的请求,服务器有数据通知线程,没有就等待public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {private ChannelHandlerContext context;//上下文private String result; //返回的结果private String para; //客户端调用方法时,传入的参数//与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(" channelActive 被调用  ");context = ctx; //因为我们在其它方法会使用到 ctx}//收到服务器的数据后,调用方法 (4)//@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(" channelRead 被调用  ");result = msg.toString();notify(); //服务器返回数据,唤醒等待的线程}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}//被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5@Overridepublic synchronized Object call() throws Exception {System.out.println(" call1 被调用  ");context.writeAndFlush(para); //这里的context是client自己的,//进行waitwait(); //等待channelRead 方法获取到服务器的结果后,唤醒System.out.println(" call2 被调用  ");return  result; //服务方返回的结果}//(2)void setPara(String para) {System.out.println(" setPara  ");this.para = para;}
}

//创建服务器端,添加服务器handler

public class NettyServer {public static void startServer(String hostName, int port) {startServer0(hostName,port);}//编写一个方法,完成对NettyServer的初始化和启动private static void startServer0(String hostname, int port) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler()); //业务处理器}});ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();System.out.println("服务提供方开始提供服务~~");channelFuture.channel().closeFuture().sync();}catch (Exception e) {e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {//代码代填..NettyServer.startServer("127.0.0.1", 7000);}
}
//服务器handler,如果接收到客户端的请求,得到字符,传入参数调用实现类方法public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//获取客户端发送的消息,并调用服务System.out.println("msg=" + msg);//客户端在调用服务器的api 时,我们需要定义一个协议//比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"if(msg.toString().startsWith(ClientBootstrap.providerName)) {msg.toString().lastIndexOf("#")String substring = msg.toString().substring(msg.toString().lastIndexOf("#") + 1);String result = new HelloServiceImpl().hello(substring);ctx.writeAndFlush(result); //向客户端写回返回的结果}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

//接口(客户端需要,使用jdk代理可以代理接口,使用cglib可以直接代理类)
//这个是接口,是服务提供方和 服务消费方都需要

public interface HelloService {String hello(String mes);
}//实现类
public class HelloServiceImpl implements HelloService {private static int count = 0;//当有消费方调用该方法时, 就返回一个结果@Overridepublic String hello(String mes) {System.out.println("收到客户端消息=" + mes);//根据mes 返回不同的结果if(mes != null) {return "你好客户端, 我已经收到你的消息 [" + mes + "] 第" + (++count) + " 次";} else {return "你好客户端, 我已经收到你的消息 ";}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js如何将图片转成BASE64编码,网页跟uniapp开发的app的区别?

Base64是一种用64个字符来表示任意二进制数据的方法&#xff0c;这篇文章主要为大家介绍了如何实现将图片转为base64格式&#xff0c;感兴趣的小伙伴可以学习一下 前言 前段时间在写我的VUE全栈项目的时候&#xff0c;遇到要把前端的照片上传到后端&#xff0c;再由后端存到数…

SpringBoot项目中使用Lombok插件中Slf4j日志框架

前言&#xff1a;idea需要安装lombok插件&#xff0c;因为该插件中添加了Slf4j注解&#xff0c;可以将Slf4j翻译成 private static final org.slf4j.Logger logger LoggerFactory.getLogger(this.XXX.class); springboot本身就内置了slf4j日志框架&#xff0c;所以不需要单独…

【2023】Redis实现消息队列的方式汇总以及代码实现

Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始&#xff1a;List 队列代码实现 二、发布订阅模式&#xff1a;Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、…

小白到运维工程师自学之路 第六十二集 (docker持久化与数据卷容器)

一、概述 Docker持久化是指将容器中的数据持久保存在主机上&#xff0c;以便在容器重新启动或迁移时不丢失数据。由于Docker容器是临时和可变的&#xff0c;它们的文件系统默认是易失的&#xff0c;这意味着容器中的任何更改或创建的文件都只存在于此容器的生命周期内。但是&a…

小主机折腾记16

7月折腾了 1.2500s&#xff0c;2550k&#xff0c;e3 1225的性能测试 结果如下图 总结如下&#xff1a; a.2500s e3 1225 2390t 差别不大 b.1333频率相对1066频率内存提升12%左右 c.为什么少了2550k&#xff0c;因为装上去风扇尬转&#xff0c;没画面&#xff0c;我猜是因为…

node.js判断元素是否包括

在Node.js中&#xff0c;可以使用Array.prototype.some()方法来判断数组中是否包含某个元素。下面是一个示例代码&#xff1a; const arr [ { ‘_android:name’: ‘com.eg.android.AlipayGphone’ }, { ‘_android:name’: ‘com.eg.android.AlipayGphoneRC’ }, { ‘_andro…

助力青少年科技创新人才培养,猿辅导投资1亿元设立新基金

近日&#xff0c;在日本千叶县举办的2023年第64届国际数学奥林匹克&#xff08;IMO&#xff09;竞赛公布比赛结果&#xff0c;中国队连续5年获得团体第一。奖牌榜显示&#xff0c;代表中国参赛的6名队员全部获得金牌。其中&#xff0c;猿辅导学员王淳稷、孙启傲分别以42分、39分…

用latex的ACM模板写论文如何去除页眉页脚以及Reference

简单粗暴如下&#xff1a; 1、latex最开始补充&#xff1a; \documentclass[acmsmall]{acmart} \settopmatter{printacmreffalse} % Removes citation information below abstract \renewcommand\footnotetextcopyrightpermission[1]{} % removes footnote with conference in…

FFmepg视频解码

1 前言 上一篇文章<FFmpeg下载安装及Windows开发环境设置>介绍了FFmpeg的下载安装及环境配置&#xff0c;本文介绍最简单的FFmpeg视频解码示例。 2 视频解码过程 本文只讨论视频解码。 FFmpeg视频解码的过程比较简单&#xff0c;实际就4步&#xff1a; 打开媒体流获取…

代码随想录额外题目| 二叉树 ●129求根到叶数字之和 ●1382二叉树变平衡●100相同的树

#129求根到叶数字之和 回溯放进vector&#xff0c;然后从后往前拿&#xff0c;乘1 10 100 ... 很基础的回溯 my code&#xff1a; void backtrack(int depth, TreeNode* cur, vector<TreeNode*> &vec, int &sum){if(cur->leftnullptr &&cur->rig…

如何方便地使用TCL恢复带BD设计的Vivado工程

恢复无BD设计的Vivado工程 当工程中无Block Design设计时&#xff0c;工程恢复过程相对简单。使用write_project_tcl命令可以直接生成用于恢复工程的tcl文件&#xff0c;如*_prj.tcl&#xff0c;在恢复时直接运行就可以了。 修改恢复工程的路径 *_prj.tcl在恢复工程时会将其…

监控对象都有哪些分类

1、业务监控 这类指标是管理层非常关注的&#xff0c;代表企业营收&#xff0c;或者跟客户主流程相关&#xff0c;类似 BI 数据。不过相比 BI 数据&#xff0c;业务监控指标有两点不同。 对精确度要求没有那么高&#xff1a;因为监控只要发现趋势异常就可以&#xff0c;至于是…

极简在线商城系统,支持docker一键部署

Hmart 给大家推荐一个简约自适应电子商城系统&#xff0c;针对虚拟商品在线发货&#xff0c;支持企业微信通知&#xff0c;支持docker一键部署&#xff0c;个人资质也可搭建。 前端 后端 H2 console 运行命令 docker run -d --name mall --restartalways -p 8080:8080 -e co…

LeetCode_贪心算法_中等_763.划分字母区间

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍…

Jackson:String转object反序列化失败

场景 消费mq时String转Object 代码 for (MessageExt msg : msgs) {String msgBody new String(msg.getBody(), StandardCharsets.UTF_8);BinlogEvent binlogEvent JsonUtil.silentString2Object(msgBody, BinlogEvent.class);binlogEvent.setPort(Long.valueOf(port));tCo…

SAP CAP篇十二:AppRouter 深入研究

本文目录 本系列文章理解现有程序app文件夹中的package.json理解approuter.js 修改现有程序修改package.json新建index.js在Approuter中显示额外的逻辑 添加一些额外的Logger对应代码及branch 本系列文章 SAP CAP篇一: 快速创建一个Service&#xff0c;基于Java的实现 SAP CAP…

深入浅出Pytorch函数——torch.sum

分类目录&#xff1a;《深入浅出Pytorch函数》总目录 相关文章&#xff1a; 深入浅出Pytorch函数——torch.Tensor 函数torch.sum有两种形式&#xff1a; torch.sum(input, *, dtypeNone)&#xff1a;返回输入张量input所有元素的和。torch.sum(input, dim, keepdimFalse, *,…

2308C++技巧

struct ubiq {template <class Type>constexpr operator Type() const {return Type{};}; }; // int i ubiq{}; double d ubiq{}; char c ubiq{}; //可以多个同时初化. template <class T, std::size_t... I> constexpr auto 类型转标识数组(std::size_t* types…

(AcWing)多重背包问题 I,II

有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件&#xff0c;每件体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使物品体积总和不超过背包容量&#xff0c;且价值总和最大。 输出最大价值。 输入格式 第一行两个整数 N&#xff0c;…

使用树莓派picow和drv8833驱动直流电机

raspberry pico w引脚图 1. 准备工作 板子编辑器raspberry pico wmicropython&#xff08;thonny编辑器&#xff09; 最新的raspberry pi pico w系统包下载地址。 点亮板载led灯 需要注意的是pico的板载led灯是GPIO25引脚&#xff0c;picow的板子led灯则直接用Pin包的&qu…