zookeeper+kafka消息队列群集部署

一、消息队列

1.消息队列

消息是应用间传送的数据

消息队列是应用见的通信方式,消息发送后立即返回,由消息系统确保消息可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

2.消息队列特征

(1)存储

与依赖于使用套接字的基本 TCP 和 UDP 协议的传统请求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止。

(2)异步

    与请求和响应系统不同,消息队列通过缓冲消息可以在应用程序中公开一定程度的异步性,允许源进程发送消息并在队列中累积消息,而目标进程则可以挑选消息进行处理。 这样,应用程序就可以在某些故障情况下运行,例如连接断断续续或源进程或目标进程故障。

路由:消息队列还可以提供路由功能,其中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信模式。

二、Kafka

1.kafka基本概念

      Kafka是一种高吞吐量的分布式发布/订阅消息系统,即生产者生产(produce)各种数据,消费者(consume)消费(分析、处理)这些数据。

    kafka是Apache组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop平台的数据分析、低时延的实时系统、storm/spark流式处理引擎等。kafka现在它已被多家大型公司作为多种类型的数据管道和消息系统使用。

2.kafka角色术语

kafka的一些核心概念和角色

  1. Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker。
  2. Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
  3. Producer:指消息的生产者,负责发布消息到kafka broker。
  4. Consumer:指消息的消费者,从kafka broker拉取数据,并消费这些已发布的消息。
  5. Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition,每个partition都是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  6. Consumer Group:消费者组,可以给每个Consumer指定消费组,若不指定消费者组,则属于默认的group。
  7. Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息。

3:kafka拓扑架构

 从图中可以看出,典型的消息系统有生产者(Producer),存储系统(broker)和消费者(Consumer)组成,Kafka作为分布式的消息系统支持多个生产者和多个消费者,生产者可以将消息分布到集群中不同节点的不同Partition上,消费者也可以消费集群中多个节点上的多个Partition。在写消息时允许多个生产者写到同一个Partition中,但是读消息时一个Partition只允许被一个消费组中的一个消费者所消费,而一个消费者可以消费多个Partition。也就是说同一个消费组下消费者对Partition是互斥的,而不同消费组之间是共享的。

    kafka支持消息持久化存储,持久化数据保存在kafka的日志文件中,在生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的个数或尺寸、大小达到一定阀值时,再统一写到磁盘上,这样不但提高了kafka的执行效率,也减少了磁盘IO调用次数。

kafka中每条消息写到partition中,是顺序写入磁盘的,这个很重要,因为在机械盘中如果是随机写入的话,效率将是很低的,但是如果是顺序写入,那么效率却是非常高,这种顺序写入磁盘机制是kafka高吞吐率,适合海量数据。

4.Topic和partition

       Kafka中的topic(主题)是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。推荐partition的数量一定要大于同时运行的consumer的数量。另外,建议partition的数量要小于等于集群broker的数量,这样消息数据就可以均匀的分布在各个broker中。

5.Producer生产机制

      Producer是消息和数据的生产者,它发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置的合理,所有消息都可以均匀分布到不同的Partition里,这样就实现了数据的负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。

6.Consumer消费机制        

       Kafka发布消息通常有两种模式:队列模式(queuing)和发布/订阅模式(publish-subscribe)。在队列模式下,只有一个消费组,而这个消费组有多个消费者,一条消息只能被这个消费组中的一个消费者所消费;而在发布/订阅模式下,可有多个消费组,每个消费组只有一个消费者,同一条消息可被多个消费组消费。  

Kafka中的Producer和consumer采用的是push、pull的模式,即producer向 broker进行push消息,comsumer从bork进行pull消息,push和pull对于消息的生产和消费是异步进行的。pull模式的一个好处是consumer可自主控制消费消息的速率,同时consumer还可以自己控制消费消息的方式是批量的从broker拉取数据还是逐条消费数据。

三、zookeeper

        ZooKeeper是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。

        ZooKeeper是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。

 1.zookeeper 应用举例

(1)单点故障

       所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。那我们的解决方法就是通过对集群master角色的选取,来解决分布式系统单点故障的问题。

