【Kafka】Kafka 架构深入

Kafka 工作流程及文件存储机制

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。


数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。


数据一致性问题

LEO:指的是每个副本最大的 offset; 
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。

1)follower 故障 

follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

2)leader 故障 

leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 


ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。


Filebeat+Kafka+ELK

环境准备
node1:192.168.67.11        elasticsearch  kibana
node2:192.168.67.12        elasticsearch
apache:192.168.67.10               logstash  apache/nginx/mysql
Filebeat节点:filebeat/192.168.67.13           Filebeat
zk-kfk01:192.168.67.21                zookeeper、kafka
zk-kfk02:192.168.67.22                zookeeper、kafka
zk-kfk03:192.168.67.23                zookeeper、kafkasystemctl stop firewalld
systemctl enable firewalld
setenforce 0

1、部署 Zookeeper+Kafka 集群

2、部署 Filebeat 

cd /usr/local/filebeatvim filebeat.yml
filebeat.prospectors:
- type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]......
#添加输出到 Kafka 的配置
output.kafka:enabled: true#指定 Kafka 集群配置hosts: ["192.168.67.21:9092","192.168.67.22:9092","192.168.67.23:9092"]#指定 Kafka 的 topictopic: "httpd"

 注释掉logstash 出口,留下kafka出口;出口只能有一个


  
启动 filebeat
systemctl restart filebeat.service
systemctl status filebeat.service# ./filebeat -e -c filebeat.yml

报错:服务起不来;查看日志;

原因:是filebeat.yml中将日志同时输出到了kafka和logstash

解决:注释掉logstash即可

3、部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

