消息中间件Kafka分布式数据处理平台

目录

一.Kafka基本介绍

1.定义

2.特点

(1)高吞吐量、低延迟

(2)可扩展性

(3)持久性、可靠性

(4)容错性

(5)高并发

3.系统架构

(1)Broker(服务代理节点)

(2)Producer(生产者)

(3)Consumer(消费者)

(4)Consumer Group(消费组)

(5)ZooKeeper

(6)Topic(主题)

(7)Partition(分区)

(8)Replica(副本)

(9)Leader and Follower

(10)Offset(偏移量)

二.部署ZooKeeper+Kafka集群

1.环境准备

2.下载安装包

3.修改配置文件

4.设置环境变量

5.配置ZooKeeper启动脚本

6.设置开机自启并启动

7.Kafka命令行操作

(1)创建topic

(2)查看当前服务器中的所有topic

(3)查看某个topic的详情

(4)发布消息

(5)消费消息

(6)修改分区数

(7)删除topic


一.Kafka基本介绍

1.定义

Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域。
最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于 Zookeeper 协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于 hadoop 的批处理系统、低延迟的实时系统、Spark/Flink 流式处理引擎,nginx 访问日志,消息服务等等,用 scala 语言编写,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。
官方网址:https://kafka.apache.org/

2.特点

(1)高吞吐量、低延迟

Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个 topic 可以分多个 Partition,Consumer Group 对 Partition 进行消费操作,提高负载均衡能力和消费能力。

(2)可扩展性

kafka 集群支持热扩展

(3)持久性、可靠性

消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

(4)容错性

允许集群中节点失败(多副本情况下,若副本数量为 n,则允许 n-1 个节点失败)

(5)高并发

支持数干个客户端同时读写

3.系统架构

  • 生产者生产数据传给broker即kafka服务器集群
  • kafka集群将数据存储在topic主题中,每个topic主题中有多个分片(分片做了备份在其他topic)
  • 分片中存储数据,kafka集群注册在zookeeper中,zookeeper通知消费者kafka服务器在线列表
  • 消费者收到zookeeper通知的在线列表,从broker中拉取数据
  • 消费者保存偏移量到zookeeper中,以便记录自己宕机消费到什么地方
(1)Broker(服务代理节点)
  • 服务代理节点,其实就是一个kafka实例或服务节点,多个broker构成了kafka集群
  • 一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic
(2)Producer(生产者)
  • 生产者,也就是写入消息的一方,将消息写入broker中
  • 即数据的发布者,该角色将消息 push 发布到 Kafka 的 topic 中
  • broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中
  • 生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition
     
(3)Consumer(消费者)
  • 消费者,也就是读取消息的一方,从broker中pull 拉取数据
  • 可以消费多个 topic 中的数据
