分布式异步任务处理组件(七)

分布式异步任务处理组件底层网络通信模型的设计--如图:

  1. 使用Java原生NIO来实现TCP通信模型
  2. 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
  3. Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
  4. 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
  5. 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--
  6. 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
    1. 从网络IO线程角度出发--
      1. 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
      2. 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
      3. 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
  7. 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;import org.example.web.api.SocketBufferQueue;import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {//异步读写队列实现原理;/** 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量*当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;* 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--*     1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;*     2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;*     3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;*     4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1* 的时候就要进行同步操作;原理:*           1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;*           2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--*                   1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),*                   2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;*     5,再说一下size怎么保证同步,*           1,在size<=1的时候严格保证线程同步操作,保证size;*           2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;* */private AtomicInteger size;protected T head;protected T tail;private Object readLock;private Object writeLock;//这里考虑使用cas还是SynchronizedAsynchronousQueue(){this.writeLock=new Object();this.readLock=new Object();}AsynchronousQueue(int initSize){this();this.size=new AtomicInteger(initSize);}//空队列初始化要创建一个nodeAsynchronousQueue(T node){this(1);this.head=node;this.tail=this.head;}public boolean offerFirstOne(T node){synchronized (this.writeLock){if(this.size.get()>0){return false;}this.head=this.tail=node;return this.size.compareAndSet(0,1);}}public boolean offer(T node){preOfferElement(node);synchronized (this.writeLock){if(this.size.get()==0){return this.offerFirstOne(node);}else{T temp=this.head;node.next=temp;temp.pre=node;this.head=node;}return this.size.incrementAndGet() > 1;}}private void preOfferElement(T bufferNode){bufferNode.next=null;bufferNode.pre=null;}public T pollLastOne(){return this.size.compareAndSet(1,0)?this.tail:null;}public T poll(){synchronized (this.readLock){if(this.size.get()==0){return null;}if(this.size.get()==1){synchronized (this.writeLock){if(this.size()>1){return this.getTailElement();}if(this.size()==1){this.pollLastOne();}}}return this.getTailElement();}}private T getTailElement(){if(this.size()>1){this.tail= (T) this.tail.pre;this.size.decrementAndGet();return (T) this.tail.next;}return null;}public int size(){return this.size.get();}public int increamentSize(){return this.size.incrementAndGet();}public int decrementSize(){return this.size.decrementAndGet();}private class BufferNode{private ByteBuffer buffer;private BufferNode pre;private BufferNode next;BufferNode(ByteBuffer byteBuffer){this.buffer=byteBuffer;}BufferNode(){}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/23628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PoseiSwap:基于 Nautilus Chain ,构建全新价值体系

在 DeFi Summer 后&#xff0c;以太坊自身的弊端不断凸显&#xff0c;而以 Layer2 的方式为其扩容成为了行业很长一段时间的叙事方向之一。虽然以太坊已经顺利的从 PoW 的 1.0 迈向了 PoS 的 2.0 时代&#xff0c;但以太坊创始人 Vitalik Buterin 表示&#xff0c; Layer2 未来…

iOS-砸壳篇(两种砸壳方式)

CrackerXI砸壳呢&#xff0c;当时你要是使用 frida-ios-dump 也是可以的&#xff1b; https://github.com/AloneMonkey/frida-ios-dump frida-ios-dump: 代码中需要更改的&#xff1a;手机中的内网ip 密码 等 最后放到我的砸壳路径里&#xff1a; python dump.py -l查看应用…

JMeter 4.x 简单使用

文章目录 前言JMeter 4.x 简单使用1. 启动2. 设置成中文3. 接口测试3.1. 设置线程组3.2. HTTP信息请求头管理器3.3. 添加HTTP请求默认值3.4. 添加HTTP cookie 管理3.5. 添加http请求3.5.1. 添加断言 3.6. 添加监听器-查看结果树3.7. 添加监听器-聚合报告 4. 测试 前言 如果您觉…

打开的idea项目maven不生效

方法一&#xff1a;CtrlshiftA&#xff08;或者help---->find action&#xff09;&#xff0c; 输入maven&#xff0c; 点击add maven projects&#xff0c;选择本项目中的pom.xml配置文件&#xff0c;等待加载........ 方法二&#xff1a;view->tools windows->mave…

【RabbitMQ(day4)】SpringBoot整合RabbitMQ与MQ应用场景说明

一、SpringBoot 中使用 RabbitMQ 导入对应的依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>配置配置文件 spring:application:name: rabbitmq-springbo…

CSS3标题文本后的横线

示例代码 <template><div><h2 class"background">删除线</h2><h2 class"background"><span>左右两侧线</span></h2><h2 class"background double"><span>双层线</span></…

Android Glide MemorySizeCalculator计算值,Kotlin

Android Glide MemorySizeCalculator计算值,Kotlin for (i in 100..1000 step 50) {val calculator MemorySizeCalculator.Builder(this).setMemoryCacheScreens(i.toFloat()).setBitmapPoolScreens(i.toFloat()).setMaxSizeMultiplier(0.8f).setLowMemoryMaxSizeMultiplier(0…

【网络】网络层(IP协议)

目录 一、基本概念 二、协议头格式 三、网段划分 四、特殊的IP地址 五、IP地址的数量限制 六、私有IP地址和公网IP地址 七、路由 一、基本概念 IP协议&#xff1a;提供一种能力&#xff0c; 将数据从A主机送到B主机&#xff0c;&#xff08;TCP协议&#xff1a;确保IP协议…

Webpack5新手入门简单配置

1.初始化项目 yarn init -y 2.安装依赖 yarn add -D webpack5.75.0 webpack-cli5.0.0 3.新建index.js 说明&#xff1a;写入下面的一句话 console.log("hello webpack"); 4.执行命令 说明&#xff1a;如果没有安装webpack脚手架就不能执行yarn webpack&#xff08…

P1064 [NOIP2006 提高组] 金明的预算方案 (依赖背包问题)(内附封面)

[NOIP2006 提高组] 金明的预算方案 题目描述 金明今天很开心&#xff0c;家里购置的新房就要领钥匙了&#xff0c;新房里有一间金明自己专用的很宽敞的房间。更让他高兴的是&#xff0c;妈妈昨天对他说&#xff1a;“你的房间需要购买哪些物品&#xff0c;怎么布置&#xff0…

python 连接oracle pandas以简化excel的编写和数据操作

python代码 Author: liukai 2810248865qq.com Date: 2022-08-18 04:28:52 LastEditors: liukai 2810248865qq.com LastEditTime: 2023-07-06 22:12:56 FilePath: \PythonProject02\pandas以简化excel的编写和数据操作.py Description: 这是默认设置,请设置customMade, 打开koro…

Laravel 框架安装路由和控制器 ①

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; THINK PHP &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &#x1f44…

启动Flink显示初始化状态怎么解决?

启动Flink显示初始化状态怎么解决&#xff1f; Flink On Yarn模式 问题 flnk任务在跑的过程中&#xff0c; 有时候任务停掉了 &#xff0c;不过我有 定时任务&#xff0c;可以把失败的flink任务拉起来&#xff0c;但是因为最新的checkpoint做失败了&#xff0c;导致脚本无法拉…

mysql报错:name ‘_mysql‘ is not defined

原因是&#xff1a; Mysqldb 不兼容 python3.5 以后的版本 解决办法&#xff1a; 使用pymysql代替MySQLdb 在项目应用下的__init__.py 添加上去 import pymysqlpymysql.version_info (1, 4, 13, "final", 0) pymysql.install_as_MySQLdb()

ChatGPT即将取代程序员

W...Y的主页 相信ChatGPT大家已经都不陌生&#xff0c;我们经常会在工作和学习中应用。但是ChatGPT的发展速度飞快。功能也越来越全面。ChatGPT的文章也是层次不穷的出现&#xff0c;ChatGPT即将取代程序员的消息也铺天盖地。那ChatGPT真的会取代程序员吗&#xff1f;我们是否…

HDFS中的Federation联邦机制

HDFS中的Federation联邦机制 当前HDFS体系架构--简介局限性 联邦Federation架构简介好处配置示例 当前HDFS体系架构–简介 当前的HDFS结构有两个主要的层&#xff1a; 命名空间&#xff08;namespace&#xff09; 由文件&#xff0c;块和目录组成的统一抽象的目录树结构。由n…

300个智商测试FLASH智商游戏ACCESS数据库

最近在找IQ测试方面的数据&#xff0c;网上大多只留传着33道题这种类型&#xff0c;其他的又因各种条件&#xff08;比如图片含水印等&#xff09;不能弄&#xff0c;这是从测智网下载的一些测试智商的游戏数据&#xff0c;游戏文件是FLASH的&#xff0c;扩展名是SWF。 数据包总…

机器学习笔记之优化算法(七)线搜索方法(步长角度;非精确搜索;Wolfe Condition)

机器学习笔记之优化算法——线搜索方法[步长角度&#xff0c;非精确搜索&#xff0c;Wolfe Condition] 引言回顾&#xff1a; Armijo \text{Armijo} Armijo准则及其弊端 Glodstein \text{Glodstein} Glodstein准则及其弊端 Wolfe Condition \text{Wolfe Condition} Wolfe Condi…

在excel中整理sql语句

数据准备 CREATE TABLE t_test (id varchar(32) NOT NULL,title varchar(255) DEFAULT NULL,date datetime DEFAULT NULL ) ENGINEInnoDB DEFAULT CHARSETutf8mb4; INSERT INTO t_test VALUES (87896cf20b5a4043b841351c2fd9271f,张三1,2023/6/8 14:06); INSERT INTO t_test …

Maven可选依赖和排除依赖简单使用

可选依赖 可选依赖指对外隐藏当前所依赖的资源 在maven_04_dao的pom.xml,在引入maven_03_pojo的时候&#xff0c;添加optional <dependency><groupId>com.rqz</groupId><artifactId>maven_03_pojo</artifactId><version>1.0-SNAPSHOT&…