zookeeper+kafka消息队列集群部署

一.消息队列

1、什么是消息队列

        消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

        消息队列(MessageQueue)是一种在软件系统中用于传递消息和实现异步通信的技术。它通常被用来解耦不同组件或服务之间的通信,从而提高系统的可靠性、扩展性和灵活性。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

2、特征

(1)存储:消息队列作为缓冲层,可以处理生产者和消费者的速度差异,防止消息丢失或处理能力不足的问题。

(2)异步:消息队列实现了生产者和消费者的异步通信,生产者不需要等待消费者处理完消息就可以继续工作。

(3)解藕:消息队列能够解耦生产者和消费者,使它们不需要直接知道彼此的存在,从而降低系统组件之间的耦合度。

3、为什么使用消息队列

(1)解耦

        允许你独立的扩展或修改两边的处理过程,只要确保它们遵守相同的接口约束。

(2)冗余

        消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。

(3)扩展性

        因为消息队列解耦了你的处理过程,所以增大消息入队的处理的频率是很容易的,只要另外增加处理过程即可

(4)实现削峰填谷

        在系统的流量剧增时,消息队列能够缓冲请求,平滑处理峰值流量,避免直接将大量请求发送给后端服务导致服务不可用或性能下降。

(5)可恢复性

        系统的一部分组件失效后,不会影响整个系统,消息队列降低了进程间的耦合度,所以即时一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

(6)顺序保证

        消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(kafka保证一个partition内的消息的有序性)

(7)缓冲

        有助于控制和优化数据流经过系统的速度,解决生产消息和处理消息速度不一致的问题。

(8)异步通信

        提供了异步处理机制,允许用户把一个消息放入队列,但不立即处理它,在需要的时候处理。

二.kafka

1.概念

Kafka 是一个高性能、分布式的消息队列系统,最初由LinkedIn开发并开源。它具有以下几个显著的特点和优势:

  1. 高吞吐量:Kafka 能够处理非常高的消息吞吐量。它的设计目标是每秒处理数百万条消息。

  2. 分布式:Kafka 是一个分布式系统,消息被分布存储在多个节点上,支持水平扩展。这使得它能够处理大规模数据和流量。

  3. 持久性:Kafka 将消息持久化到磁盘,保证了消息在发送和消费过程中的持久性和可靠性。即使消费者离线一段时间,也能确保不丢失消息。

  4. 多订阅者支持:Kafka 的消息可以被多个消费者组消费,每个消费者组可以独立消费消息,支持发布-订阅和队列模式。

  5. 可扩展性:Kafka 可以通过增加节点来水平扩展集群,以支持更大的负载和更高的吞吐量。

  6. 低延迟:Kafka 的设计追求低延迟,适用于需要快速数据传输和处理的场景。

  7. 消息保留:Kafka 允许消息在一定时间内保留在系统中,消费者可以根据需要重放过去的消息。

  8. 社区活跃:作为开源项目,Kafka 拥有一个活跃的社区,提供了丰富的文档、示例和支持。

 2.kafka角色术语

(1)broker:kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)。

(2)Topic:每条发布到kafka集群的消息都有一个分类,这个分类被称为Topic(主题)。

(3)Producer:消息的生产者,负责发布消息到kafka broker。

(4)Consumer:指消息的消费者,从kafka broker拉取数据,并消费这些已发布的消息

(5)partition:partition是物理上的概念,每个Topic包含一个或多个Partition,每个Partition都是一个有序队列,Partition中的每条消息都会被分配一个有序的id(offset)

(6)Consumer Group:消费者组,默认属于group组。

(7)Message:消息,通信的基本单位,每个producer可以向一个topic发布一些信息

三.zookeeper 

1、概述

        zookeeper是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让它们有序的去访问某个共享资源,防止造成资源竞争(脑裂)的后果。

2、zookeeper工作原理

(1)master启动

        在分布式系统中引入Zookeeper以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是主节点A和主节点B,当两个主节点都启动后,它们都会向Zookeeper中注册节点信息。我们假设主节点A锁注册的节点信息是master001,主节点B注册的节点信息是master002,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法为例,编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节点A将会获得锁成为主节点,然后主节点B将被阻塞成为一个备用节点。这样,通过这种方式Zookeeper就完成了对两个Master进程的调度。完成了主、备节点的分配和协作

