7.kafka+ELK连接

文章目录

  • kafka+ELK连接
    • 部署Kafka
    • kafka操作命令
    • kafka架构深入
    • Filebeat+Kafka+ELK连接

kafka+ELK连接

部署Kafka

###关闭防火墙systemctl stop firewalld
systemctl disable firewalldsetenforce 0vim /etc/selinux/configSELINUX=disabled
###下载安装包官方下载地址:http://kafka.apache.org/downloads.htmlcd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
###安装 Kafkacd /opt/
tar xf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
###修改配置文件##备份配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties---21行--
broker.id=0    
###21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2---31行---
listeners=PLAINTEXT://192.168.242.70:9092    
###31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改---42行---
num.network.threads=3    
###42行,broker 处理网络请求的线程数量,一般情况下不需要去修改---45行---
num.io.threads=8            
#45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数---48行---
socket.send.buffer.bytes=102400       
#48行,发送套接字的缓冲区大小---51行---
socket.receive.buffer.bytes=102400    
#51行,接收套接字的缓冲区大小---54行---
socket.request.max.bytes=104857600    
#54行,请求套接字的缓冲区大小---60行---
log.dirs=/usr/local/kafka/logs        
#60行,kafka运行日志存放的路径,也是数据存放的路径---65行---
num.partitions=1    
#65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖---69行---
num.recovery.threads.per.data.dir=1    
#69行,用来恢复和清理data下数据的线程数量---103行---
log.retention.hours=168    
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除---110行---
log.segment.bytes=1073741824    
#110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件---123行---
zookeeper.connect=192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181    
###123行,配置连接Zookeeper集群地址
###修改环境变量vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin###加载配置
source /etc/profile###复制文件到zookeeper服务器哦scp -r /usr/local/kafka 192.168.242.71:/usr/local
scp -r /usr/local/kafka 192.168.242.72:/usr/local
###部署kafka启动脚本vim /etc/init.d/kafka#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac
###设置开机自启cd /etc/init.d/
chmod +x /etc/init.d/kafka
chkconfig --add kafka###分别启动 Kafka
service kafka start###另外一种启动方式cd /usr/local/kafka/bin./kafka-server-start.sh -daemon /usr/local/kafka/config/server.propertiesnetstat -lntp | grep 9092

在这里插入图片描述

kafka操作命令

####创建topickafka-topics.sh --create --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 --replication-factor 2 --partitions 3 --topic test--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,建议为 2 
--partitions:定义分区数 
--topic:定义 topic 名称

在这里插入图片描述

###查看当前服务器中的所有 topickafka-topics.sh --list --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 
###查看某个 topic 的详情kafka-topics.sh  --describe --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 

在这里插入图片描述

###发布消息kafka-console-producer.sh --broker-list 192.168.242.70:9092,192.168.242.71:9092,192.168.242.72:9092  --topic test
###消费消息kafka-console-consumer.sh --bootstrap-server 192.168.242.70:9092,192.168.242.71:9092,192.168.242.72:9092 --topic test --from-beginning--from-beginning:会把主题中以往所有的数据都读取出来

在这里插入图片描述

###修改分区数kafka-topics.sh --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181  --alter --topic test --partitions 6

在这里插入图片描述

###删除 topickafka-topics.sh --delete --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 --topic test

在这里插入图片描述

kafka架构深入

  • Kafka 工作流程及文件存储机制

    • Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
  • topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。

  • Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。

  • 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

  • 由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。

  • 每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。

  • 这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

  • index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

    • “.index” 文件存储大量的索引信息,
    • “.log” 文件存储大量的数据,
    • 索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。
  • 数据可靠性保证

    • 为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
  • 数据一致性问题

    • LEO:指的是每个副本最大的 offset;
    • HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。
  • follower 故障

    • follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • leader 故障

    • leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。
  • 注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

  • ack 应答机制

    • 对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。
    • 所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。
  • 当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

    • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。
    • 1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。
    • -1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。
  • 三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

