kafka控制台模拟消费_Kafka 详解

f7841ea997101b18d85d7b21caf6b551.png

kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

关键词

  • 分布式流处理平台。
  • 在系统之间构建实时数据流管道。
  • 以topic分类对记录进行存储
  • 每个记录包含key-value+timestamp
  • 每秒钟百万消息吞吐量。

安装kafka

0.选择三台主机安装kafka
1.准备zk
略
2.jdk
略
3.tar文件
4.环境变量
略
5.配置kafka
[kafka/config/server.properties]
...
broker.id=201
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:21816.分发server.properties,同时修改每个文件的broker.id7.启动kafka服务器
a)先启动zk
b)启动kafka
[s202 ~ s204]
$>bin/kafka-server-start.sh -daemon config/server.propertiesc)验证kafka服务器是否启动
$>netstat -anop | grep 90928.创建主题 
$>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test9.查看主题列表
$>bin/kafka-topics.sh --list --zookeeper s201:218110.启动控制台生产者
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test11.启动控制台消费者
$>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:218112.在生产者控制台输入hello world

kafka 的使用场景

  • 埋点日志的收集一个公司可以用Kafka可以收集各种服务的log。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和flink
  • 事件源

kafka如何保证的消息数据不丢失

当讨论这个问题的时候,首先需要考量kafka的运行机制。kafka主要分为三个组件,producer、consumer、broker。所以也必须从三个方面去考量,producer、consumer、broker端数据不丢失。

c02d972262acaf41d53b83a5abf8ecd9.png

一、producer端如何保证数据不丢失

1.ack的配配置策略

acks = 0    
生产者发送消息之后 不需要等待服务端的任何响应,它不管消息有没有发送成功,如果发送过程中遇到了异常,
导致broker端没有收到消息,消息也就丢失了。实际上它只是把消息发送到了socketBuffer(缓存)中,
而socketBuffer什么时候被提交到broker端并不关心,它不担保broker端是否收到了消息,
但是这样的配置对retry是不起作用的,因为producer端都不知道是否发生了错误,
而且对于offset的获取永远都是-1,因为broker端可能还没有开始写数据。
这样不保险的操作为什么还有这样的配置?kafka对于收集海量数据,
如果在收集某一项日志时是允许数据量有一定丢失的话,是可以用这种配置来收集日志。   
acks = 1(默认值)    
生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
其实就是消息只发给了leader leader收到消息后会返回ack到producer端。
如果消息无法写入leader时(选举、宕机等情况时),生产都会收到一个错误的响应,为了避免消息丢失,
生产者可以选择重发消息,如果消息成功写入,在被其它副本同步数据时leader  崩溃,那么此条数据
还是会丢失,因为新选举的leader是没有收到这条消息,ack设置为1是消息可靠性和吞吐量折中的方案。  
acks = all (或-1)    
生产者在发送消息之后,需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应,
在配置环境相同的情况下此种配置可以达到最强的可靠性。即:在发送消息时,需要leader 向fllow 
同步完数据之后,也就是ISR队列中所有的broker全部保存完这条消息后,才会向ack发送消息,表示发送成功。

2.retries的配置策略

在kafka中错误分为2种,一种是可恢复的,另一种是不可恢复的。  

  • 可恢复性的错误:  

如遇到在leader的选举、网络的抖动等这些异常时,如果我们在这个时候配置的retries大于0的, 也就是可以进行重试操作,那么等到leader选举完成后、网络稳定后,这些异常就会消息,错误也就可以恢复, 数据再次重发时就会正常发送到broker端。需要注意retries(重试)之间的时间间隔, 以确保在重试时可恢复性错误都已恢复。  

  • 不可恢复性的错误:  

如:超过了发送消息的最大值(max.request.size)时,这种错误是不可恢复的,如果不做处理, 那么数据就会丢失,因此我们需要注意在发生异常时把这些消息写入到DB、缓存本地文件中等等, 把这些不成功的数据记录下来,等错误修复后,再把这些数据发送到broker端。