cd /etc/logstash/conf.d/vim kafka.conf
input {kafka {#kafka集群地址bootstrap_servers => "192.168.67.21:9092,192.168.67.22:9092,192.168.67.23:9092"#拉取的kafka的指定topictopics  => "httpd"#指定 type 字段type => "httpd_kafka"#解析json格式的日志数据codec => "json"#拉取最近数据,earliest为从头开始拉取auto_offset_reset => "latest"#传递给elasticsearch的数据额外增加kafka的属性数据decorate_events => true}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.67.11:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.67.11:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}

 

启动 logstash

logstash -f kafka.conf


注:生产黑屏操作es时查看所有的索引:curl -X GET "localhost:9200/_cat/indices?v"

4、浏览器访问

浏览器访问 http://192.168.67.11:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/814514.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

30 超级数据查看器 视频 详情界面的便捷功能

30 超级数据查看器 视频 详情界面的便捷功能 【超级数据查看器 详情界面便捷功能-哔哩哔哩】 https://b23.tv/ACnsIXm 最下方有 讲解稿全文 有兴趣的朋友可以看看 超级数据查看器是安卓手机上的APP,软件。 具有导入excel表格数据,存入手机内置的数…

elasticSearch从零整合springboot项目实操

type会被弃用 ,就是说之后的elasticSearch中只会存在 索引(indices) 和 一行(document) 和字段(fields) elasticSearch 和solr的区别最大的就是 es对应的 是 json的格式 。 solr有xml和josn等…

自定义神经网络时的注意事项

问题描述 通过继承tf.keras.Model自定义神经网络模型时遇到的一系列问题。 代码如下, class STFT_ConV2D(tf.keras.Model):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.pre_layer tf.keras.Sequential([tf.keras.layers.Flatten()…

Spring Security Oauth2 之 理解OAuth 2.0授权流程

1. Oauth 定义 1.1 角色 OAuth定义了四个角色: 资源所有者 一个能够授权访问受保护资源的实体。当资源所有者是一个人时,它被称为最终用户。 资源服务器 托管受保护资源的服务器能够使用访问令牌接受和响应受保护的资源请求。 客户 代表资源所有…

Linux系统编程---文件IO

一、系统调用 由操作系统实现并提供给外部应用程序的编程接口(Application Programming Interface,API),用户程序可以通过这个特殊接口来获得操作系统内核提供的服务 系统调用和库函数的区别: 系统调用(系统函数) 内核提供的函数 库调用 …

一起学习python——基础篇(19)

今天来说一下python的如何修改文件名称、获取文件大小、读取文中指定的某一行内容。 1、修改文件名称: import os testPath"D:/pythonFile/test.txt" testPath2"D:/pythonFile/test2.txt" #修改文件名称使用rename方法, #第一个参…

TQ15EG开发板教程:在MPSOC上运行ADRV9009(vivado2018.3)

首先需要在github上下载两个文件,本例程用到的文件以及最终文件我都会放在网盘里面, 地址放在最后面。在github搜索hdl选择第一个,如下图所示 GitHub网址:https://github.com/analogdevicesinc/hdl/releases 点击releases选择版…

31省结婚、离婚、再婚等面板数据(1990-2022年)

01、数据介绍 一般来说,经济发达地区的结婚和离婚率相对较高,而经济欠发达地区的结婚和离婚率相对较低。此外,不同省份的文化、习俗、社会观念等因素也会对结婚和离婚情况产生影响。 本数据从1990年至2022年,对各地区的结婚、离…

Vue-router的编程式导航有哪些方法

Vue Router 的编程式导航主要提供了以下方法&#xff1a; push&#xff1a;这个方法会向 history 栈添加一个新的记录&#xff0c;所以当用户点击浏览器后退按钮时&#xff0c;则回到之前的 URL。当你点击 <router-link> 时&#xff0c;这个方法会在内部被调用&#xff…

6-169 删除递增链表两个值之间的元素 - 人邮DS(C 第2版)线性表习题2(8)

设计一个算法,删除递增有序链表中值大于mink且小于maxk的所有元素(mink和maxk是给定的两个参数,其值可以和表中的元素相同,也可以不同 )。 函数接口定义: void DeleteMinMax(LinkList const &L, int mink, int maxk); L - 递增链表的指针 mink - 被删除元素值的最…

【C++】每日一题 392 判断子序列

给定字符串 s 和 t &#xff0c;判断 s 是否为 t 的子序列。 字符串的一个子序列是原始字符串删除一些&#xff08;也可以不删除&#xff09;字符而不改变剩余字符相对位置形成的新字符串。&#xff08;例如&#xff0c;"ace"是"abcde"的一个子序列&#…

014_files_in_MATLAB中的文件读写

MATLAB中的文件读写 这一篇就要简单介绍MATLAB中的典型文件类型和文件操作。 基于字节流的接口 Matlab本身提供的文件操作是比较接近底层的&#xff0c;这一套底层的文件原语&#xff0c;主要是fopen、fclose、fread、fwrite、fseek、ftell、feof、ferror等函数。这些函数的…

Github 2024-04-14 php开源项目日报Top9

根据Github Trendings的统计,今日(2024-04-14统计)共有9个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目9TypeScript项目1Laravel: 以优雅语法简化Web开发 创建周期:4028 天开发语言:PHP协议类型:MIT LicenseStar数量:30824 个Fork数量:1…

《青少年成长管理2024》046 “成长目标:你是谁呀?”2/3

《青少年成长管理2024》046 “成长目标&#xff1a;你是谁呀&#xff1f;”2/3 七、机器智能&#xff1f;八、天赋没有对错&#xff08;一&#xff09;天赋的客观性&#xff08;二&#xff09;我笨我没错&#xff08;三&#xff09;我聪明只是我幸运&#xff08;四&#xff09;…

在Linux驱动中,如何确保中断上下文的正确保存和恢复?

大家好&#xff0c;今天给大家介绍在Linux驱动中&#xff0c;如何确保中断上下文的正确保存和恢复&#xff1f;&#xff0c;文章末尾附有分享大家一个资料包&#xff0c;差不多150多G。里面学习内容、面经、项目都比较新也比较全&#xff01;可进群免费领取。 在Linux驱动中&am…

windows系统搭建OCR半自动标注工具PaddleOCR

深度学习 文章目录 深度学习前言一、环境搭建准备方式1&#xff1a;安装Anaconda搭建1. Anaconda下载地址: [点击](https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/?CM&OD)2. 创建新的conda环境 方式2. 直接安装python 二、安装CPU版本1. 安装PaddlePaddle2、安装…

中国省级人口结构数据集(2002-2022年)

01、数据简介 人口结构数据不仅反映了地域特色&#xff0c;更是预测地区未来发展趋势的重要工具。在这些数据中&#xff0c;总抚养比、少年儿童抚养比和老年人口抚养比是三大核心指标。 少儿抚养比0-14周岁人口数/15-64周岁人口数 老年抚养比65周岁及以上人口数/15-64周岁人…

leetcode每日一题(1702. 修改后的最大二进制字符串)

题目描述 题解 这道题贪心的思想&#xff0c;我们只需要尽可能多的把0变成1&#xff0c;而且进行操作1才能使其变大。观察发现以下几点&#xff1a; 不论原字符串有多少个0&#xff0c;最后都会剩余1个0。 假设原字符串只有一个0&#xff0c;不能进行任何操作&#xff0c;显然…

一招将vscode自动补全的双引号改为单引号

打开设置&#xff0c;搜索quote&#xff0c;在结果的HTML选项下找到自动完成&#xff0c;设置默认引号类型即可。 vscode版本&#xff1a;1.88.1&#xff0c; vscode更新日期&#xff1a;2024-4-10

利用Java代码调用Lua脚本改造分布式锁

4.8 利用Java代码调用Lua脚本改造分布式锁 lua脚本本身并不需要大家花费太多时间去研究&#xff0c;只需要知道如何调用&#xff0c;大致是什么意思即可&#xff0c;所以在笔记中并不会详细的去解释这些lua表达式的含义。 我们的RedisTemplate中&#xff0c;可以利用execute方…