【星海随笔】云解决方案学习日志篇(二) kafka、Zookeeper、Fielbeat

Elastic 中国社区官方博客
https://blog.csdn.net/ubuntutouch/category_9209092.html

Kafka

kafka的源代码是基于Scala语言编写的,运行在Java虚拟机(即:JVM)上。因此,在安装kafka之前需要先安装JDK

Kafka 为什么依赖 Zookeeper

  • 1.协调分布式系统:Kafka是一个分布式系统,各个节点之间需要进行协调和同步,而Zookeeper正是为分布式系统提供协调和同步的服务的。
  • 2.元数据管理:Kafka的元数据包括了集群的配置、broker的状态等信息,而这些信息需要被所有的Kafka节点共享和维护。Zookeeper提供了一个分布式的文件系统,可以方便地存储和管理这些元数据信息。
  • 3.领导选举:Kafka的一个分区只会分配给一个broker进行读写,而这个broker就是该分区的leader。当leader宕机后,需要从剩余的broker中选举一个新的leader。而Zookeeper可以提供分布式锁和选举的功能,因此Kafka可以利用Zookeeper来实现leader选举。

综上所述,Kafka依赖Zookeeper主要是为了协调分布式系统、元数据管理和领导选举。


ZK安装
来源于apache

1.下载
下载地址:https://zookeeper.apache.org/releases.html

2.解压安装包

 tar -zxf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local/
/usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper-3.7.1/

3.拷贝配置文件,

cp /usr/local/zookeeper-3.7.1/conf/zoo_sample.cfg /usr/local/zookeeper-3.7.1/conf/zoo.cfg

4.修改配置文件

#在配置文件中加一行监听本机 IP 即可
clientPortAddress=10.0.5.163

zookeeper默认会占用8080端口,如果你本机已有服务在使用8080,可以把下面参数添加到zoo.cfg 文件里,自定义端口
admin.serverPort=8001

5.启动zk

/usr/local/zookeeper-3.7.1/bin/zkServer.sh start

6.查看端口是否监听

netstat -lntp |grep 2181

如果服务未监听,请查看日志排查问题
more zookeeper-root-server-VM-5-163-centos.out


kafka 部署

1.下载

下载地址:https://kafka.apache.org/downloads

2.解压安装包

tar -zxf kafka_2.12-3.4.0.tgz -C /usr/local/

3.修改kafka配置

vim /usr/local/kafka_2.12-3.4.0/config/server.properties 
#修改 zk 的IP
zookeeper.connect=10.0.5.163:2181#修改监听地址
listeners=PLAINTEXT://10.0.5.163:9092

4.启动kafka

nohup /usr/local/kafka_2.12-3.4.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.4.0/config/server.properties >/tmp/kafka.log 2>&1 &

5.查看端口是否监听

netstat -lntp |grep 9092

Flebeat部署

原理流程如下:
首先是input输入,可以指定多个数据输入源,然后通过通配符进行日志文件的匹配
匹配到日志后,就会使用Harvester(收割机),将日志源源不断的读取到来
然后收割机收割到的日志,就传递到Spooler(卷轴),然后卷轴就在将他们传到对应的地方

1.下载

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.16.1-linux-x86_64.tar.gz

2.解压二进制包

tar zxf filebeat-7.16.1-linux-x86_64.tar.gz -C /usr/local/
mv /usr/local/filebeat-7.16.1-linux-x86_64/ /usr/local/filebeat-7.16.1

3.创建 Filebeat 配置文件

#备份模板文件
mv /usr/local/filebeat-7.16.1/filebeat.yml /usr/local/filebeat-7.16.1/filebeat.yml.bak
#创建配置文件
cat > /usr/local/filebeat-7.16.1/filebeat.yml << "EOF"
filebeat.inputs:
- type: logtail_files: truebackoff: "1s"paths:- /var/log/nginx/access.json.logfields:type: accessfields_under_root: true
- type: logtail_files: truebackoff: "1s"paths:- /var/log/messagesfields:type: messagesfields_under_root: true
output:kafka:hosts: ["10.0.5.163:9092"]topic: hosts_10-0-5-163
EOF

4.启动Fielbeat

#查看是否已存在进程,将其停止
ps -ef |grep filebeat |grep -v grep |awk '{print $2}' |xargs kill -9#启动Filebeat
nohup /usr/local/filebeat-7.16.1/filebeat  -e -c /usr/local/filebeat-7.16.1/filebeat.yml >/tmp/filebeat.log 2>&1 &#查看进程
ps -ef |grep filebeat#查看是否与ZK建立连接
netstat -ntp |egrep -w '9092|filebeat'

