kafka控制台模拟消费_Kafka 详解

f7841ea997101b18d85d7b21caf6b551.png

kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

关键词

  • 分布式流处理平台。
  • 在系统之间构建实时数据流管道。
  • 以topic分类对记录进行存储
  • 每个记录包含key-value+timestamp
  • 每秒钟百万消息吞吐量。

安装kafka

0.选择三台主机安装kafka
1.准备zk
略
2.jdk
略
3.tar文件
4.环境变量
略
5.配置kafka
[kafka/config/server.properties]
...
broker.id=201
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:21816.分发server.properties,同时修改每个文件的broker.id7.启动kafka服务器
a)先启动zk
b)启动kafka
[s202 ~ s204]
$>bin/kafka-server-start.sh -daemon config/server.propertiesc)验证kafka服务器是否启动
$>netstat -anop | grep 90928.创建主题 
$>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test9.查看主题列表
$>bin/kafka-topics.sh --list --zookeeper s201:218110.启动控制台生产者
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test11.启动控制台消费者
$>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:218112.在生产者控制台输入hello world

kafka 的使用场景

  • 埋点日志的收集一个公司可以用Kafka可以收集各种服务的log。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和flink
  • 事件源

kafka如何保证的消息数据不丢失

当讨论这个问题的时候,首先需要考量kafka的运行机制。kafka主要分为三个组件,producer、consumer、broker。所以也必须从三个方面去考量,producer、consumer、broker端数据不丢失。

c02d972262acaf41d53b83a5abf8ecd9.png

一、producer端如何保证数据不丢失

1.ack的配配置策略

acks = 0    
生产者发送消息之后 不需要等待服务端的任何响应,它不管消息有没有发送成功,如果发送过程中遇到了异常,
导致broker端没有收到消息,消息也就丢失了。实际上它只是把消息发送到了socketBuffer(缓存)中,
而socketBuffer什么时候被提交到broker端并不关心,它不担保broker端是否收到了消息,
但是这样的配置对retry是不起作用的,因为producer端都不知道是否发生了错误,
而且对于offset的获取永远都是-1,因为broker端可能还没有开始写数据。
这样不保险的操作为什么还有这样的配置?kafka对于收集海量数据,
如果在收集某一项日志时是允许数据量有一定丢失的话,是可以用这种配置来收集日志。   
acks = 1(默认值)    
生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
其实就是消息只发给了leader leader收到消息后会返回ack到producer端。
如果消息无法写入leader时(选举、宕机等情况时),生产都会收到一个错误的响应,为了避免消息丢失,
生产者可以选择重发消息,如果消息成功写入,在被其它副本同步数据时leader  崩溃,那么此条数据
还是会丢失,因为新选举的leader是没有收到这条消息,ack设置为1是消息可靠性和吞吐量折中的方案。  
acks = all (或-1)    
生产者在发送消息之后,需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应,
在配置环境相同的情况下此种配置可以达到最强的可靠性。即:在发送消息时,需要leader 向fllow 
同步完数据之后,也就是ISR队列中所有的broker全部保存完这条消息后,才会向ack发送消息,表示发送成功。

2.retries的配置策略

在kafka中错误分为2种,一种是可恢复的,另一种是不可恢复的。  

  • 可恢复性的错误:  

如遇到在leader的选举、网络的抖动等这些异常时,如果我们在这个时候配置的retries大于0的, 也就是可以进行重试操作,那么等到leader选举完成后、网络稳定后,这些异常就会消息,错误也就可以恢复, 数据再次重发时就会正常发送到broker端。需要注意retries(重试)之间的时间间隔, 以确保在重试时可恢复性错误都已恢复。  

  • 不可恢复性的错误:  

如:超过了发送消息的最大值(max.request.size)时,这种错误是不可恢复的,如果不做处理, 那么数据就会丢失,因此我们需要注意在发生异常时把这些消息写入到DB、缓存本地文件中等等, 把这些不成功的数据记录下来,等错误修复后,再把这些数据发送到broker端。