+ kafka会通过ack机制保证数据的可靠性

  • ack配置参数有
  • 0(效果类似异步复制):不等待follower同步完成就让生产者发送下一条消息
  • 1(效果类似半同步复制):至少等待一个follower同步完成才让生产者发送下一条消息
  • -1(效果类似全同步复制):等待所有follower同步完成才让生产者发送下一条消息

Filebeat+Kafka+ELK连接

在这里插入图片描述

###部署 Zookeeper+Kafka 集群###部署 Filebeat,修改配置文件 cd /usr/local/filebeatvim filebeat.ymlfilebeat.prospectors:
- type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]......#添加输出到 Kafka 的配置output.kafka:enabled: truehosts: ["192.168.242.70:9092","192.168.242.71:9092","192.168.242.72:9092"]    #指定 Kafka 集群配置topic: "httpd"    #指定 Kafka 的 topic

在这里插入图片描述
在这里插入图片描述

###启动 filebeat./filebeat -e -c filebeat.yml

在这里插入图片描述

###部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件cd /etc/logstash/conf.d/vim kafka.confinput {kafka {bootstrap_servers => "192.168.242.70:9092,192.168.242.71:9092,192.168.242.72:9092"  #kafka集群地址topics  => "httpd"     #拉取的kafka的指定topictype => "httpd_kafka"  #指定 type 字段codec => "json"        #解析json格式的日志数据auto_offset_reset => "latest"  #拉取最近数据,earliest为从头开始拉取decorate_events => true   #传递给elasticsearch的数据额外增加kafka的属性数据}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.242.66:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.242.66:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}
####启动 logstashlogstash -f kafka.conf

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/2264.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue自定义指令

需求1:定义一个v-big指令,和v-text功能类似,但会把绑定的数值放大10倍。 需求2:定义一个v-fbind指令,和v-bind功能类似,但可以让其所绑定的input元素默认获取焦点。 自定义指令函数式v-big: &l…

2023最新版本Activiti7系列-事件篇

事件篇 事件(event)通常用于为流程生命周期中发生的事情建模。事件总是图形化为圆圈。在BPMN 2.0中,有两种主要的事件分类:*捕获(catching)与抛出(throwing)*事件。 捕获: 当流程执…

文件共享服务器

文章目录 一、共享服务器概述二、创建共享三、访问共享四、创建隐藏的共享五、访问隐藏共享的方法六、共享相关命令七、屏蔽系统隐藏共享自动产生1. 打开注册表2. 定位共享注册表位置 八、查看本地网络连接状态(查看开放端口)九、关闭445服务 一、共享服…

List迭代器是如何实现的

我们知道当我们使用vector的迭代器时,它的操作可以让它指向下一个位置,解引用操作就可以找到这个位置的值,因为vector底层时用的一个顺序表,可以支持随机访问。对比list来说vector底层的迭代器是十分的简便可观的。虽然我们使用list的迭代器外观上和vector是大同小异的&#xf…

uniapp离线引入阿里巴巴图标

阿里巴巴图标地址 1.添加图标到购物车 2.点击购物车进入项目 3.下载到本地 4.解压后文件目录 5.放入项目目录中(比如说我经常放在common或者static下icon中) 6.在main.ts或者main.js中引入(注意路径,用相对的也行) import /static/iconfon…

超细,设计一个“完美“的测试用例,用户登录模块实例...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 好的测试用例一定…

java项目之足球赛会管理系统(ssm+mysql+jsp)

风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的足球赛会管理系统。技术交流和部署相关看文章末尾! 项目地址: https://download.csdn.net/download/sinat_26552841…

C#安装.Net平台科学计算库Math.Net Numerics

工作的时候需要使用到C#的Math.Net库来进行计算。 Math.Net库涵盖的主题包括特殊函数,线性代数,概率模型,随机数,插值,积分,回归,优化问题等。 这里记录一下,安装Math.Net库的过程…

Hugging Face开源库accelerate详解