(4)Consumer Group(消费组
  • 消费者组,由多个 consumer 组成
  • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名,若不指定组名则属于默认的组
  • 将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力
  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,防止数据被重复读取
  • 消费者组之间互不影响
  • 消费组。一个或多个消费者构成一个消费组,不同的消费组可以订阅同一个主题的消息且互不影响
     
(5)ZooKeeper
  • kafka使用zookeeper来管理集群的元数据 meta 信息,以及控制器的选举等操作
  • 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费
  • zookeeper的作用就是,生产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是通过zookeeper去寻找的。消费者消费哪一条数据,也需要zookeeper的支持,从zookeeper获得offset,offset记录上一次消费的数据消费到哪里,这样就可以接着下一条数据进行消费
(6)Topic(主题
  • 可以理解为一个队列,生产者和消费者面向的都是一个 topic。
  • 类似于数据库的表名或者 ES 的 index
  • 物理上不同 topic 的消息分开存储
(7)Partition(分区
  • 分区,同一个主题下的消息还可以继续分成多个分区,一个分区只属于一个主题
  • 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序
  • 每个 topic 至少有一个 partition,当生产者产生数据的时候,会根据分配策略选择分区,然后将消息追加到指定的分区的队列末尾

Partation 数据路由规则:
1.指定了 patition,则直接使用
2.未指定 patition 但指定 key(相当于消息中某个属性),通过对 key 的 value 进行 hash 取模,选出一个 patition
3.patition 和 key 都未指定,使用轮询选出一个 patition

每条消息都会有一个自增的编号,用于标识消息的偏移量,标识顺序从 0 开始。

每个 partition 中的数据使用多个 segment 文件存储。

如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下(例如商品秒杀、 抢红包),需要将 partition 数目设为 1。

  • broker 存储 topic 的数据。如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
  • 如果某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储 topic 的一个 partition, 剩下的 M 个 broker 不存储该 topic 的 partition 数据。
  • 如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
(8)Replica(副本
  • 副本,一个分区可以有多个副本来提高容灾性
  • 为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower
(9)Leader and Follower
  • 分区有了多个副本,那么就需要有同步方式。kafka使用一主多从进行消息同步,主副本提供读写的能力,而从副本不提供读写,仅仅作为主副本的备份
  • 每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition
  • Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。Follower 只负责备份,不负责数据的读写。
  • 如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
  • 当 Follower 挂掉、卡住或者同步太慢,Leader 会把这个 Follower 从 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合) 列表中删除,重新创建一个 Follower
(10)Offset(偏移量)
  • 可以唯一的标识一条消息,分区中的每一条消息都有一个所在分区的偏移量,这个偏移量唯一标识了该消息在当前这个分区的位置,并保证了在这个分区的顺序性,不过不保证跨分区的顺序性
  • 偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息(即消费位置)
  • 消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息
  • 某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制
  • 消息最终还是会被删除的,默认生命周期为 1 周(7*24小时)
     

二.部署ZooKeeper+Kafka集群

1.环境准备

服务器类型系统和IP地址需要安装的组件
Zookeeper服务器1CentOS7.4(64 位) 192.168.227.100jdk、ZooKeeper
Zookeeper服务器2CentOS7.4(64 位) 192.168.227.101jdk、ZooKeeper
Zookeeper服务器3CentOS7.4(64 位) 192.168.227.102jdk、ZooKeeper

需要部署ZooKeeper集群(详情请见ZooKeeper分布式应用程序协调服务-CSDN博客)三台服务器步骤相同,此处只展示一台设备的搭建

2.下载安装包

1. #下载安装包
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz1.1 #有压缩包就直接拖进来
cd /opt
rz -E2. #安装Kafka
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

3.修改配置文件

1. #移动并将配置文件进行备份
cd /usr/local/kafka/config/
cp server.properties{,.bak}2. #修改
vim server.properties
-------------------------------------------
broker.id=0                           
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2listeners=PLAINTEXT://192.168.227.100:9092    
#31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改num.network.threads=3    
#42行,broker 处理网络请求的线程数量,一般情况下不需要去修改num.io.threads=8         
#45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数socket.send.buffer.bytes=102400       #48行,发送套接字的缓冲区大小socket.receive.buffer.bytes=102400    #51行,接收套接字的缓冲区大小socket.request.max.bytes=104857600    #54行,请求套接字的缓冲区大小log.dirs=/usr/local/kafka/logs        #60行,kafka运行日志存放的路径,也是数据存放的路径num.partitions=1    
#65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖num.recovery.threads.per.data.dir=1    #69行,用来恢复和清理data下数据的线程数量log.retention.hours=168    
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除log.segment.bytes=1073741824    
#110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件zookeeper.connect=192.168.227.100:2181,192.168.227.101:2181,192.168.227.102:2181    
#123行,配置连接Zookeeper集群地址
------------------------------------------------

4.设置环境变量

1. #修改环境变量
vim /etc/profile
----------------------------------------
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
-----------------------------------------2. #刷新配置文件
source /etc/profile3. #查看环境变量
echo $PATH

5.配置ZooKeeper启动脚本

vim /etc/init.d/kafka
------------------------------------------------
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac
------------------------------------------------------------------

6.设置开机自启并启动

1. #设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka2. #分别启动 Kafka
service kafka start

7.Kafka命令行操作

(1)创建topic
kafka-topics.sh --create --zookeeper 192.168.227.100:2181,192.168.227.101:2181,192.168.227.102:2181 --replication-factor 2 --partitions 3 --topic test############################################
--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,建议为 2 
--partitions:定义分区数 
--topic:定义 topic 名称

(2)查看当前服务器中的所有topic
kafka-topics.sh --list --zookeeper 192.168.227.100:2181,192.168.227.101:2181,192.168.227.102:2181

(3)查看某个topic的详情
kafka-topics.sh  --describe --zookeeper 192.168.227.100:2181,192.168.227.101:2181,192.168.227.102:2181

(4)发布消息
kafka-console-producer.sh --broker-list 192.168.227.100:9092,192.168.227.101:9092,192.168.227.102:9092  --topic test

(5)消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.227.100:9092,192.168.227.101:9092,192.168.227.102:9092 --topic test --from-beginning
#--from-beginning:会把主题中以往所有的数据都读取出来

(6)修改分区数
kafka-topics.sh --zookeeper 192.168.227.100:2181,192.168.227.101:2181,192.168.227.102:2181 --alter --topic test --partitions 6

(7)删除topic
kafka-topics.sh --delete --zookeeper 192.168.227.100:2181,192.168.227.101:2181,192.168.227.102:2181 --topic test

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/823631.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第五套

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第五套 (共9套,有答案和解析,答案非官方,仅供参考)(共九套,每套四十个选择题) 部分题目分享,完整版获取(WX:didadida…

JVM 方法调用之方法分派

JVM 方法调用之方法分派 文章目录 JVM 方法调用之方法分派1.何为分派2.静态分派3.动态分派4.单分派与多分派5.动态分派的实现 1.何为分派 在上一篇文章《方法调用之解析调用》中讲到了解析调用,而解析调用是一个静态过程,在类加载的解析阶段就确定了方法…

ECharts:五大卓越在线示例库助力高效数据可视化开发

1. ECharts官方示例库 ECharts官网提供的示例库是最权威、最新的展示平台,涵盖了所有基础和高级图表类型,每个示例都配有详尽的代码解释和配置说明。开发者可以直接查看源代码,复制粘贴后稍加修改就能应用于实际项目中。 2. Make A Pie - EC…

【笔试训练】day4

不到5分钟写完,今天的题又又又难一点啦! 1.Fibonacci数列 思路: 直接模拟一遍斐波那契数列的递增过程,大于n就直接结束。因为后面只会越来越大,跟题目求的最小步数不符。在这个过程中用一个变量去维护这个当前的元素与目标n还差…

【编程TOOL】VC++6.0下载安装配置使用保姆式教程

目录 ​编辑 1.软件介绍 2.软件下载 3.软件安装 3.1.下载得到可执行文件并双击进行安装 3.2. 点击下一步 3.3. 选择安装位置 3.4. 勾选“创建桌面快捷方式”并点击下一步 5. 点击安装并等待 3.6. 先取消运行,后点击完成,软件即安装完毕 4.兼容性配置 4.1…

基于SpringBoot+Vue的疾病防控系统设计与实现(源码+文档+包运行)

一.系统概述 在如今社会上,关于信息上面的处理,没有任何一个企业或者个人会忽视,如何让信息急速传递,并且归档储存查询,采用之前的纸张记录模式已经不符合当前使用要求了。所以,对疾病防控信息管理的提升&a…

IoC与Spring

目录 IoC控制反转 现实案例 特点 目的 DI依赖注入 小总结 介绍Spring 狭义和广义上的Spring 传统编码方式的不足 需求引入 弊端分析 IoC控制反转 现实案例 1、买水果问老板各种水果的口感而不是自己去挨个尝试一遍。 2、买房子找中介而不是自己去花时间找房东。…

别找了,这35份Excel自动排班表真的好用!

别再自己做排班表了,调了半天不好看格式还不对。 看看自己需要的是哪些类型的排班表?是公司值班,还是直播排班,还是考勤汇总,总有一个适合你。 刚整理的35份办公常用的排班表,希望能帮到你! …

基于Python的机器学习的文本分类系统

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

CentOS 7安装Redis

说明:本文介绍如何在CentOS 7操作系统下安装Redis 下载安装 首先,去官网上下载所需要安装的版本,官网地址:https://download.redis.io/releases/,我这里下载3.2.1版本的 下载完,上传到云服务器上&#xf…

<router-link>出现Error: No match for {“name“:“home“,“params“:{}}

在将<a></a>标签换到<router-link></router-link>的时候出现No match for {"name":"home","params":{}}这样的错误&#xff0c;其中格式并无错误&#xff0c; <router-link class"navbar-brand active" …

她在《繁花》大放异彩,“浪姐”暴瘦15斤,打脸了不看好她的观众

不知不觉&#xff0c;《浪姐》已经迎来第5季了。播到第4季的时候&#xff0c;改名成《乘风破浪2023》&#xff0c;这一季叫《乘风2024》&#xff0c;和前几季相比&#xff0c;热度依然不减。 都说3个女人一台戏&#xff0c;更何况这个节目&#xff0c;每次能请到30位姐姐&…

刷题。。。。。。

1.ezmd5 根据题目提示 我们知道应该是要上传两张md5值相同的图片 根据原文链接&#xff1a;cryptanalysis - Are there two known strings which have the same MD5 hash value? - Cryptography Stack Exchange 把保存下来的图片上传一下 得到flag 2.ezhttp 根据原文链接&…

node基础 第二篇

01 ffmpeg开源跨平台多媒体处理工具&#xff0c;处理音视频&#xff0c;剪辑&#xff0c;合并&#xff0c;转码等 FFmpeg 的主要功能和特性:1.格式转换:FFmpeg 可以将一个媒体文件从一种格式转换为另一种格式&#xff0c;支持几乎所有常见的音频和视频格式&#xff0c;包括 MP…

冲上热搜-奇安信今年的年终奖。。

最近,奇安信宣布全员无年终奖&#xff0c;同时冲上了脉脉热搜榜第一。作为网安界的一哥&#xff0c;奇安信的决定无疑给许多期待年终奖的员工带来了沉重的打击。 从公司内部的绩效考核机制来看,奇安信将员工分为了5个档次:S、A、B、B、B-。而大多数员工被评定为中等的B档,这意味…

【网络编程】web服务器shttpd源码剖析——命令行和文件配置解析

hello &#xff01;大家好呀&#xff01; 欢迎大家来到我的网络编程系列之web服务器shttpd源码剖析——命令行解析&#xff0c;在这篇文章中&#xff0c;你将会学习到在Linux内核中如何创建一个自己的并发服务器shttpd&#xff0c;并且我会给出源码进行剖析&#xff0c;以及手绘…

C++异常学习

C语言传统的处理错误的方式 传统的错误处理机制&#xff1a; 终止程序&#xff0c;如assert&#xff0c;缺陷&#xff1a;用户难以接受。如发生内存错误&#xff0c;除0错误时就会终止程序。返回错误码&#xff0c;缺陷&#xff1a;需要程序员自己去查找对应的错误。如系统的…

ES增强框架easy-es

因为最近做的功能是关于舆情的,所以数据量比较大的,本来打算用MySQL做时间分表来做,但是经过一段时间的测试,发现数据量太大,用时间分表不能满足性能的要求,所以决定将数据存储改为ES,但是短时间内改底层框架又不是一个小工程,时间上不允许,所以找到了一个很合适的框架,他跟myb…

深入理解JVM中的G1垃圾收集器原理、过程和参数配置

码到三十五 &#xff1a; 个人主页 心中有诗画&#xff0c;指尖舞代码&#xff0c;目光览世界&#xff0c;步履越千山&#xff0c;人间尽值得 ! 在Java虚拟机&#xff08;JVM&#xff09;中&#xff0c;垃圾收集&#xff08;GC&#xff09;是一个自动管理内存的过程&#xff…

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第四套

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第四套 (共9套&#xff0c;有答案和解析&#xff0c;答案非官方&#xff0c;仅供参考&#xff09;&#xff08;共九套&#xff0c;每套四十个选择题&#xff09; 部分题目分享&#xff0c;完整版获取&#xff08;WX:didadida…