如何选择

高可用型配置

acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)  

  • 优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。  
  • 缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长

折中型配置:

acks = 1 retries > 0 retries 时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)  

  • 优点:保证了消息的可靠性和吞吐量,是个折中的方案  
  • 缺点:性能处于2者中间3.高吞吐型   

高效率配置:

acks = 0  

  • 优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求  
  • 缺点:不知道发送的消息是 否成功

每种配置都有对应的生产用途,视情况而定。。

二、consumer端如何保证数据不丢失

consumer端配置

1、group.id: consumer group 分组的一个id

消费者隶属消费组的名称,kafka的每个partition值允许同一个group的一个consumer消费。这样做的目的是为了保证kafka的高吞吐量

2、auto.offset.reset = earliest(最早) /latest(最晚)

设置从哪个位置开始消费

3、enable.auto.commit = true/false(默认true)

当设置为true时,意味着由kafka的consumer端自己间隔一定的时间会自动提交
offset,如果设置成了fasle,也就是由客户端(自己写代码)来提交,那就还得控制提交的时间间隔
auto.commit.interval.ms
当enabe.auto.commit设置为true时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔。

在consumer阶段,如果设置为true,意味着会自动提交offset,比如说当你pull了30条数据,但是当处理20条数据的时候自动提交了commit,当处理21条数据的时候,系统崩了,那当你再去拉取数据的时候,就会从30开始啦,那就会丢失21-30的数据

如果设置为false,可以手动提交,你可以处理一条提交一次,也可以处理一批提交一批,但是consumer在消费数据的时候,是以batch的模式去pull数据的,假设pull了30条数据,你在处理30条数据的时候,没处理一条,就提交一次的话,会非常影响消费能力,你可以还是按照一批来处理,设置一个累加器,处理一条加1,如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。

consumer 保证确保消息只被处理一次处理,同时确保幂等性
需要结合具体的业务来看 :

  • 比如你拿个数据要写库,先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧
  • 比如你是写redis,那没问题了,反正每次都是set,天然幂等性
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

三、broker端是如何保证数据不丢失的

1.replication-factor 3    

在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。

2.min.insync.replicas = 2     

分区ISR队列集合中最少有多少个副本,默认值是1 

3.unclean.leader.election.enable = false     

是否允许从ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。

leader选举造成的数据丢失

3个replica分别为0 1 2,0为leader,数据都能完全同步到100,在某一时刻,分别有2个fllow挂掉了,此时有producer往0 的replica上发送50条数据完后,此时的leader挂掉了,而此时刚好的1个fllow起来了,它没有向leader上feach数据,因为leader已经不存在了,此时有2种处理方法:重新起来的fllow可以成为1个leader,需要通过 unclean.leader.election.enable=true,这样做保证了高可用,但是这样做的弊端是:新起来的fllow成为了leader,但是它会丢失部分数据,虽然这样保证了高可用。另一种情况是设置为false,不让fllow竞选leader,但是这样也会造成数据的丢失。假如在ISR的队列里面,只有0 1,但此时replica 1 没有来得及向leader feach数据leader挂掉了,这样也会造成数据的丢失。

broker配置策略

  • min.insync.replica

在一个topic中,1个分区 有3个副本,在创建时设置了min.insync.replica=2,假如此时在ISR中只有leader副本(1个)存在,在producer端生产数据时,此时的acks=all,这也就意味着在producer向broker端写数据时,必须保证ISR中指定数量的副本(包含leader、fllow副本)全部同步完成才算写成功,这个数量就是由min.insync.replica来控制的,这样producer端向broker端写数据是不成功,因为ISR中只有leader副本,min.insync.replica要求2个副本,此时的producer生产数据失败(异常),当然consumer端是可以消费数据的,只不过是没有新数据产生而已.这样保证了数据的一致性,但这样会导致高可用性降低了。一般的配置是按: n/2 +1 来配置min.insync.replicas 的数量的,

同时也要将unclean.leader.election.enable=false

  • unclean.leader.election.enable

