#zookeeper集群+kafka集群

kafka3.0之前是依赖于zookeeper的。

zookeeper是开源,分布式的架构。提供协调服务(Apache项目)

基于观察者模式涉及的分布式服务管理架构。

存储和管理数据。分布式节点上的服务接受观察者的注册。一旦分布式节点上的数据发生变化,由zookeeper负责通知分布式节点上的服务。

互相监督,互相监控,用于大数据分析场景。

zookeeper:分为领导者和追随者 leader follower组成的集群

只要有一半以上的集群存活,zookeeper集群就可以正常工作。适用于安装奇数台的服务集群

主要作用:全局数据一致,每个zookeeper都可以保存一份相同的数据。维护监控服务的数据一致。

数据更新的原子性。要么都成功要么都失败。

实时性,只要有变化,立刻同步。

zookeeper的应用场景:

1.统一命名服务,在分布式的环境下,对所有的应用和服务都进行统一命名

2.统一配置管理,配置文件同步,kafka的配置文件被修改可以快速同步到其他节点。

3.统一集群管理:实时掌握所有节点的状态。

4.服务器动态上下线

5.实现负载均衡,把访问服务器的数据发送到访问最少的服务器客户端的请求

领导者和追随者:

zookeeper的选举机制。

三台服务器为例 A B C

A先启动,发起一次选举,投票给自己,只有一票,不满半数,A的状态是looking

B启动,再发起一次选举,A和B分别投自己一票,交换选举信息,myid,A发现B的myid比A大,A的这一票会转而投给B

A 0 B 2 没有半数以上结果,A B会进入looking B有可能成为leader

C启动 MYID C的myid最大,A和B都会把票投给C,加上C自己的一票

A 0 B 0 C 3

C的状态变为leader,A和B会变为follower

只要leader确定,后续的服务器都是追随者。

只有两种情况会开启选举机制:

1.初始化的情况会产生选举

2.服务器之间和leader丢失了连接状态

leader已存在,建立连接即可

leader不存在,服务器id大的胜出(EPOCH大,直接胜出。EPOCH相同,事务id大的胜出)

EPOCH:每个leader任期的一个代号,没有leader,大家的逻辑地位相同,每投完一

次数据,数据都是递增的。

事务id:表示

服务器的每一次变更

服务器id:用来表示zookeeper集群当中,都有个id,每台机器不重复,和myid保持一致

实验部署

zookeeoer+kafka(2.7.0)

192.168.233.10 ---zookeeper+kafak  20.0.0.10
192.168.233.20 ---zookeeper+kafak
192.168.233.30 ---zookeeper+kafak2/4G

关闭防火墙
所有服务器
拖入kafak,zookeeper所有服务器
升级java环境
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-develjava -version

line 2
tickTime=2000
服务器与客户端的心跳时间(ms)
line 6
initLimit=10
领导者和追随者之间,初始连接时能够容忍的超时时间,最多几次心跳超时line 10
syncLimit=5
同步超时时间,领导者和追随者之间同步通信超时的时间,超过5*2就会认为follower丢失,移出集群line15
dataDir=/tmp/zookeeper
保存数据的目录,需要单独创建line18
clientPort=2181line15下添加
dataLogDir=/opt/zookeeper/logs最后一行添加整个集群信息
server.1=192.168.233.10:3188:3288
server.2=192.168.233.20:3188:3288
server.3=192.168.233.30:3188:3288

server.1=192.168.233.10:3188:3288 server.2=192.168.233.20:3188:3288 server.3=192.168.233.30:3188:3288

1:每个zookeeper集群的初始myid。

192.168.233.10:服务器的IP地址

3188:领导者和追随者之间交换信息的端口(内部通信端口)

3288:一旦leader丢失响应,开启选举,3288就是用来进行选举时的服务器通信端口。

根据zoo.cfg里面的目录路径创建路径

输入myid

test1,2,3

编辑脚本