(2) 传统的方式解决单点故障

       传统的方式是采用一个备用节点,这个备用节点定期向主节点发送ping包,主节点收到ping包以后向备用节点发送回复Ack信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务。

      但是会出现主节点并没有出现故障,只是在回复ack响应的时候网络发生了故障,这样备用节点就无法收到回复,那么它就会认为主节点出现了故障,接着,备用节点将接管主节点的服务,并成为新的主节点,此时,分布式系统中就出现了两个主节点(双Master节点)的情况,双Master节点的出现,会导致分布式系统的服务发生混乱。这样的话,整个分布式系统将变得不可用。为了防止出现这种情况,就需要引入ZooKeeper来解决这种问题。

2.zookeeper的工作原理

(1)master启动

       在分布式系统中引入Zookeeper以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是 主节点A 和 主节点B,当两个主节点都启动后,它们都会向ZooKeeper中注册节点信息。我们假设 主节点A 锁注册的节点信息是 master00001 , 主节点B 注册的节点信息是 master00002 ,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法,那么编号最小的节点将在选举中获胜并获得锁成为主节点,也就是 主节点A 将会获得锁成为主节点,然后 主节点B 将被阻塞成为一个备用节点。这样,通过这种方式Zookeeper就完成了对两个Master进程的调度。完成了主、备节点的分配和协作。

(2)master故障

        如果 主节点A 发生了故障,这时候它在ZooKeeper所注册的节点信息会被自动删除,而ZooKeeper会自动感知节点的变化,发现 主节点A  故障后,会再次发出选举,这时候  主节点B  将在选举中获胜,替代  主节点A  成为新的主节点,这样就完成了主、被节点的重新选举。

(3)master恢复

        如果主节点恢复了,它会再次向ZooKeeper注册自身的节点信息,只不过这时候它注册的节点信息将会变成 master00003 ,而不是原来的信息。ZooKeeper会感知节点的变化再次发动选举,这时候 主节点B 在选举中会再次获胜继续担任  主节点 , 主节点A 会担任备用节点。

zookeeper就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步的。

3.zookeeper集群架构

 

Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。

follower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。

observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步leader状态,但是不参与投票。Observer目的是扩展系统,提高伸缩性。

client:客户端角色,用于向zookeeper发起请求。

4.zookeeper工作流程

     Zookeeper修改数据的流程:Zookeeper集群中每个Server在内存中存储了一份数据,在Zookeeper启动时,将从实例中选举一个Server作为leader,Leader负责处理数据更新等操作,当且仅当大多数Server在内存中成功修改数据,才认为数据修改成功。

     Zookeeper写的流程为:客户端Client首先和一个Server或者Observe通信,发起写请求,然后Server将写请求转发给Leader,Leader再将写请求转发给其它Server,其它Server在接收到写请求后写入数据并响应Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应Client,完成一次写操作过程。

四、Zookeeper 在 Kafka 中的作用

1.broker 注册

2.topic注册

3.生产者负载均衡

4.消费者负载均衡

5.分区与消费者关系

6.消息消费进度offset记录

7.消费者注册

五、案例分析-群集部署kafka

更改名称:hostnamectl    set-hostname  kafka1
                  bash

[root@kafka1 ~]# vim /etc/hosts

192.168.10.101  kafka1

192.168.10.102  kafka2

192.168.10.103  kafka3

[root@kafka1 ~]# systemctl stop firewalld
[root@kafka1 ~]# setenforce 0

[root@kafka1 ~]# yum -y install java

1.zookeeper的部署

(1)安装zookeeper(三个节点的配置相同) 

[root@kafka1 ~]# yum -y install java