如何选择

高可用型配置

acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)  

  • 优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。  
  • 缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长

折中型配置:

acks = 1 retries > 0 retries 时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)  

  • 优点:保证了消息的可靠性和吞吐量,是个折中的方案  
  • 缺点:性能处于2者中间3.高吞吐型   

高效率配置:

acks = 0  

  • 优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求  
  • 缺点:不知道发送的消息是 否成功

每种配置都有对应的生产用途,视情况而定。。

二、consumer端如何保证数据不丢失

consumer端配置

1、group.id: consumer group 分组的一个id

消费者隶属消费组的名称,kafka的每个partition值允许同一个group的一个consumer消费。这样做的目的是为了保证kafka的高吞吐量

2、auto.offset.reset = earliest(最早) /latest(最晚)

设置从哪个位置开始消费

3、enable.auto.commit = true/false(默认true)

当设置为true时,意味着由kafka的consumer端自己间隔一定的时间会自动提交
offset,如果设置成了fasle,也就是由客户端(自己写代码)来提交,那就还得控制提交的时间间隔
auto.commit.interval.ms
当enabe.auto.commit设置为true时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔。

在consumer阶段,如果设置为true,意味着会自动提交offset,比如说当你pull了30条数据,但是当处理20条数据的时候自动提交了commit,当处理21条数据的时候,系统崩了,那当你再去拉取数据的时候,就会从30开始啦,那就会丢失21-30的数据

如果设置为false,可以手动提交,你可以处理一条提交一次,也可以处理一批提交一批,但是consumer在消费数据的时候,是以batch的模式去pull数据的,假设pull了30条数据,你在处理30条数据的时候,没处理一条,就提交一次的话,会非常影响消费能力,你可以还是按照一批来处理,设置一个累加器,处理一条加1,如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。

consumer 保证确保消息只被处理一次处理,同时确保幂等性
需要结合具体的业务来看 :

  • 比如你拿个数据要写库,先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧
  • 比如你是写redis,那没问题了,反正每次都是set,天然幂等性
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

三、broker端是如何保证数据不丢失的

1.replication-factor 3    

在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。

2.min.insync.replicas = 2     

分区ISR队列集合中最少有多少个副本,默认值是1 

3.unclean.leader.election.enable = false     

是否允许从ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。

leader选举造成的数据丢失

3个replica分别为0 1 2,0为leader,数据都能完全同步到100,在某一时刻,分别有2个fllow挂掉了,此时有producer往0 的replica上发送50条数据完后,此时的leader挂掉了,而此时刚好的1个fllow起来了,它没有向leader上feach数据,因为leader已经不存在了,此时有2种处理方法:重新起来的fllow可以成为1个leader,需要通过 unclean.leader.election.enable=true,这样做保证了高可用,但是这样做的弊端是:新起来的fllow成为了leader,但是它会丢失部分数据,虽然这样保证了高可用。另一种情况是设置为false,不让fllow竞选leader,但是这样也会造成数据的丢失。假如在ISR的队列里面,只有0 1,但此时replica 1 没有来得及向leader feach数据leader挂掉了,这样也会造成数据的丢失。

broker配置策略

  • min.insync.replica

在一个topic中,1个分区 有3个副本,在创建时设置了min.insync.replica=2,假如此时在ISR中只有leader副本(1个)存在,在producer端生产数据时,此时的acks=all,这也就意味着在producer向broker端写数据时,必须保证ISR中指定数量的副本(包含leader、fllow副本)全部同步完成才算写成功,这个数量就是由min.insync.replica来控制的,这样producer端向broker端写数据是不成功,因为ISR中只有leader副本,min.insync.replica要求2个副本,此时的producer生产数据失败(异常),当然consumer端是可以消费数据的,只不过是没有新数据产生而已.这样保证了数据的一致性,但这样会导致高可用性降低了。一般的配置是按: n/2 +1 来配置min.insync.replicas 的数量的,

同时也要将unclean.leader.election.enable=false

  • unclean.leader.election.enable