vim /etc/init.d/zookeeper
编辑脚本#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)echo "---------- zookeeper 启动 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重启 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 状态 ------------"$ZK_HOME/bin/zkServer.sh status
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esacwqchmod +x /etc/init.d/zookeeper每台服务器都添加
chkconfig --add zookeepertest1,2,3
service zookeeper startservice zookeeper status
查看谁是leader

消息队列KAFKA

为什么要引入消息队列(MQ),他是一个中间件。负责发消息。在高并发环境下,同步请求来不及处理。来不及处理的请求会形成阻塞。

比如数据库会进行行锁或者表锁。请求线程满了,超标了,会报错:too many connection。导致整个系统雪崩

消息队列的作用: 1.异步处理请求(核心)

允许用户把一个消息放入队列,但是不立即处理,等用户想处理的时候再处理

2.流量消减,应用解耦

耦合:在软件系统当中,修改一个组件需要修改其他组件,高度耦合

低度耦合:修改其中一个组件,对其他组件影响不大,无须修改所有

解耦:只要通信保证,其他的不影响整个集群,可以独立的扩展,修改,降低组件之间的依赖性。

依赖点就是接口约束,通过不同的端口,保证集群通信。

3.可恢复性:

系统当中的有一部分组件消失,不影响整个系统。也就是说在消息队列当中,即使有一个处理消息的进程失败,一旦恢复还可以重写加入到队列当中,继续处理消息。

4.缓冲:

可以控制和优化数据经过系统的时间和速度。解决生产消息和消费消息处理速度不一致的问题。

5.峰值的处理能力:

消息队列在峰值的情况下,能够顶住突发的访问压力。避免专门为了突发情况而对系统进行修改

消息队列的模式

点对点,一对一:消息的生产者发送消息到队列中,消费者从队列中提取消息,消费者提取完之后,队列中被提取的消息将会被移除。后续消费者不能再消费队列当中的消息。消息队列可以有多个消费者,但是一个消息,只能由一个消费者提取。

RABBITMQ

发布订阅模式:一对多,观察者模式。消费者提取数据之后队列当中的数据不会被清除。

生产者发布一个消息到主题。所有消费者都是通过主题获取消息。

主题:topic **

topic类似一个数据流管道,生产者把消息发布到主题。消费者从主题当中订阅数据。主题可以分区,每个分区都有自己的偏移量。

分区---partition**

每个主题都可以分成多个分区。每个分区是数据的有序子集,分区可以允许kafka进行水平扩展,以处理大量数据。

消息在分钟按照偏移量存储,消费者可以独立读取每个分区的数据

偏移量**

每个消息在分区当中唯一的标识。消费者可以通过偏移量来跟踪已读或者未读消息的位置。也可以提交这个偏移量来记录已处理的信息。

消费三种方式: 1.begin,从头开始,获取所有

2.实时获取,我只消费,后续产生的消息

3.指定偏移量消费,代码实现

生产者

生产者把数据发送kafka的主题当中,负责写入消息。

消费者

从主题当中读取数据,消费者可以是一个也可以是多个。每个消费者有一个唯一的消费者ID,Kafka通过消费者实现负载均衡和容错性。

经纪人---Broker

每个Kafka节点都有一个borker,每个负责一台Kafka服务器,id唯一,存储主题分区当中数据,处理生产和消费者的请求。

维护元数据(zookeeper)

zookeeper:zookeeper负责保存元数据,元数据就是topic的相关信息(发在哪台主机上,指定了多少分区,以及副本数,偏移量)

zookeeper自建一个主题:_consumer_offsets

3.0之后不依赖zookeeper的核心。元数据由Kafka节点自己管理。

Kafka的工作流程

生产者写入topic的数据是持久化的,默认7小时

至少一次语义:只要消费者进入,确保消息至少被消费一次

实验部署

test1,2,3
cd /opt
tar -xf kafka
mv kafka_2,13..   kafka设置环境变量
vim /etc/profile
最后插入
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profilecd kafka
cd config/
lsserver.properties是主配置文件