(2)master故障

        如果主节点A发生了故障,这时候它在Zookeeper所注册的节点信息会被自动删除,而Zookeeper会自动感知节点的变化,发现主节点A故障后,会再次发出选举,这时候主节点B将在选举中获胜,替代主节点A成为新的主节点,这样就完成了主、被节点的重新选举

(3)master恢复 

        如果主节点恢复了,它会再次向zookeeper注册自身的节点信息,只不过这时候它注册的节点信息将会变成master003,而不是原来的信息。zookeeper会感知节点的变化再次发动选举,这时候主节点B在选举过程中再次获胜继续担任主节点,节点A担任备份节点。

3.zookeeper集群架构

Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。

follower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。

observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步leader状态,但是不参与投票。bserver日的是扩展系统,提高伸缩性。

client:客户端角色,用于向zookeeper发起请求 。

4.zookeeper工作流程

        Zookeeper修改数据的流程:Zookeeper集群中每个Server在内存中存储了一份数据,在Zookeeper启动时,将从实例中选举一个Server作为1eader,Leader负责处理数据更新等操作,当且仅当大多数Server在内存中成功修改数据,才认为数据修改成功。

        Zookeeper写的流程为:客户端Client首先和一个Server或者bserve通信,发起写请求,然后Server将写请求转发给Leader,Leader再将写请求转发给其它Server,其它Server在接收到写请求后写入数据并响应Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应client,完成一次写操作过程。

四、zookeeper在kafka中的作用

1.Broker注册

        Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:
/brokers/ids
        每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0……N]
        Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的BrokerID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker岩机,则对应的临时节点也会被自动删除。

2.Topic注册

        在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录。

3.生产者负载均衡

        由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡

(1)四层负载均衡

        根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除

(2)使用Zookeeper进行负载均衡

        由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制

4.消费者负载均衡

        与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的个消费者,不同的消费者分组消费白己特定的Topic下面的消息,互不干扰

5.记录信息分区与消费者关系

      消费组(ConsumerGroup)下有多个Consumer(消费者)。对于每个消费者组(ConsumerGroup),Kafka都会为其分配一个全局唯一的GroupID,Group内部的所有消费者共享该ID。订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)。
      同时,Kafka为每个消费者分配一个ConsumerID,通常采用Hostname:UUID形式表示
      在Kafka中,规定了每个消总分区只能被同组的一个消费者进行消费,因此,需要在
 Zookeeper上记录消息分区与consumer之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其consumer ID写入到Zookeeper对应消息分区的临时节点上,例如:
 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该消息分区上消费者的ConsumerID。

6、消息消费进度Offset记录

        在消费者对指定消分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。ffset在Zookeeper中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值

7.消费者注册

消费者服务器在初始化启动时加入消费者分组的步骤如下:

(1)注册到消费者分组

        每个消费者服务器启动时,都会到zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

(2)对消费者分组中的消费者的变化注册监听

        每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少就触发消费者的负载均衡。