假如现在有leader 0 fllow 1 fllow 2 三个副本,存储的数据量分别是10 9 8,此时的broker的配置是:min.insync.replica=2 acks=all,leader的数据更新到了15,在没有同步到fllow 1 fllow 2时挂掉了,此时的ISR队列中是有fllow 1 和fllow 2的,如果unclean.leader.election.enable设置的是true,表示在ISR中的副本是可以竞选leader这样就会造成9-15或8-15之间的数据丢失,所以unclean.leader.election.enable必须设置成成false,这样整个kafka cluster都不读写了,这样就保证了数据的高度一致性.

kafka中topic设计原理

因为consumer group 中所有的consumer一定会消费topic中的partition,而一个partition只能同时被同一group中的一个consumer消费;

所以最优的设计就是:

  • consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
  • 一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率

参考文章

https://www.cnblogs.com/MrRightZhao/p/11498952.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/289493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python在web可以开发吗_Python Web开发

参考原文 WSGI接口 WSGI(Web Server Gateway Interface)是一个接口,用来屏蔽底部的细节(如TCP的建立连接,HTTP原始请求和响应格式等)。WSGI接口定义非常简单,只需要Web开发者实现一个函数&#…

更新丨.NET 7 预览版2 中的 ASP.NET Core

点击上方蓝字 关注我们(本文阅读时间:6分钟).NET 7 预览版2 现已推出,其中包括对 ASP.NET Core 的许多重大改进。以下是此预览版中新增内容的摘要:• 推断来自服务的 API 控制器操作参数;• SignalR 集线器方法的依赖注…

LoadRunner+Android模所器实现抓包并调试本地服务端

为了测试Android软件的服务端的功能,需要重现某些客户端操作,便于发现功能问题,性能问题。也方便客户端与本机服务端特别是服务端代码进行断点调试。这个时候需要对网络操作进行重现。loadRunner是hp公司开发的压力测试工具。功能比较强大&am…

架构师

系统架构师是一个既需要掌控整体又需要洞悉局部瓶颈并依据具体的业务场景给出解决方案的人。具体来说是一个确认和评估系统需求,给出开发规范,搭建系统实现的核心构架,并澄清技术细节、扫清主要难点的技术人员。主要着眼于系统的“技术实现”…

统信uos系统考试题_148款!富士通及旗下晟拓品牌系列打印机适配统信UOS

近日,南京富士通电子信息科技股份有限公司(简称:富士通)及其旗下晟拓子品牌148款主流打印机产品与统信桌面操作系统UOS的适配工作即将完成,这次适配涵盖了富士通及晟拓的常用主流机型。富士通正式成为统信软件产品生态合作伙伴。本次适配&…

手机浏览器html5游戏,移动浏览器都爱 HTML5 ?

目前中国第三方手机浏览器市场竞争正在愈演愈烈,但由于各应用开发商对手机浏览器的内容和资源的整合能力、技术研发能力、战略布局目的等方面均各不相同,浏览器产品也出现了同质化严重、内容匮乏等问题,亮点突出、吸引用户的产品较缺乏&#…

怎么快速了解自己的MySQL服务器?

From: http://www.cnblogs.com/benshan/archive/2013/01/09/2853097.html 1、查看数据库服务器状态:status Linux 下的MySQL服务器状态 该列表中主要包括MySQL的版本(为version 5.1.61)、运行平台(debian-linux-gnu(i686)&#xf…

八类网线和七类网线的区别_什么是七类网线?七类网线水晶头如何制作?

要了解七类网线如何使用?需要掌握这四个问题:1、什么是七类网线?2、七类网线与六类网线有什么区别?3、七类网线用什么水晶头?如何制作7类网线水晶头?4、7类网线的应用场景?带着这四个问题&#…

poj2632 累死了

题意: 给定A*B的格子,放入N个机器人,每个机器人初始位置及朝向给定。给定M条指令。指令类型有三种: 1、L:左转90 2、R:右转90 3、F:前进一格 问执行指令过程中机器人是否发生碰撞&am…