server.properties是主配置文件

先备份

test1 2 3    每台主机配置有些许不同
vim server.propertiesline 21  60  123line 21
broker.id=1
//同时也是leader的id,三台主机,不重复即可line 28
如果用broker进行修改,该行可以不改line 42
num.network.thread=3
处理网络的线程数量,默认即可line 46
num.io.thread=8
处理磁盘的io线程数量,这个值一定要比硬盘数大line 50
socket.send.buffer.bytes=102400
发送套接字的缓存大小line 54
socket.receive.buffer.bytes=102400
接受者的接受套接字缓冲区大小line58
socket.request.max.bytes=104857600
请求套接字的缓冲区大小line 65
log.dirs=/var/log/kafkaline 70
num.partitions=1
在此Kafka服务器上创建topic,默认分区数。如果指定了,这个配置无效了。line75
num.recovery.thread.per.data.dir=1
用来恢复,回收,清理data下的数据的线程数量。Kafka默认不允许删除主题。line110(103)
log.retention.hours=168
生产者发布的数据文件在主题当中保存的时间,单位-小时line130(123)
zookeeper.connect=192.168.233.10:2181,192.168.233.20:2181,192.168.233.30:2181
//指定zookeeper集群wq
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

test1,2,3

设置Kafka启动脚本

#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esacwqchmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka startnetstat -antp | grep 9092

test2--创建主题

kafka所有命令都在bin下,所以执行命令都得去bin目录下执行

cd /opt/kafka/bin/kafka-topics.sh --create --zookeeper 20.0.0.10:2181,20.0.0.20:2181,20.0.0.60:2181 --replication-factor 2 --partitions 3 --topic test1--replication-factor 2:创建的副本数,副本的作用就是冗余

创建主题成功

1.在Kafka的bin目录下,是所有kafka可执行命令文件

2.--zookeeper指定的是zookeeper的地址和端口,保存kafka的元数据

3.--replication-factor 2 定义每个分区的副本数

4.partitions 3 指定主题的分区数

 5.--topic test1 指定主题的名称

Partition:分区编号

Leader:每个分区都有一个领导者(Leader),领导者负责处理分区的读写操作。 在上述输出中,领导者的编号分别为 3、1、3。

Replicas:每个分区可以有多个副本(Replicas),用于提供冗余和容错性。 在上述输出中,Replica 3、1、2 分别对应不同的 Kafka broker。

Isr:ISR(In-Sync Replicas)表示当前与领导者保持同步的副本。 ISR 3、1分别表示与领导者同步的副本。

test1,2,3
做映射
20.0.0.10 test1
20.0.0.20 test2
20.0.0.60 test3

发布消息

bin目录下
test2
kafka-console-producer.sh --broker-list 20.0.0.10:9092,20.0.0.20:9092,20.0.0.60:9092 --topic test1

kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.60:9092 --topic test2  --from-beginning

test2

创建主题必须要创建分区,分区一定要给副本数

kafka-topics.sh --create --zookeeper 192.168.233.20:2181 --replication-factor 1 --partitions 1 --topic gq1test3
kafka-topics.sh --create --zookeeper 192.168.233.20:2181 --replication-factor 1 --partitions 1 --topic gq2test1
kafka-console-consumer.sh --bootstrap-server 192.168.233.20:9092 --topic gq1kafka-console-consumer.sh --bootstrap-server 192.168.233.20:9092 --topic gq2test2
发起
kafka-console-producer.sh --broker-list 192.168.233.20:9092 -topic gq1

修改分区数

kafka-topics.sh --zookeeper 192.168.233.20:2181 --alter --topic gq1 --partitions 3test2
kafka-topics.sh --describe --zookeeper 192.168.233.20:2181 --alter --topic gq1 kafka-topics.sh --delete --zookeeper 192.168.233.20:2181 --topic gq1