假如现在有leader 0 fllow 1 fllow 2 三个副本,存储的数据量分别是10 9 8,此时的broker的配置是:min.insync.replica=2 acks=all,leader的数据更新到了15,在没有同步到fllow 1 fllow 2时挂掉了,此时的ISR队列中是有fllow 1 和fllow 2的,如果unclean.leader.election.enable设置的是true,表示在ISR中的副本是可以竞选leader这样就会造成9-15或8-15之间的数据丢失,所以unclean.leader.election.enable必须设置成成false,这样整个kafka cluster都不读写了,这样就保证了数据的高度一致性.

kafka中topic设计原理

因为consumer group 中所有的consumer一定会消费topic中的partition,而一个partition只能同时被同一group中的一个consumer消费;

所以最优的设计就是:

  • consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
  • 一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率

参考文章

https://www.cnblogs.com/MrRightZhao/p/11498952.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/289493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android之运行PopupWindow提示Unable to add window -- token null is not valid; is your activity running?

1、问题 在使用PopupWindow的时候,我们构建好了直接放在Activity的onCreate函数里面直接运行,提示这个错误 Unable to add window -- token null is not valid; is your activity running? 2、原因分析 popupWindow显示依赖activity,并且要等activity所有的生命周期方法…

android开发板出错,为开发板编译时出错是什么原因

我也是怎么办Arduino:1.8.4 (Windows 7), 开发板:"Arduino/Genuino Uno"C:\Program Files\Arduino\arduino-builder -dump-prefs -loggermachine -hardware C:\Program Files\Arduino\hardware -hardware C:\Users\Administrator\AppData\Local\Arduino15…

python在web可以开发吗_Python Web开发

参考原文 WSGI接口 WSGI(Web Server Gateway Interface)是一个接口,用来屏蔽底部的细节(如TCP的建立连接,HTTP原始请求和响应格式等)。WSGI接口定义非常简单,只需要Web开发者实现一个函数&#…

更新丨.NET 7 预览版2 中的 ASP.NET Core

点击上方蓝字 关注我们(本文阅读时间:6分钟).NET 7 预览版2 现已推出,其中包括对 ASP.NET Core 的许多重大改进。以下是此预览版中新增内容的摘要:• 推断来自服务的 API 控制器操作参数;• SignalR 集线器方法的依赖注…

LoadRunner+Android模所器实现抓包并调试本地服务端

为了测试Android软件的服务端的功能,需要重现某些客户端操作,便于发现功能问题,性能问题。也方便客户端与本机服务端特别是服务端代码进行断点调试。这个时候需要对网络操作进行重现。loadRunner是hp公司开发的压力测试工具。功能比较强大&am…

Linux shell命令 cp 加上-f还是提示是否覆盖

这是由于环境变量中有 allias cpcp -i 为了去掉这个系统自带的别名,能够使用grep -r --include"*" "alias cp" /查找设置这个环境变量的脚本文件: 我的ubuntu机器上是: ./.bash_aliases:alias cpcp -i 于是我将./.bash_…

Android之让代码跑在主线程(无context上下文)的封装