官网:https://huggingface.co/docs/accelerate/package_reference/accelerator Accelerate使用步骤 初始化accelerate对象accelerator Accelerator()调用prepare方法对model、dataloader、optimizer、lr_schedluer进行预处理删除掉代码中关于gpu的操作&#xff0…

【C++ 重要知识点总结】表达式

表达式 1 基础 组合运算 优先级结合律 类型转换 运算符重载 左值和右值 2 算数运算符 3 逻辑和关系运算法 短路求值 逻辑与,当第一个判定为否的时候,不再执行第二个判定,可以用来屏蔽第二步的计算,代替条件判断&#xff0…

String类

String类 String类是Java中的字符串类型,它是引用类型 三种常用的字符串构造 public class Test {public static void main(String[] args){String str1 "hello";String str2 new String("hello");char[] array {h,e,l,l,o};String str3 new String(…

云计算——虚拟化层架构

作者简介:一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭:低头赶路,敬事如仪 个人主页:网络豆的主页​​​​​ 前言 本章将会讲解云计算的虚拟化层架构,了解云计算虚拟化层都有哪些架构模式…

WPF嵌入外部exe应用程序-实现基本的嵌入

WPF嵌入外部exe应用程序 使用场景功能实现嵌入基本功能实现1.导入windows API2.运行外部程序3. 获取窗体句柄4. 嵌入窗体5.设置子窗体位置整个代码 嵌入存在的问题: 使用场景 在WPF桌面应用程序开发过程中,有时候需要将其他程序结合到一起,让…

mssql 以xml类型为存储过程传递不确定数量的参数

mssql 以xml类型传递不确定数量的参数 存储过程xml 处理在存储过程中参数在存储过程中使用 xml 作为参数存储过程 相信各位小伙伴在使用数据库的过程中,或多或少的建立了一些存储过程,并且带有一些参数,用来增加存储过程的适用性。 类似老顾的截图这样的,通常,我们需要将…

Redis基本全局命令(含key过期策略)

Redis基本全局命令 KEYEXISTSDELEXPIRETTLRedis的key过期策略TYPE KEY 返回所有满⾜样式(pattern)的key。⽀持如下统配样式。 h?llo 匹配 hello , hallo 和 hxlloh*llo 匹配 hllo 和 heeeelloh[ae]llo 匹配 hello 和 hallo 但不匹配 hilloh[^e]llo 匹配…

Debian 系统安装中文输入法-iTOP3588开发板

Debian 系统烧写完成之后,并没有中文输入功能。本文档将介绍如何安装 ibus pinyin 输入法。 首先安装 fcitx 对应的工具,如下图所示: apt-get install fcitx fcitx-tools fcitx-config* fcitx-frontend* fcitx-module* fcitx-ui-* presage …

2023-7-13-第十八式观察者模式

🍿*★,*:.☆( ̄▽ ̄)/$:*.★* 🍿 💥💥💥欢迎来到🤞汤姆🤞的csdn博文💥💥💥 💟💟喜欢的朋友可以关注一下&#xf…

opencv-06 使用numpy.array 操作图片像素值

opencv-06 使用numpy.array 操作图片像素值 **1.二值图像及灰度图像****利用item 读取某一个像素值****利用itemset 修改像素值****彩色图像numpy.arry 像素值操作** numpy.array 提供了 item()和 itemset()函数来访问和修改像素值,而且这两个函数都是经…

基于MATLAB的无人机遥感数据预处理与农林植被性状估算

查看原文>>>基于MATLAB的无人机遥感数据预处理与农林植被性状估算 在新一轮互联网信息技术大发展的现今,无人机、大数据、人工智能、物联网等新兴技术在各行各业都处于大爆发的前夜。为了将人工智能方法引入农业生产领域。首先在种植、养护等生产作业环节…

Offset Explorer2 监视kafka的利器

kafka作为一个生产者和消费者集为一体的框架,消费者必须一直保持打开的状态,并且每隔一段时间接收一次数据,才能够保持生产者放入的数据及时被处理掉,而生产者则可以每隔一段时间发送一波数据,这样消费者就能够接收到了…