总结:

1.zookeeper:主要是分布式,观察者模式。统一各个服务器节点的数据。

在Kafka当中,他就是收集保存Kafka的元数据。

2.kafka消息队列,订阅发布模式

RABBIT MQ 轻量级

KAFKA 速度快,占用资源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/189034.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

公共英语三级阅读理解一篇,附带答案

公共英语三级阅读理解 The food we eat seems to have profound effects on our health.Although science has made enormous steps in making food more fit to eat, it has, at the same time, made many foods unfit to eat.Some research has shown…

【快速见刊|投稿优惠】2024年机电一体与自动化技术国际学术会议(IACMAT 2024)

2024年机电一体与自动化技术国际学术会议(IACMAT 2024) 2024 International Academic Conference on Mechatronics and Automation Technology(IACMAT 2024) 一【会议简介】 2024年机电一体与自动化技术国际学术会议(IACMAT 2024)即将召开,它以“机电一体&#xff0…

2023年【安全员-B证】最新解析及安全员-B证免费试题

题库来源:安全生产模拟考试一点通公众号小程序 安全员-B证最新解析是安全生产模拟考试一点通生成的,安全员-B证证模拟考试题库是根据安全员-B证最新版教材汇编出安全员-B证仿真模拟考试。2023年【安全员-B证】最新解析及安全员-B证免费试题 1、【多选题…

用友U8 ERP和面粉行业专版系统接口集成方案

面粉加工行业面临着数据管理和业务流程自动化的挑战。众诚ERP系统和用友U8系统的数据集成是解决这一挑战的关键。 解决方案 轻易云平台提供了一套完善的数据同步和集成解决方案,包括以下几个方面: 基础资料同步:包括物料、客户、供应商、仓…

解决:AttributeError: ‘NoneType’ object has no attribute ‘shape’

解决:AttributeError: ‘NoneType’ object has no attribute ‘shape’ 文章目录 解决:AttributeError: NoneType object has no attribute shape背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景 在使用之前的代码时&…

【数值计算方法(黄明游)】矩阵特征值与特征向量的计算(二):Jacobi 过关法(Jacobi 旋转法的改进)【理论到程序】

文章目录 一、Jacobi 旋转法1. 基本思想2. 注意事项 二、Jacobi 过关法1. 基本思想2. 注意事项 三、Python实现迭代过程(调试) 矩阵的特征值(eigenvalue)和特征向量(eigenvector)在很多应用中都具有重要的数…

Spring Task

Spring Task 是Spring框架提供的任务调度工具,可以按照约定的时间自动执行某个代码逻辑。 **定位:**定时任务框架 **作用:**定时自动执行某段Java代码 cron表达式 cron表达式其实就是一个字符串,通过cron表达式可以定义任务触…

c语言:模拟实现atoi函数

atoi函数的功能和用法&#xff1a; 主要功能&#xff1a;将字符串转换为整数。例如&#xff0c;将字符类型的“123”转换为整数123. #include <stdio.h> #include <stdlib.h>int main() {char str[] "123";int num atoi(str);printf("Converted …

【matlab程序】画海洋流场

