分布式异步任务处理组件(七)

分布式异步任务处理组件底层网络通信模型的设计--如图:

  1. 使用Java原生NIO来实现TCP通信模型
  2. 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
  3. Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
  4. 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
  5. 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--
  6. 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
    1. 从网络IO线程角度出发--
      1. 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
      2. 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
      3. 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
  7. 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;import org.example.web.api.SocketBufferQueue;import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {//异步读写队列实现原理;/** 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量*当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;* 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--*     1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;*     2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;*     3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;*     4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1* 的时候就要进行同步操作;原理:*           1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;*           2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--*                   1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),*                   2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;*     5,再说一下size怎么保证同步,*           1,在size<=1的时候严格保证线程同步操作,保证size;*           2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;* */private AtomicInteger size;protected T head;protected T tail;private Object readLock;private Object writeLock;//这里考虑使用cas还是SynchronizedAsynchronousQueue(){this.writeLock=new Object();this.readLock=new Object();}AsynchronousQueue(int initSize){this();this.size=new AtomicInteger(initSize);}//空队列初始化要创建一个nodeAsynchronousQueue(T node){this(1);this.head=node;this.tail=this.head;}public boolean offerFirstOne(T node){synchronized (this.writeLock){if(this.size.get()>0){return false;}this.head=this.tail=node;return this.size.compareAndSet(0,1);}}public boolean offer(T node){preOfferElement(node);synchronized (this.writeLock){if(this.size.get()==0){return this.offerFirstOne(node);}else{T temp=this.head;node.next=temp;temp.pre=node;this.head=node;}return this.size.incrementAndGet() > 1;}}private void preOfferElement(T bufferNode){bufferNode.next=null;bufferNode.pre=null;}public T pollLastOne(){return this.size.compareAndSet(1,0)?this.tail:null;}public T poll(){synchronized (this.readLock){if(this.size.get()==0){return null;}if(this.size.get()==1){synchronized (this.writeLock){if(this.size()>1){return this.getTailElement();}if(this.size()==1){this.pollLastOne();}}}return this.getTailElement();}}private T getTailElement(){if(this.size()>1){this.tail= (T) this.tail.pre;this.size.decrementAndGet();return (T) this.tail.next;}return null;}public int size(){return this.size.get();}public int increamentSize(){return this.size.incrementAndGet();}public int decrementSize(){return this.size.decrementAndGet();}private class BufferNode{private ByteBuffer buffer;private BufferNode pre;private BufferNode next;BufferNode(ByteBuffer byteBuffer){this.buffer=byteBuffer;}BufferNode(){}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/23628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PoseiSwap:基于 Nautilus Chain ,构建全新价值体系

在 DeFi Summer 后&#xff0c;以太坊自身的弊端不断凸显&#xff0c;而以 Layer2 的方式为其扩容成为了行业很长一段时间的叙事方向之一。虽然以太坊已经顺利的从 PoW 的 1.0 迈向了 PoS 的 2.0 时代&#xff0c;但以太坊创始人 Vitalik Buterin 表示&#xff0c; Layer2 未来…

iOS-砸壳篇(两种砸壳方式)

CrackerXI砸壳呢&#xff0c;当时你要是使用 frida-ios-dump 也是可以的&#xff1b; https://github.com/AloneMonkey/frida-ios-dump frida-ios-dump: 代码中需要更改的&#xff1a;手机中的内网ip 密码 等 最后放到我的砸壳路径里&#xff1a; python dump.py -l查看应用…

SpringBoot自定义注解 + AOP+分布式Redis 防止重复提交

第一步 引入依赖pom.xml&#xff1a; <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.3</version> <!-- 使用最新版本 --></dependency><dependency><groupId&g…

单片机传感器类型一览

一、单片机&#xff1a; c51、attiny85 < arduino、stm8/stm32 < esp8266、esp32、raspiberry pico 功能简单型&#xff1a;C51、Attiny85等 功能可以较复杂型&#xff1a;Arduino < STM32 联网、人工智能等高级复杂型&#xff1a;esp8266 < esp32、pico …

docker-compose

文章目录 一、docker-compose简介二、docker-compose安装1.安装命令2.卸载旧版本3.修改权限4、常用命令 总结 一、docker-compose简介 示例&#xff1a;pandas 是基于NumPy 的一种工具&#xff0c;该工具是为了解决数据分析任务而创建的。 二、docker-compose安装 1.安装命令…

JMeter 4.x 简单使用

文章目录 前言JMeter 4.x 简单使用1. 启动2. 设置成中文3. 接口测试3.1. 设置线程组3.2. HTTP信息请求头管理器3.3. 添加HTTP请求默认值3.4. 添加HTTP cookie 管理3.5. 添加http请求3.5.1. 添加断言 3.6. 添加监听器-查看结果树3.7. 添加监听器-聚合报告 4. 测试 前言 如果您觉…

打开的idea项目maven不生效

方法一&#xff1a;CtrlshiftA&#xff08;或者help---->find action&#xff09;&#xff0c; 输入maven&#xff0c; 点击add maven projects&#xff0c;选择本项目中的pom.xml配置文件&#xff0c;等待加载........ 方法二&#xff1a;view->tools windows->mave…

【RabbitMQ(day4)】SpringBoot整合RabbitMQ与MQ应用场景说明

一、SpringBoot 中使用 RabbitMQ 导入对应的依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>配置配置文件 spring:application:name: rabbitmq-springbo…

CSS3标题文本后的横线

示例代码 <template><div><h2 class"background">删除线</h2><h2 class"background"><span>左右两侧线</span></h2><h2 class"background double"><span>双层线</span></…

Python 扩展 快捷贴士:os模块下的创建目录的方式

Python3 os.makedirs() 方法 概述 os.makedirs() 方法用于递归创建多层目录。 如果子目录创建失败或者已经存在&#xff0c;会抛出一个 OSError 的异常&#xff0c;Windows上Error 183 即为目录已经存在的异常错误。 如果第一个参数 path 只有一级&#xff0c;即只创建一层目…

ADC前级的抗混叠滤波器(anti-alias filter设计

可以加我微信hezkz17申请加入数字音频系统研究开发交流答疑群(课题组) A/D前的抗混叠滤波器(anti-alias filter)设计方法? 作用, 抗混叠滤波器是在进行模数转换(A/D转换)之前,为了防止混叠现象而使用的滤波器。混叠是指当输入信号的频率超过采样率的一半时,在采样过程…

Android Glide MemorySizeCalculator计算值,Kotlin

Android Glide MemorySizeCalculator计算值,Kotlin for (i in 100..1000 step 50) {val calculator MemorySizeCalculator.Builder(this).setMemoryCacheScreens(i.toFloat()).setBitmapPoolScreens(i.toFloat()).setMaxSizeMultiplier(0.8f).setLowMemoryMaxSizeMultiplier(0…

后端登录安全的一种思路

PS:作者是小白能接触到的就只会这样写。勿喷。 前提 思路: 结合io流将登录token存储到配置文件中,不将token存储到浏览器端&#xff0c;从而避免盗取。 下面jwt的学习可以参考下这个: JWT --- 入门学习_本郡主是喵的博客-CSDN博客 JWT工具类 Component public class JWTtU…

用html+javascript打造公文一键排版系统12:删除附件说明中“附件:”里的空格

如果我们在输入附件说明时在“附件&#xff1a;”之间加入空格&#xff0c;那么排版时就要删除这些空格。 因为string对象replace()支持正则表达式&#xff0c;于是考虑用replace()来完成。 写了一段只有一个多余空格的代码来测试&#xff1a; <!DOCTYPE HTML> <HT…

【网络】网络层(IP协议)

目录 一、基本概念 二、协议头格式 三、网段划分 四、特殊的IP地址 五、IP地址的数量限制 六、私有IP地址和公网IP地址 七、路由 一、基本概念 IP协议&#xff1a;提供一种能力&#xff0c; 将数据从A主机送到B主机&#xff0c;&#xff08;TCP协议&#xff1a;确保IP协议…

QT 驱动条码打印机(没有验证过)

这里的打印机是条码打印机&#xff0c;因为第一次接触这种设备&#xff0c;所以买了斑马的GK888t型条码打印机&#xff0c;据说ZPL语言就是斑马的杰作想必支持会好点。实际是&#xff0c;除了ZPL本身外&#xff0c;没有SDK&#xff0c;也没有DDK&#xff0c;所以&#xff0c;一…

《OWASP代码审计》学习——注入漏洞审计

一、注入的概念 注入攻击允许恶意用户向应用程序添加或注入内容和命令&#xff0c;以修改其行为。这些类型的攻击是常见且广泛的&#xff0c;黑客很容易测试网站是否易受攻击&#xff0c;攻击者也很容易利用这些攻击。如今&#xff0c;它们在尚未更新的遗留应用程序中非常常见…

Webpack5新手入门简单配置

1.初始化项目 yarn init -y 2.安装依赖 yarn add -D webpack5.75.0 webpack-cli5.0.0 3.新建index.js 说明&#xff1a;写入下面的一句话 console.log("hello webpack"); 4.执行命令 说明&#xff1a;如果没有安装webpack脚手架就不能执行yarn webpack&#xff08…

Docker dockerfile 案例:centos 支持 vim

创建一个 centos 容器&#xff0c;容器内默认是不支持使用 vim 指令的&#xff0c;只能使用 vi 指令。&#xff08;附&#xff1a;Dockerfile 语法与指令&#xff09; 但想在创建 centos 容器后就支持 vim 指令&#xff0c;需要自定义 centos&#xff0c;编写 dockerfile&…

h5浏览pdf文件

将hybrid整个复制到一级文件夹下 hybrid地址&#xff1a;https://download.csdn.net/download/qq_37194189/88157330 创建一个 pdf页面用于展示pdf文件 <template><view style"width: 100%;" ><web-view :src"pdfUrl"></web-view&…