[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz

[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

(2)创建数据保存目录(三个节点的配置相同)

[root@kafka1 ~]# cd /etc/zookeeper/

[root@kafka1 zookeeper]# mkdir zookeeper-data

(3)修改配置文件(三个节点的配置相同)

[root@kafka1 zookeeper]# cd /etc/zookeeper/conf

[root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg

[root@kafka1 ~]# vim zoo.cfg

dataDir=/etc/zookeeper/zookeeper-data

clientPort=2181

server.1=192.168.10.114:2888:3888

server.2=192.168.10.115:2888:3888

server.3=192.168.10.116:2888:3888

备注:

zookeeper只用的端口

2181:对cline端提供服务

3888:选举leader使用

2888:集群内机器通讯使用(Leader监听此端口)

(4)创建节点id文件(按server编号设置这个id,三个机器不同)

节点1:

[root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid

节点2:

[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid

节点3:

[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid

(5)三个节点启动zookeeper进程

[root@kafka1 conf]# cd /etc/zookeeper/

[root@kafka1 zookeeper]# ./bin/zkServer.sh start        (启动)

[root@kafka1 zookeeper]# ./bin/zkServer.sh status       (状态)

(6)查看各个节点进程

2.kafka的部署

(1)kafka的安装(三个节点的配置相同)

[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz

[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka

(2)修改配置文件

[root@kafka1 ~]# cd /etc/kafka/

[root@kafka2 kafka]# vim config/server.properties

broker.id=1                                     ##21行  修改,注意其他两个的id分别是2和3

listeners=PLAINTEXT://192.168.10.101:9092       #31行  修改,其他节点改成各自的IP地址

log.dirs=/etc/kafka/kafka-logs ## 60行  修改

num.partitions=1                 ##65行 分片数量,不能超过节点数

zookeeper.connect=192.168.10.114:2181,192.168.10.115:2181,192.168.10.116:2181

备注:

9092是kafka的监听端口

(3)创建日志目录(三个节点的配置相同)

[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs

(4)在所有kafka节点上执行开启命令,生成kafka群集(三个节点的配置相同)

[root@kafka1 kafka]# ./bin/kafka-server-start.sh /etc/kafka/config/server.properties &

3.测试

创建topic(任意一个节点)

bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test

列出topic(任意一个节点)

bin/kafka-topics.sh --list --zookeeper kafka1:2181

bin/kafka-topics.sh --list --zookeeper kafka2:2181

bin/kafka-topics.sh --list --zookeeper kafka3:2181

生产消息

bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test

消费消息

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

删除topic

bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/46663.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式学习(二)工厂模式——抽象工厂模式+注册表

设计模式学习(二)工厂模式——抽象工厂模式注册表 前言使用简单工厂改进使用注册表改进参考文章 前言 在上一篇文章中我们提到了抽象工厂模式初版代码的一些缺点:①客户端违反开闭原则②提供方违反开闭原则。本文将针对这两点进行讨论 使用…

CSS-0_3 CSS和单位

文章目录 CSS的值和单位属性值长度单位CSS和绝对单位CSS和相对单位百分比em & rem视口 颜色单位 碎碎念 CSS的值和单位 我们知道,CSS是由属性和属性值所组成的表 随着CSS的发展,属性不说几千也有几百,我从来不支持去背诵所有的可能性。…

昇思25天学习打卡营第22天|基于MindSpore的红酒分类实验

基于MindSpore的红酒分类实验 K近邻算法实现红酒聚类 1、实验目的 了解KNN的基本概念;了解如何使用MindSpore进行KNN实验。 2、K近邻算法原理介绍 K近邻算法(K-Nearest-Neighbor, KNN)是一种用于分类和回归的非参数统计方法,…

WPF 手撸插件 一

1、本文主要使不适用第三方工具,纯手工的WPF主项目加载另一个WPF的项目,这里我们加载的是*.exe。 2、项目结构如下图。AbstractionLayer用于创建插件的接口。WPFIPluginDemo是主程序。WpfPlugin3是要加载的插件程序。 3、 AbstractionLayer中添加接口IP…

jvm常用密令、jvm性能优化、jvm性能检测、Java jstat密令使用、Java自带工具、Java jmap使用

1.jps是Java虚拟机的进程状态工具,用于列出正在运行的Java进程 jps命令的使用:cmd打开直接jps 1.1不带参数: jps 默认情况下,列出所有正在运行的 Java 进程的进程 ID 和主类名。 1.2 -l:显示完整的主类名或 JAR 文件…

计算机的错误计算(三十二)

摘要 在计算机的错误计算(二十八)与(三十 一)中,我们探讨了 Visual Studio 对 6个随机exp(x)函数的计算精度问题。根据网友的反馈,本节将展示 Python 对它们的输出:结果几乎与 Visual Studio …

MyBatis框架学习笔记(四):动态SQL语句、映射关系和缓存

1 动态 SQL 语句-更复杂的查询业务需求 1.1 动态 SQL-官方文档 (1)文档地址: mybatis – MyBatis 3 | 动态 SQL (2)为什么需要动态 SQL 动态 SQL 是 MyBatis 的强大特性之一 使用 JDBC 或其它类似的框架,根据不同条…

链接追踪系列-09.spring cloud项目整合elk显示业务日志

准备工作: 参看本系列之前篇:服务器安装elastic search 本机docker启动的kibana-tencent 使用本机安装的logstash。。。 本微服务实现的logstash配置如下: 使用腾讯云redis 启动本机mysql 启动本机docker 启动nacos,微服务依赖它作为…

为什么要使用加密软件?

一、保护数据安全:加密软件通过复杂的加密算法对敏感数据进行加密处理,使得未经授权的人员即使获取了加密数据,也无法轻易解密和获取其中的内容。这极大地提高了数据在存储、传输和使用过程中的安全性。 二、遵守法律法规:在许多国…

实验六:频域图像增强方法

一、实验目的 熟练掌握频域滤波增强的各类滤波器的原理及实现。分析不同用途的滤波器对频域滤波增强效果的影响,并分析不同的滤波器截止频率对频域滤波增强效果的影响。二、实验原理 ① Butterworth 低通滤波器:一种具有最大平坦通带幅度响应的滤波器。它的特点是在通带内具…

Dify中固定递归字符文本分割器的chunk长度计算方式

本文主要从源码角度剖析了Dify中FixedRecursiveCharacterTextSplitter的chunk长度计算方式。 1.self._length_function(chunk) 源码位置:dify\api\core\splitter\fixed\_text\_splitter.py\FixedRecursiveCharacterTextSplitter类\split\_text方法\self.\_length\_function(…

AutoHotKey自动热键(十一)下载SciTE4AutoHotkey-Plus的中文增强版脚本编辑器

关于AutoHotkey的专用编辑器, SciTE4AutoHotkey是一个免费的基于 SciTE 的 AutoHotkey 脚本编辑器,除了 DBGp 支持, 它还为 AutoHotkey 提供了语法高亮, 调用提示, 参数信息和自动完成, 以及其他拥有的编辑特性和辅助工具.XDebugClient 是一个基于 .NET Framework 2.0 的简单开…

buuctf-web

先输入127.0.0.1查找本地 得到网页目录,再输入127.0.0.1|ls查找下一级 得到php文件,127.0.0.1 | ls /返回上级目录 127.0.0.1 | cat /flag得到flag

如何提取视频中的音频?提取音频的几种方法

如何提取视频中的音频?提取视频中的音频,是许多人在处理多媒体内容时常遇到的需求。这一过程不仅仅是简单地从视听媒体中抽离音频部分,它背后蕴含着许多技术上的挑战和创意上的可能性。通过提取音频,你可以更方便地利用视频中的声…

object-C 解答算法:两数之和(leetCode-1)

两数之和(leetCode-1) 题目如下图:(也可以到leetCode上看完整题目,题号1) 解答方法一: 最简单的方法就是双指针遍历数组.代码如下 - (NSMutableArray *)sumOfTwoNumbers:(NSMutableArray *)array target:(int)target {NSMutableArray * resultArray [[NSMutableArray alloc…

【python】操作mysql数据库

一、操作步骤 MySQL是一个开源的关系型数据库管理系统(RDBMS),它使用结构化查询语言(SQL)作为操作和管理数据的主要方式。MySQL具有以下特点: 开源:MySQL是开源软件,这意味着任何人…

数电基础 - 触发器

目录 ​编辑 一. 简介 二. SR锁存器 三. JK 触发器 四. D 触发器 五. 电平触发的触发器 六. 脉冲触发的触发器 七. 边沿触发的触发器 八 . 触发器的逻辑功能和描述方法 一. 简介 触发器是数字电路中的一种基本存储单元,具有记忆功能,能够存储一…

36.UART(通用异步收发传输器)-RS232(3)

(1)串口发送模块visio视图: (2)串口发送模块Verilog代码: /* 常见波特率: 4800、9600、14400、115200 在系统时钟为50MHz时,对应计数为: (1/4800) * 10^9 /20 -1 10416 …

macOS 安装软件提示 “已损坏,无法打开。 您应该将推出磁盘映像” 或 “已损坏,无法打开。 您应该将它移到废纸篓”,解决办法

本文以 Pulsar Assistant 软件为例进行介绍,Redisant 系列的其他软件同理,只需要根据不同软件修改下面命令中的软件名即可。 在 macOS 系统上安装 下载最新的.dmg包,双击打开安装程序,将软件拖动到下方的程序目录即可。 安装时报…

AWS Aurora Postgres 的开源替代品:存储和计算分离 | 开源日报 No.278

neondatabase/neon Stars: 13.0k License: Apache-2.0 Neon 是一个无服务器的开源替代品,用于 AWS Aurora Postgres。它将存储和计算分离,通过在节点集群中重新分配数据来替换 PostgreSQL 存储层。 提供自动扩展、分支和无限存储。Neon 安装包括计算节…