1、问题 有一段代码需要跑在主线程里面,但是没有context上下文,一开始直接想到runOnUiThread,好像不行 runOnUiThread(new Runnable(){public void run(){//执行更新ui的操作} }); 2、解决办法 封装一个handler的单例类 import android.o…

Sub-process /usr/bin/dpkg returned an error code (1)

1.$ sudo mv /var/lib/dpkg/info /var/lib/dpkg/info_old //现将info文件夹更名 2.$ sudo mkdir /var/lib/dpkg/info //再新建一个新的info文件夹 3.$ sudo apt-get update,$ apt-get -f install //不用解释了吧 4.$ sudo mv /var/lib/dpkg/info/* /var/lib/dpkg/info_old //执…

架构师

系统架构师是一个既需要掌控整体又需要洞悉局部瓶颈并依据具体的业务场景给出解决方案的人。具体来说是一个确认和评估系统需求,给出开发规范,搭建系统实现的核心构架,并澄清技术细节、扫清主要难点的技术人员。主要着眼于系统的“技术实现”…

统信uos系统考试题_148款!富士通及旗下晟拓品牌系列打印机适配统信UOS

近日,南京富士通电子信息科技股份有限公司(简称:富士通)及其旗下晟拓子品牌148款主流打印机产品与统信桌面操作系统UOS的适配工作即将完成,这次适配涵盖了富士通及晟拓的常用主流机型。富士通正式成为统信软件产品生态合作伙伴。本次适配&…

手机浏览器html5游戏,移动浏览器都爱 HTML5 ?

目前中国第三方手机浏览器市场竞争正在愈演愈烈,但由于各应用开发商对手机浏览器的内容和资源的整合能力、技术研发能力、战略布局目的等方面均各不相同,浏览器产品也出现了同质化严重、内容匮乏等问题,亮点突出、吸引用户的产品较缺乏&#…

使用 C# 实现 URL 安全的 Base62 转码

Base62 编码与 Base64 编码类似,都用于数据内容编码。Base64 和 Base62 的目的在都在于将需要传输的内容进行编码,尤其是一些特殊字符(如不可见字符、传输时与协议头冲突的字符)。与 Base64 不同的是,Base62 编码是由1…

易成新能加码光伏产业链 作价28.29亿收购赛维两子公司

易成新能11月17日晚公告称,拟通过向债权人发行股份及支付现金方式,合计作价28.29亿元,取得破产重整后江西赛维100%股权和新余赛维100%股权。 同时,公司拟向控股股东中国平煤神马集团等5名对象,募集配套资金不超过21亿元…

sql学习笔记---公用表达式(CTE)

http://www.cnblogs.com/kissdodog/archive/2013/06/24/3153012.html转载于:https://www.cnblogs.com/changrulin/p/5108917.html

python中匿名函数的作用_什么是Python中的匿名函数

匿名函数 lambda x , y : xy 1.匿名的目的就是要没有名字,给匿名函数赋给一个名字是没有意义的。 2.匿名函数的参数规则、作用域关系与有名函数是一样的。 3.匿名函数的函数体通常应该是 一个表达式,该表达式必须要有一个返回值。 flambda x,n:x ** n print(f(2,3))…

IOS学习笔记二十NSSet和NSMutableSet

1、NSSet、NSMutableSet NSSet元素无序、不能重复 NSMutableSet元素无序、不能重复,有一些向集合中增加元素的功能、删除元素....... 2、测试Demo int main(int argc, char * argv[]) {autoreleasepool {NSSet *set [NSSet setWithObjects:"chenyu", "hello…

怎么快速了解自己的MySQL服务器?

From: http://www.cnblogs.com/benshan/archive/2013/01/09/2853097.html 1、查看数据库服务器状态:status Linux 下的MySQL服务器状态 该列表中主要包括MySQL的版本(为version 5.1.61)、运行平台(debian-linux-gnu(i686)&#xf…

八类网线和七类网线的区别_什么是七类网线?七类网线水晶头如何制作?

要了解七类网线如何使用?需要掌握这四个问题:1、什么是七类网线?2、七类网线与六类网线有什么区别?3、七类网线用什么水晶头?如何制作7类网线水晶头?4、7类网线的应用场景?带着这四个问题&#…

html 图片 保持长宽比,实现图片在页面中宽高一直保持16:9比例的方法

实现图片在页面中宽高一直保持16:9比例的方法发布时间:2020-08-31 14:25:10来源:亿速云阅读:133作者:小新小编给大家分享一下实现图片在页面中宽高一直保持16:9比例的方法,希望大家阅读完这篇文…

poj2632 累死了

题意: 给定A*B的格子,放入N个机器人,每个机器人初始位置及朝向给定。给定M条指令。指令类型有三种: 1、L:左转90 2、R:右转90 3、F:前进一格 问执行指令过程中机器人是否发生碰撞&am…