代码生成器原理

整个架设思路分的4个部分:A:底层物理数据库层,主要是存储数据用的。B:数据库访问层,主要是为了写一套代码可以跑在多种数据库上。C:一些辅助工具、基础组件,是为了加强自动产生代码的功能、简化…

【Blog.Core开源】将Program升级为.NET6.0版本

大家假期好,好久不见,之前忙于其他事情,公众号暂时搁置了一个月了,新的一年开始了,很多小伙伴开始催更了,粉丝的要求必须满足。2022年打算重点推广BCVP社区,所以还是希望有愿意投稿的小伙伴&…

python画两条曲线_查找在matplotlib中绘制的两条曲线之间的区域(在区域之间填充)...

我有两条曲线的x和y值列表,它们都有奇怪的形状,而且我没有任何函数。我需要做两件事:(1)绘制它并对曲线之间的区域进行着色,如下图所示;(2)找到曲线之间该着色区域的总面…

OXY OPENCART 商城自适应主题模板 ABC-0020-05

OXY OPENCART 商城自适应主题模板 ABC-0020-05OXY FEATURESLayoutFoundation Framework – the most advanced responsive front-end framework in the world.100% Fully Responsive – Solid Flexible Responsive Layout that scales from 320px to 1440px.You can disable re…

年月跨度_建筑结构丨国内跨度最大的张弦桁架工程——合肥滨湖国际会展中心二期首榀桁架滑移成功...

来源:中建科工 华中大区。2020年12月8日全国公建领域最大跨度的张弦桁架钢结构工程合肥滨湖国际会展中心二期首榀桁架滑移顺利完成合肥滨湖国际会展中心二期项目位于合肥市滨湖新区锦绣大道与广西路交口,该项目2#综合馆建筑面积约4.8万平方米&#xff0c…

【C#/.NET】不用AutoMapper,我用啥呢?

微信公众号:趣编程ACE关注可了解更多的.NET日常实战开发技巧。如需源码请后台留言源码;**[如果觉得对您有帮助,欢迎关注]TinyMapper简介本文来自社区群粉丝投稿TinyMapper是一个.NET平台下的一个轻量级对象映射工具,号称是.Net平台下最快的对…

HBase键值分片的简单运用

1.基本原理介绍 HBase的存储是通过行键建立索引进行存储的,而且HBase只支持一级索引,因此只要通过行键才能很快的找到需要的数据。HBase是一个分布式的系统,HBase通过行键的分片,把不同的数据存储在不同的主机上。1).顺序分片依据…

我要3万取款机怎么取_7万的新宝骏RS-3怎么样?用车三个月后,车主说出了实话...

小型SUV领域,作为目前SUV市场之中竞争最激烈的细分领域之一,已经成为众多车企的必争之地。而正因如此,小型SUV市场所具有的车型非常的丰富,且其中更是一点也不缺爆款车型,正如在去年底刚上市的一款小型SUV——新宝骏RS…

MODIS数据说明

MODIS目前主要存在于两颗卫星上:TERRA和AQUA。TERRA卫星每日地方时上午10:30时过境,因此也把它称作地球观测第一颗上午星(EOS-AM1)。AQUA每日地方时下午过境,因此称作地球观测第一颗下午星(EOS-PM1)。两颗星相互配合,每1-2天可重复…

.NET 为大型应用接入 ApplicationStartupManager 启动流程框架

对于大型的应用软件,特别是客户端应用软件,应用启动过程中,需要执行大量的逻辑,包括各个模块的初始化和注册等等逻辑。大型应用软件的启动过程都是非常复杂的,而客户端应用软件是对应用的启动性能有所要求的&#xff0…

思科为其核心网络业务增加了订阅服务

据国外媒体报道,思科系统公司的核心网络业务于周二开始进军订阅服务市场,其向大型企业宣布了一系列新的安全和自动化工具。 这个新的“直观网络”代表了思科如何从其旗舰业务中获得收入的重大变化。San Jose公司一直依赖于网络设备的销售,这些…