【matlab程序】画海洋流场 clear;clc; file ( ‘0227.nc’); latncread(file,‘latitude’); lonncread(file,‘longitude’); uncread(file,‘water_u’); vncread(file,‘water_v’); [x,y]meshgrid(lon,lat); xx’; yy’; interval4; figure (1) set(gcf,‘color’,[1 1 1…

Period of an Infinite Binary Expansion(分析+欧拉)

传送阵&#xff1a;NEFU2022-Eulers totient function - Virtual Judge 思路&#xff1a; 对于一个小于1的数&#xff0c;化为二进制&#xff0c;找第一次进入循环的位置和最小循环周期。 我们设第一次进入循环的位置是i&#xff0c;第一次循环结束后&#xff0c;再次进入循…

3D云参观红色革命纪念馆允许更多人在线交流、体验

生活在和平年代的新一代青少年&#xff0c;可能对革命先烈英勇事迹难以有很深的体会&#xff0c;无法切实感受到中国共产党无畏牺牲、誓死保家卫国的红色精神&#xff0c;因此借助VR虚拟现实制作技术&#xff0c;让参观者们走近革命先烈中&#xff0c;感受老一辈无产阶级革命家…

高德地图全国行政区域信息

行政区域查询-API文档-开发指南-Web服务 API | 高德地图API private static void tm1(String s) throws IOException {String url"https://restapi.amap.com/v3/config/district?keywords中华人民共和国&subdistrict3&key用户key";String sx OkHttpUtils.g…

domjudge题目配置和开比赛

系统使用的是7.3.3&#xff0c;domjudge配置的方法请参考前文 domjudge配置-CSDN博客 题目导入 传统比较 首先可以去domjudge中随便下载一个题目&#xff0c;下载下来的压缩包应该是这样的 │ domjudge-problem.ini │ problem.pdf │ problem.yaml │ └─data└─sec…

模型层(回顾补充)

1.1基本使用 orm框架---》对象关系映射 数据库中&#xff1a;一个个表 &#xff1a;user表&#xff0c;book表&#xff0c;一条条的记录 程序中&#xff1a;一个个类&#xff0c;一个个对象 以后数据库中一张表---》对应程序中一个类 以后数据库中一条记录--》对应…

12月01日,每日信息差//阿里国际发布3款AI设计生态工具//美团买菜升级为“小象超市”//外国人永居证换新、6国游客免签来华

_灵感 &#x1f396; 阿里国际发布3款AI设计生态工具 &#x1f384; AITO问界系列11月交付新车18827辆 &#x1f30d; 美团买菜升级为“小象超市” &#x1f30b; 全球首个金融风控大模型国际标准出炉&#xff0c;由腾讯牵头制定 &#x1f381; 支付宝&#xff1a;支持外国人…

Python函数关键字参数及用法

在定义 Python 函数时可定义形参&#xff08;形式参数的意思&#xff09;&#xff0c;这些形参的值要等到调用时才能确定下来&#xff0c;由函数的调用者负责为形参传入参数值。简单来说&#xff0c;就是谁调用函数&#xff0c;谁负责传入参数值。 Python 函数的参数名不是无意…

Appium 元素定位与常用方法,让你轻松玩转自动化测试!

对测试人来说&#xff0c;Appium 是非常重要的一个开源跨平台自动化测试工具&#xff0c;它允许测试人员在不同的平台&#xff08;iOS、Android 等&#xff09;使用同一套 API 来写自动化测试脚本&#xff0c;这样可大幅提升代码复用率和工作效率。 本文汇总了从 Appium 基础到…

Python列表切片操作详解:提取、复制、反转等应用示例

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在Python中&#xff0c;列表切片是处理列表数据非常强大且灵活的方法。本文将全面探讨Python中列表切片的多种用法&#xff0c;包括提取子列表、复制列表、反转列表等操作&#xff0c;结合丰富的示例代码进行详细…

直饮水表与智能水表有哪些区别?

随着科技的不断进步,智能家居的概念正逐渐深入人们的生活。其中,直饮水表和智能水表作为创新科技的代表,在水资源的使用和管理方面发挥了重要作用。然而,这两者之间存在一些关键的区别。那么&#xff0c;直饮水表与智能水表到底有哪些区别呢&#xff1f; 直饮水表和智能水表都是…

java+springboot学生宿舍公寓管理系统xueshenggongy

经过查阅资料和调查统计发现&#xff0c;高校学生宿舍管理工作变得越来越繁重和琐碎&#xff0c;如在学生住宿安排&#xff08;特别是新生住宿安排&#xff09;、宿舍大幅调换、公共设施统计维护、宿舍杂费统计收取、宿舍卫生管理统计、出入登记记录等各个方法存在着大量问题和…