超强总结Kafka详解

一、Kafka简介

Kafka是什么

Kafka是一种高吞吐量的分布式发布订阅消息系统(消息引擎系统),它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,
搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来
解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop
的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

其实我们简单点理解就是系统A发送消息给kafka(消息引擎系统),系统B从kafka中读取A发送的消息。而kafka就是个中间商。

1.1 Kafka的特性:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。

  • 可扩展性:kafka集群支持热扩展

  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

  • 高并发:支持数千个客户端同时读写

1.2 Kafka的使用场景:

Kafaka经常用于削峰、解耦、异步。

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

  • 消息系统:解耦生产者和消费者、缓存消息等。

  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

  • 流式处理:比如spark streaming和storm

  • 事件源

1.3 Kakfa的设计思想

Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。

二、Kafka架构

Kafka拓扑结构
在这里插入图片描述

在这里插入图片描述

三、Kafka中的术语解释概述

Broker【服务器节点】

Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。

  • 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。

  • 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。

  • 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

Topic【主题】

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于
一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。类似于数据库的表名。在每个broker上都
可以创建多个topic。

Partition【分区】

在这里插入图片描述

  • topic中的数据分割为一个或多个partition。每个topic至少有一个partition。topic的数据数据会写入到不同的partition。

  • 每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

  • partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。

  • 如果topic有多个partition,消费数据时就不能保证数据的顺序。

  • 在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

  1. 方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。

  2. 提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?

kafka中有几个原则:

  1. partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。

  2. 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。

  3. 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?

那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。

  • 0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。

  • 1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。

  • all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

Producer【生产者】

生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

Consumer【消费者】

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

Consumer Group【消费者组】

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Leader【领导者】

每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

Follower【跟随者】

  • Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。
  • 如果Leader失效,则从Follower中选举出一个新的Leader。
  • 当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

Replica【副本】

每个partition可以在其他的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。
存replica副本的方式是按照kafka broker的顺序存。
例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。这样如果某个broker宕机,其实整个kafka内数据依然是完整的。但是,replica副本数越高,系统虽然越稳定,但是会带来资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。

  • 传送消息:producer先把message发送到partition leader,再由leader发送给其他partition follower(如果让producer发送给每个replica那就太慢了)。 再向Producer发送ACK前需要保证有多少个Replica已经收到该消息:根据ack配的个数而定。

  • 处理某个Replica不工作的情况:如果这个部工作的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。如果这个不工作的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工作的partition replca写message成功,但是会等到time out,然后返回失败因为某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工作的partition replica从ack列表中移除,以后的producer发送message的时候就不会有这个ack列表下的这个部工作的partition replica了。

  • 处理Failed Replica恢复回来的情况:如果这个partition replica之前不在ack列表中,那么启动后重新受Zookeeper管理即可,之后producer发送message的时候,partition leader会继续发送message到这个partition follower上。如果这个partition replica之前在ack列表中,此时重启后,需要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工作的partition replica的时候自动从ack列表中移除的)

四、kafka应用实战

Kafak消费端配置

max.poll.interval.ms

默认为5分钟
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费,触发rebalance 。
如果你的消费者节点总是在重启完不久就不消费了,可以考虑检查改配置项或者优化你的消费者的消费速度等等。

max.poll.records

max-poll-records是Kafka consumer的一个配置参数,表示consumer一次从Kafka broker中拉取的最大消息数目,默认值为500条。在Kafka中,一个消费者组可以有多个consumer实例,每个consumer实例负责消费一个或多个partition的消息,每个consumer实例一次从broker中可以拉取一个或多个消息。

max-poll-records参数的作用就是控制每次拉取消息的最大数目,以实现消费弱化和控制内存资源的需求。

参考Kafka中的max-poll-records和listener.concurrency配置

常见配置参考:

 /*** kafka配置 default** @return*/private Map<String, Object> consumerProperties() {Map<String, Object> props = new HashMap<>(16);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMS);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);return props;}/*** 消费者工厂** @return*/@Bean("autocommitConsumerFactory")@Primarypublic DefaultKafkaConsumerFactory autocommitConsumerFactory() {Map<String, Object> props = consumerProperties();props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);return new DefaultKafkaConsumerFactory(props);}/*** kafka监听容器* kafkaListenerContainerFactory** @return*/@Bean(name = "containerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, String>> containerFactory() {ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);factory.setBatchListener(true);return factory;}

生产端配置

  /*** producerFactory* * @return*/@Beanpublic ProducerFactory<Long, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(props);}/*** kafkaTemplate** @return*/@Beanpublic KafkaTemplate<Long, String> kafkaTemplate(ProducerFactory<Long, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}