Fielbeat使用

启动

./filebeat -e -c shengxia.yml

yaml文件介绍

filebeat.inputs: # filebeat input输入
- type: stdin    # 标准输入enabled: true  # 启用标准输入
setup.template.settings: index.number_of_shards: 3 # 指定下载数
output.console:  # 控制台输出pretty: true   # 启用美化功能enable: true

输送至ElasticSearch或者Logstash,在Kibana中实现可视化
然后我们在控制台输入hello,就能看到我们会有一个json的输出,是通过读取到我们控制台的内容后输出的,内容如下

{"@timestamp": "2023-05-31T22:57:58.700Z","@metadata": {#元数据信息"beat": "filebeat","type": "_doc","version": "8.8.1"},"log": {"offset": 0,"file": {"path": ""}},"message": "hello",#元数据信息"input": {#控制台标准输入"type": "stdin"#元数据信息},"ecs": {"version": "8.0.0"},"host": {"name": "elk-node1"},"agent": {#版本以及主机信息"id": "5d5e4b99-8ee3-42f5-aae3-b0492d723730","name": "elk-node1","type": "filebeat","version": "8.8.1","ephemeral_id": "24b4fd16-5466-4d7e-b4b8-b73d41f77de0"}
}
参考文档:https://blog.csdn.net/qq_52589631/article/details/131216188

再次创建一个文件,叫 shengxia-log.yml,然后在文件里添加如下内容

filebeat.inputs:
- type: logenabled: truepaths:- /opt/elk/logs/*.log
setup.template.settings:index.number_of_shards: 3
output.console:pretty: trueenable: true

添加完成后,我们在到下面目录创建一个日志文件

# 创建文件夹
mkdir -p /opt/elk/logs# 进入文件夹
cd /opt/elk/logs# 追加内容
echo "hello world" >> test.log

然后再次启动filebeat

 ./filebeat -e -c shengxia-log.yml

能够发现,它已经成功加载到了我们的日志文件 test.log
同时我们还可以继续往文件中追加内容
追加后,我们再次查看filebeat,也能看到刚刚我们追加的内容
检测到日志文件有更新,立刻就会读取到更新的内容,并且输出到控制台。

自定义字段
   当我们的元数据没办法支撑我们的业务时,我们还可以自定义添加一些字段
filebeat.inputs:
- type: logenabled: truepaths:- /opt/elk/logs/*.logtags: ["web", "test"]  #添加自定义tag,便于后续的处理fields:  #添加自定义字段from: web-testfields_under_root: true #true为添加到根节点,false为添加到子节点中
setup.template.settings:index.number_of_shards: 3
output.console:pretty: trueenable: true

添加完成后,重启 filebeat

./filebeat -e -c shengxia-log.yml
filebeat.inputs:
- type: logenabled: truepaths:- /opt/elk/logs/*.logtags: ["web", "test"]fields:from: web-testfields_under_root: false 
setup.template.settings:index.number_of_shards: 1
output.elasticsearch:hosts: ["192.168.40.150:9200","192.168.40.137:9200","192.168.40.138:9200"]
Logstash 配置

1.修改Logstash 配置文件(下面 output 将日志打印到本地,观察日志是否采集到,日志格式是否正确)

cat > /usr/local/logstash-7.16.1/config/logstash.conf << "EOF"
input {kafka {bootstrap_servers => "10.0.5.163:9092"topics => ["hosts_10-0-5-163"]group_id => "test"codec => "json"}
}filter {if [type] == "access" {json {source => "message"remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]}}
}output {stdout {codec=>rubydebug}
}
EOF

2.执行前台启动命令

#查看是否已存在进程,将其停止
ps -ef |grep logstash |grep -v grep |awk '{print $2}' |xargs kill -9#启动 Logstash
logstash -f /usr/local/logstash-7.16.1/config/logstash.conf 

3.查看kafka Group 和队列信息

#进入kafka 安装目录
cd /usr/local/kafka_2.12-3.4.0/bin
#查看所有topic
./kafka-topics.sh  --bootstrap-server 10.0.5.163:9092 --lis
#查看Group
./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --list
#查看队列
./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --group test --describe

在这里插入图片描述
4.修改配置文件,将output 将日志写入elasticsearch

cat > /usr/local/logstash-7.16.1/config/logstash.conf << "EOF"
input {kafka {bootstrap_servers => "10.0.5.163:9092"topics => ["hosts_10-0-5-163"]group_id => "test"codec => "json"}
}
filter {if [type] == "access" {json {source => "message"remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]}}
}output{if [type] == "access" {elasticsearch {hosts => ["http://127.0.0.1:9200"]user => "elastic"password => "elk@2023"index => "access-%{+YYYY.MM.dd}"}}else if [type] == "messages" {elasticsearch {hosts => ["http://127.0.0.1:9200"]user => "elastic"password => "elk@2023"index => "messages-%{+YYYY.MM.dd}"}}
}
EOF

4.后台启动 Logstash

#查看是否已存在进程,将其停止
ps -ef |grep logstash |grep -v grep |awk '{print $2}' |xargs kill -9#启动 Logstash
nohup logstash -f /usr/local/logstash-7.16.1/config/logstash.conf  >/tmp/logstash.log 2>&1 &

查看服务日志是否正常

查看日志是否有 ERROR 持续输出
tailf /tmp/logstash.log#查看logstash 端口是否监听
netstat -lntp |grep 9600

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/28320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

52. QT插件开发--插件程序(带ui文件)的创建与编译

1. 说明 一般情况下,针对代码量比较小的QT程序不需要进行插件集成化开发,但是针对大型程序来说,代码结构比较复杂,使用插件开发的方式可以提高代码开发和维护效率,团队之间的分工合作也会更加的明确。所谓插件式开发,实际上就是把程序的一部分功能封装起来,编译成一个单…

一血c++

题目描述 每一个竞赛选手都无法拒绝拿一血 "一血"其实就是同学们在榜单上看到的深绿色的标记&#xff0c;代表着某道题目&#xff0c;他是第一个通过的。 叶苡朋老师是一名资深信奥选手&#xff0c;在大学多次获奖&#xff0c;也是一个资深抢一血爱好者&#xff0…

认识Redis 主从同步、事务和Memcached的区别

08- 什么是 Redis 主从同步&#xff1f; Redis 的主从同步(replication)机制&#xff0c;允许 Slave 从 Master 那里&#xff0c;通过网络传输拷贝到完整的数据备份&#xff0c;从而达到主从机制。 主数据库可以进行读写操作&#xff0c;当发生写操作的时候自动将数据同步到从…

AES加密、DES加密和RC4加密的区别

AES加密、DES加密和RC4加密在多个方面存在显著区别。以下是这些加密算法的详细对比&#xff1a; AES加密 算法原理&#xff1a; AES&#xff08;Advanced Encryption Standard&#xff09;采用对称密钥加密&#xff0c;利用分组密码的原理&#xff0c;将明文分成多个128位的组…

【C语言】解决C语言报错:Use of Uninitialized Variable

文章目录 简介什么是Use of Uninitialized VariableUse of Uninitialized Variable的常见原因如何检测和调试Use of Uninitialized Variable解决Use of Uninitialized Variable的最佳实践详细实例解析示例1&#xff1a;局部变量未初始化示例2&#xff1a;数组未初始化示例3&…

分布式物联网平台特点

随着物联网&#xff08;IoT&#xff09;技术的飞速发展&#xff0c;我们正步入一个万物互联的新时代。在这个时代&#xff0c;设备、数据和服务的无缝集成是实现智能化的关键。分布式物联网平台作为这一进程的核心&#xff0c;正在成为构建智能世界的基石。 一、分布式物联网平…

0403用代入法求解递归式-分治策略-算法导论第三版

文章目录 1.代入法求解递归式步骤1.1 求解步骤1.2 边界条件 2.做出好的猜测3.微妙的细节4.避免陷阱5.改变变量 结语 1.代入法求解递归式步骤 1.1 求解步骤 代入法求解递归式分两步&#xff1a; 猜测解的形式。用数学归纳法求出解中的常数&#xff0c;并证明解是正确的。 当…

C++ 47 之 函数调用运算符重载

#include <iostream> #include <string> using namespace std;class MyPrint{ public:// 重载小括号() 重载谁operator后就紧跟谁的符号void operator()(string txt){cout << txt << endl;} };class MyAdd{ public:int operator()(int a, int b){retur…

【Pandas驯化-04】Pandas中drop_duplicates、describe、翻转操作

【Pandas驯化-04】Pandas中drop_duplicates、describe、翻转操作 本次修炼方法请往下查看 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合&#xff0c;智慧小天地&#xff01; &#x1f387; 相关内容文档获取 微信公…

基于深度学习的光流预测

基于深度学习的光流预测 光流&#xff08;Optical Flow&#xff09;是指图像序列中像素的运动&#xff0c;即在连续的帧之间每个像素的移动向量。光流预测是计算机视觉中的一个重要任务&#xff0c;广泛应用于运动检测、视频分析、机器人导航等领域。基于深度学习的方法近年来…

redis大key优化

1.什么是大key以及可能造成的异常 1.1 什么大key redis是key,val型存储结构&#xff0c;key允许的最大大小为512MB&#xff0c;空字符串也是有效的键。大key是指value很大(占用大内存)。 常见的大key&#xff0c;大致可以这么分(根据具体redis规格以及实际压测而定): (1)单个…

搭建Python虚拟环境(五):Pyenv

使用Pyenv搭建虚拟环境的详细指南 Pyenv 是一个Python版本管理工具&#xff0c;可以让你在同一台机器上安装和管理多个Python版本。对于Windows用户&#xff0c;可以使用pyenv-win&#xff0c;这是一个专为Windows平台设计的Pyenv版本。本文将详细介绍如何使用Pyenv&#xff0…

判断子字符串是否存在

java判断字符串是否包含特定内容&#xff0c;用到contains语句 语法格式是 str.contains(string) 其中 str是字符串 string是查询字符串 示例代码如下 public class Stringcontains {public static void main(String[] args) {String str"今天的菜谱有:蒸羊羔&…

CleanMyMac X软件下载附加详细安装教程

​首先要介绍的是CleanMyMac X&#xff0c;这是一款极受欢迎的苹果电脑清理软件&#xff0c;它能够全面扫描你的电脑系统&#xff0c;清理无用的文件和垃圾&#xff0c;以释放硬盘空间&#xff0c;除了清理功能之外&#xff0c;CleanMyMac X 还可协助管理应用程序、优化性能、修…

[2024-06]-[大模型]-[Ollama]- WebUI

主要涉及要部署的前端webui是来源于:https://github.com/open-webui/open-webui 正常就使用: docker run -d -p 3000:8080 --add-host=host.docker.internal:host-gateway -v open-webui:/app/backend/data --name open-webui --restart always ghcr.io/open-webui/open-web…

建造者模式(大话设计模式)C/C++版本

建造者模式 C 参考&#xff1a;https://www.cnblogs.com/Galesaur-wcy/p/15907863.html #include <iostream> #include <vector> #include <algorithm> #include <string> using namespace std;// Product Class&#xff0c;产品类&#xff0c;由多个…

小白跟做江科大32单片机之定时器输出比较

原理部分 背景 GPIO口是数字输出端口&#xff0c;只能输出1和0。但是通过PWM&#xff0c;可以使其控制LED呼吸灯亮灭的程度 1.通过CNT和CCR进行比较&#xff0c;可以输出一定频率和占空比的PWM波形 2.通用定时器有4个CCR&#xff0c;可同时输出4路PWM波形&#xff0c;但只有…

智慧消防新篇章:可视化数据分析平台引领未来

一、什么是智慧消防可视化数据分析平台&#xff1f; 智慧消防可视化数据分析平台&#xff0c;运用大数据、云计算、物联网等先进技术&#xff0c;将消防信息以直观、易懂的图形化方式展示出来。它不仅能够实时监控消防设备的运行状态&#xff0c;还能对火灾风险进行预测和评估…

Unity | Tilemap系统

目录 一、准备工作 1.插件导入 2.资源导入 二、相关组件介绍 1.Grid组件 2.Tilemap组件 3.Tile 4.Tile Palette 5.Brushes 三、动态创建地图 四、其他功能 1.移动网格上物体 2.拖拽缩放地图 Unity Tilemap系统为2D游戏开发提供了一个直观且功能强大的平台&#xff…

【知识点】std::thread::detach std::lock_guard std::unique_lock

在 C11 中&#xff0c;std::thread 提供了并发编程的基础设施&#xff0c;使得我们可以创建和管理线程。std::thread 的 detach 方法是一种常用的线程管理方式&#xff0c;允许线程在后台独立运行&#xff0c;而不必与主线程同步或等待其完成。 std::thread::detach 方法 当你…