(3)对Broker服务器变化注册监听

        消费者需要对/broker/ids/[-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡

(4)进行消费者负载均衡

为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消
息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡

 五.集群部署kafka

1:修改主机hosts文件(所有主机都配置)

[root@kafka1 ~]# vim /etc/hosts
192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3

2:zookeeper的部署

(1)安装zookeeper(三个节点的配置相同)
[root@kafka1 ~]# systemctl stop firewalld
[root@kafka1 ~]# setenforce 0
[root@kafka1 ~]# yum -y install java
[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
 (2)创建数据保存目录(三个节点的配置相同)
[root@kafka1 ~]# cd /etc/zookeeper/
[root@kafka1 zookeeper]# mkdir zookeeper-data
(3)修改配置文件(三个节点的配置相同)
[root@kafka1 zookeeper]# cd /etc/zookeeper/conf
[root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg
[root@kafka1 ~]# vim zoo.cfg 
dataDir=/etc/zookeeper/zookeeper-data
clientPort=2181
server.1=192.168.10.101:2888:3888
server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888

注释:zookeeper只用的端口

2181:对cline端提供服务

3888:选举leader使用

2888:集群内机器通讯使用(Leader监听此端口)

(4)创建节点id文件(按server编号设置这个id,三个机器不同)

节点1:

[root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid

节点2:

[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid

节点3:

[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid
(5)三个节点启动zookeeper进程
[root@kafka1 conf]# cd /etc/zookeeper/
[root@kafka1 zookeeper]# ./bin/zkServer.sh start
[root@kafka1 zookeeper]# ./bin/zkServer.sh status

 3:kafka的部署

(1)kafka的安装(三个节点的配置相同)
[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
(2)修改配置文件
[root@kafka1 ~]# cd /etc/kafka/[root@kafka2 kafka]# vim config/server.propertiesbroker.id=1##21行  修改,注意其他两个的id分别是2和3listeners=PLAINTEXT://192.168.10.101:9092#31行  修改,其他节点改成各自的IP地址log.dirs=/etc/kafka/kafka-logs## 60行  修改num.partitions=1##65行 分片数量,不能超过节点数zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218##123行,填写集群中各节点的地址和端口

注释:

9092是kafka的监听端口

(3)创建日志目录(三个节点的配置相同)
[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
(4)在所有kafka节点上执行开启命令,生成kafka群集(三个节点的配置相同)
[root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties &

如果启动不了,可以将/etc/kafka/kafka-logs中的数据清除再试试

4:测试

创建topic(任意一个节点)

bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test

列出topic(任意一个节点)

bin/kafka-topics.sh --list --zookeeper kafka1:2181bin/kafka-topics.sh --list --zookeeper kafka2:2181bin/kafka-topics.sh --list --zookeeper kafka3:2181

生产消息

bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test

消费消息

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

输入完成会可以去生产者方测试输入,看消费者方有无信息 

删除topic

bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/48372.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1、springboot3 vue3开发平台-后端-项目构建

文章目录 1. 创建项目1.1 前置环境条件1.2 项目创建 2. 模块配置2.1 父工程配置概述2.2 配置启动模块2.3 父工程相关依赖管理 1. 创建项目 1.1 前置环境条件 idea2023, jdk17 1.2 项目创建 创建父工程并删除不需要的文件目录: 右键父工程依次创建其他模块 最…

Windows 、Linux、MacOS 进程管理机制

本心、输入输出、结果 文章目录 Windows 、Linux、MacOS 进程管理机制前言Windows 进程管理机制Linux 进程管理macOS 进程管理内存不够了,几个操作系统如何处理Windows 、Linux、MacOS 进程管理机制 编辑 | 简简单单 Online zuozuo 地址 | https://blog.csdn.net/qq_15071263 …

【Qt】窗口

文章目录 QMainWindow菜单栏工具栏状态栏浮动窗口对话框自定义对话框Qt内置对话框QMessageBox QMainWindow Qt中的主窗口以QMainWindow表示,其总体结构如下: 菜单栏 菜单栏MenuBar,可包含多个菜单Menu,每个菜单也可以包含多个菜…

C语言 指针方法 输入一行文字,找出其中大写字母、小写字母、空格、数字以及其他字符各有多少

输入一行文字,找出其中大写字母、小写字母、空格、数字以及其他字符各有多少。 #include <stdio.h>void countCharacters(char *str, int *upper, int *lower, int *space, int *digit, int *other) {*upper = *lower = *space = *digit = *other = 0;while (*str != \0…

Java面试题系列 - 第17天

Java中的代理模式与动态代理 背景说明&#xff1a;代理模式是一种结构型设计模式&#xff0c;用于在客户端和目标对象之间提供一个代理或占位符。在Java中&#xff0c;动态代理技术允许在运行时创建代理对象&#xff0c;这在AOP&#xff08;面向切面编程&#xff09;和RPC&…

03 Git的基本使用

第3章&#xff1a;Git的基本使用 一、创建版本仓库 一&#xff09;TortoiseGit ​ 选择项目地址&#xff0c;右键&#xff0c;创建版本库 ​ 初始化git init版本库 ​ 查看是否生成.git文件&#xff08;隐藏文件&#xff09; 二&#xff09;Git ​ 选择项目地址&#xff0c…

【LeetCode】相同的树

目录 一、题目二、解法完整代码 一、题目 给你两棵二叉树的根节点 p 和 q &#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a;p [1,2,3],…

FastGPT 知识库搜索测试功能解析

目录 一、代码解析 1.1 searchTest.ts 1.2 controller.ts 本文接上一篇文章FastGPT 知识库搜索测试功能解析 对具体代码进行解析。 一、代码解析 FastGPT 知识库的搜索测试功能主要涉及两个文件&#xff0c;分别是 searchTest.ts 和 controller.ts 文件&#xff0c;下面分…

【python】练习 10.5:访客簿 编写⼀个 while 循环,提⽰⽤户输⼊其名字。

练习 10.5&#xff1a;访客簿 编写⼀个 while 循环&#xff0c;提⽰⽤户输⼊其名字。 要求 收集⽤户输⼊的所有名字&#xff0c;将其写⼊ guest_book.txt&#xff0c;并确保这个⽂件中 的每条记录都独占⼀⾏。 代码 from pathlib import Pathpath Path(guest_book.txt) gu…

npm下载的依赖包版本号怎么看

npm下载的依赖包版本号怎么看 版本号一般分三个部分&#xff0c;主版本号、次版本号、补丁版本号。 主版本号&#xff1a;一般依赖包发生重大更新时&#xff0c;主版本号才回发生变化&#xff0c;如Vue2.x到Vue3.x。次版本号&#xff1a;当依赖包中发生了一些小变化&#xff…

运行springboot项目报错:java: java.lang.NoSuchFieldError: members_field

项目场景&#xff1a; 在idea中运行从git上拉取的基于springboot框架的项目运行报错 问题描述 运行spingboot项目报错 java: java.lang.NoSuchFieldError: members_field原因分析&#xff1a; 检查你所使用的java版本&#xff08;我这里是在idea上运行的&#xff0c;可以很直…

ArkTS语言---基础知识

ArkTS是一种为构建高性能应用而设计的编程语言。ArkTS在继承TypeScript语法的基础上进行了优化&#xff0c;以提供更高的性能和开发效率。目前流行的编程语言TypeScript是在JavaScript基础上通过添加类型定义扩展而来的&#xff0c;而ArkTS则是TypeScript的进一步扩展。TypeScr…

qt QScrollArea 可滚动区域控件简单举例

1.qt 滚动控件简单举例 在Qt中&#xff0c;滚动控件通常是通过QScrollArea来实现的。以下是一个简单的例子&#xff0c;展示了如何使用QScrollArea来创建一个滚动控件&#xff1a; #include <QApplication> #include <QWidget> #include <QVBoxLayout>…

DETR算法解读——Transformer在目标检测任务的首次应用

论文&#xff1a;End-to-End Object Detection with Transformers 作者&#xff1a;Nicolas Carion, Francisco Massa, Gabriel Synnaeve, Nicolas Usunier, Alexander Kirillov, Sergey Zagoruyko 机构&#xff1a;Facebook AI 链接&#xff1a;https://arxiv.org/abs/2005.12…

git教程, 命令行版

前言 git就是代码版本管理系统&#xff0c;很简单的作用就是每一次commit之后&#xff0c;修改文件都是跟上一次commit的仓库文件做对比&#xff0c;也可以调出历史的文件查看某次commit修改了什么东西 0环境准备&#xff1a; 安装git, 百度一下&#xff0c;然后打开cmd&…

Django 执行原生SQL

在Django中&#xff0c;你可以使用Raw SQL queries来执行原生的SQL查询。这对于需要进行复杂查询或Django的ORM无法满足的查询非常有用。 1&#xff0c;添加模型 Test/app11/models.py from django.db import modelsclass Post(models.Model):title models.CharField(max_le…

视频压缩文件太大了怎么缩小?怎么压缩视频大小?视频压缩方法:10个!(宝藏)

视频压缩文件太大了怎么缩小&#xff1f;让我看看是谁下班之后不是一手刷手机短视频&#xff0c;顺便葛优躺在沙发上的&#xff1f;互联网发展到现在&#xff0c;视频已成为我们生活中不可或缺的一部分。不管是视频录制还是视频缓存&#xff0c;视频文件体积越来越庞大&#xf…

C++中size_t怎么用

size_t是C标准库中定义的一种无符号整型数据类型&#xff0c;通常用于表示大小或数量&#xff0c;如数组的长度、容器的大小或指针算术。size_t类型确保了跨平台的一致性&#xff0c;使得代码更加健壮和可移植。 size_t定义在<cstddef>头文件中&#xff0c;因此在使用之…

reserve和resize

void test_vector4() {vector<int> v1;//cout << v1.max_size() << endl;//v1.reserve(10);v1.resize(10);for (size_t i 0; i < 10; i){v1[i] i;}for (auto e : v1){cout << e << " ";}cout << endl;} 在上面这段代码中对…

使用shedlock实现分布式互斥执行

前言 前序章节&#xff1a;springboot基础(82):分布式定时任务解决方案shedlock 如果你不清楚shedlock&#xff0c;建议先阅读前序章节&#xff0c;再来查看本文。 如果我们不在spring环境下&#xff0c;如何使用shedlock实现分布式互斥执行&#xff1f; 我们可以使用shedl…