SpringBoot 集成 KafkaTemplate 发送Kafka消息

@Resourceprivate ObjectMapper mapper;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;try {Order order = new Order();String message = mapper.writeValueAsString(order);CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("order", message);future.thenAccept(result -> {if (result.getRecordMetadata() != null) {log.debug("send message:{} with offset:{}", message, result.getRecordMetadata().offset());}}).exceptionally(exception -> {log.error("KafkaProducer send message failure,topic={},data={}", topic, message, exception);return null;});} catch (Exception e) {log.error("KafkaProducer send message exception,topic={},message={}", topic, message, e);}

SpringBoot 集成 @KafkaListener 消费Kafka消息

@Resourceprivate ObjectMapper mapper;@KafkaListener(id = "order_consumer",topics = "order",groupId = "g_order_consumer_group",properties = {"max.poll.interval.ms:300000", "max.poll.records:1"})// 可以只有ConsumerRecords<String, String> records参数。ack参数非必需,ack.acknowledge()是为了防消息丢失public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {String msg = record.value();log.info("Consume msg:{}", msg);try {Order order = mapper.readValue(val, Order.class);// 处理业务逻辑} catch (Exception e) {log.error("Consume failed, msg:{}", val, e);}}ack.acknowledge();}

五、Kafka可视化管理工具

【Kafka可视化工具】kafka-manager
kafka-manager安装及基本使用

【Kafka可视化工具】Offset Explorer
Kafka-Offset Explorer安装及基本使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/40191.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用elasticsearch完成多语言搜索的三种方式

文档目标&#xff1a; 基于elasticsearch&#xff0c;实现不同语言搜索特定语言的文档数据&#xff1b;比如输入中文的内容&#xff0c;搜索中文文档数据&#xff0c;输入英文搜索英文文档数据&#xff0c;日韩文类似 方案概述&#xff1a; 方式一&#xff1a;不同的语言使用不…

使用Ubuntu 22.04安装Frappe-Bench【二】

系列文章目录 第一章 使用VMware创建Ubuntu 22.04【一】 文章目录 系列文章目录前言什么是Frappe-Bench&#xff1f;使用安装ERPNext能实现什么效果&#xff1f; 官网给了一个说明 一、使用Ubuntu 22.04安装Frappe-Bench一、安装要求二、安装命令三、 可能出现问题 总结 前言 …

【计算机毕业设计】026基于微信小程序的原创音乐

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

Linux多进程和多线程(五)进程间通信-消息队列

多进程(五) 进程间通信 消息队列 ftok()函数创建消息队列 创建消息队列示例 msgctl 函数示例:在上⼀个示例的基础上&#xff0c;加上删除队列的代码 发送消息 示例: 接收消息示例 多进程(五) 进程间通信 消息队列 消息队列是一种进程间通信机制&#xff0c;它允许两个或多个…

LCD显示从电路IC 到 驱动编写调试

文章目录 LCD驱动电路IC简述Panel 模块驱动图示含义接口与连接 Panel内部驱动驱动原理框图TCON(Timing Controller):时序控制器。一、控制屏幕时序与信号驱动二、提升图像质量三、接口支持与兼容性四、市场应用广泛 Gate控制信号工作时序Source Driver IC原理框图 LCD驱动应该怎…

JAVA—图形化“登录,注册”界面

前言&#xff1a;学习了一段时间JAVA的swing组件&#xff0c;心血来潮写了一个登录&#xff0c;注册界面。 知道大伙喜欢美女&#xff0c;所以把用户登录界面背景设置成了beauty&#xff01; 所用知识基本上都属于swing组件&#xff1a; javax.siwng.JFrame; //窗体类 javax.sw…

c++习题08-计算星期几

目录 一&#xff0c;问题 二&#xff0c;思路 三&#xff0c;代码 一&#xff0c;问题 二&#xff0c;思路 首先&#xff0c;需要注意到的是3^2000这个数值很大&#xff0c;已经远远超过了long long 数据类型能够表示的范围&#xff0c;如果想要使用指定的数据类型来保存…

14-18 2024 年影响企业 GenAI 的关键技术趋势

现在&#xff0c;大多数 .com 公司已于 2023 年更名为 .ai&#xff0c;那么价值万亿美元的问题是&#xff1a;接下来会发生什么&#xff1f;哪些关键障碍、工具、技术和方法将重塑格局 企业 AI 的不同之处在于&#xff0c;它专注于可衡量、可管理的输出&#xff0c;企业可以控…

前端Web开发HTML5+CSS3+移动web视频教程 Day4 CSS 第2天

P44 - P 四个知识点&#xff1a; 复合选择器 CSS特性 背景属性 显示模式 复合选择器 复合选择器仍然是选择器&#xff0c;只要是选择器&#xff0c;作用就是找标签。复合选择器就是把基础选择器进行组合使用。组合了之后就可以在大量的标签里面更快更精准地找标签了。找…

Qt中线程的使用

目录 1 .QThread重要信号和函数 1.1 常用共用成员函数 1.2信号和槽函数 1.3静态函数 1.4 任务处理函数 2.关于QThread的依附问题&#xff1a; 3.关于connect连接 4.QThread的使用 5.线程池QThreadPool 5.1. 线程池的原理 5.2&#xff0e;QRunable类 5.3. QThreadPoo…

安装维修制氮设备的注意指南

制氮设备在许多工业领域都发挥着重要作用&#xff0c;无论是确保生产过程中的氮气供应&#xff0c;还是维持设备的稳定运行&#xff0c;正确的安装和维修都是关键。以下是一些重要的注意事项&#xff0c;帮助您顺利完成制氮设备的安装与维修工作。 一、安装注意事项 (一)选址与…

VUE自定义新增、复制、删除dom元素

功能需求&#xff0c;能灵活新增或删除一个dom元素&#xff0c;在此dom元素中还存在能灵活新增、删除的dom元素。实现后功能图如下&#xff1a; 点击新增策略&#xff0c;能新增整个策略dom 实现思路&#xff1a;定义一个数量和一个数组&#xff0c;然后使用循环遍历展示内容&a…

将iStoreOS部署到VMware ESXi变成路由器

正文共&#xff1a;888 字 19 图&#xff0c;预估阅读时间&#xff1a;1 分钟 前面把iStoreOS部署到了VMware workstation上&#xff08;将iStoreOS部署到VMware Workstation&#xff09;。如果想把iStoreOS直接部署到ESXi上&#xff0c;你会发现转换镜像不能直接生成OVF或者OV…

css+js实现导航栏色块跟随滑动+点击后增加样式

这篇文章&#xff0c;我给大家分享一个导航菜单的效果。用cssJS实现&#xff0c;效果如图&#xff1a; 本例实现效果&#xff1a;当鼠标移动到其他菜单项时&#xff0c;会有个背景色块跟随鼠标横向平滑移动。当鼠标点击后&#xff0c;被点击的菜单名称文字字体会加粗。 现在&…

《数字图像处理与机器视觉》案例四 基于分水岭算法的粘连物体的分割与计数

一、引言 分水岭算法&#xff08;Watershed Algorithm&#xff09;&#xff0c;是一种基于拓扑理论的数学形态学的分割方法&#xff0c;其基本思想是把图像看作是测地学上的拓扑地貌&#xff0c;图像中每一点像素的灰度值表示该点的海拔高度&#xff0c;每一个局部极小值及其影…

SpringBoot 集成Swagger在线接口文档 接口注解

介绍 Swagger接口文档是一种自动生成、描述、调用和可视化的RESTful风格Web服务接口文档的工具。它通过一系列的规范和自动化工具&#xff0c;极大地简化了后端开发人员与前端开发人员之间的协作。 依赖 <!--swagger--> <dependency><groupId>io.springfo…

「媒体邀约」天津媒体资源?媒体邀约宣传报道

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体宣传加速季&#xff0c;100万补贴享不停&#xff0c;一手媒体资源&#xff0c;全国100城线下落地执行。详情请联系胡老师。 天津拥有丰富的媒体资源&#xff0c;利用这些资源进行有效…

ICMP协议详解及尝试用ping和tracert捕抓ICMP报文

一、ICMP协议 1.1、定义 ICMP&#xff08;Internet Control Message Protocol&#xff0c;互联网控制消息协议&#xff09;是一个支持IP层数据完整性的协议&#xff0c;主要用于在IP主机、路由器之间传递控制消息。这些控制消息用于报告IP数据报在传输过程中的错误&#xff0c…

C++ 语法

一、头文件与源文件 头文件用于声明函数,类似于java中service层的接口; 源文件用于实现头文件函数,相当于java中serviceImpl层的实现类; 定义接口 实现接口 使用接口 二、指针概述 定义与使用 定义一个指针p用于存a变量的内存地址,即指针就是地址; 解引用可以获取或修改…

40岁以上的中年人很难找到工作

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 你们有没有发现&#xff0c;90%的40岁以上的中年人&#xff0c;为了多挣钱&#xff0c;几乎除了吃饭和睡觉之外&#xff0c;都在拼命加班劳作&#xff0c;只要一停下来&#xff0c;心里就有一种内疚感&#xff0c;…