kafka部署以及常用命令详细总结

1环境准备

1.1ip规划

ip: 192.168.1.200

1.2配置主机名

#设置主机名
hostnamectl set-hostname node1

1.3配置hosts

[root@node1 ~]# cat >> /etc/hosts << 'EOF'192.168.1.200 node1
EOF

2部署

2.1安装包准备

将以下安装包从官网下载到本地

jdk-8u371-linux-x64.tar.gz

apache-zookeeper-3.7.1-bin.tar.gz

kafka_2.12-3.4.0.tgz

上述安装包已整理上传到我的资源,需要可自行下载

#将下载好的安装包上传到服务器
[root@node1 ~]# ll
总用量 252268
-rw-------. 1 root root      1257 329 2023 anaconda-ks.cfg
-rw-r--r--  1 root root  12649765 712 15:04 apache-zookeeper-3.7.1-bin.tar.gz
-rw-r--r--  1 root root 139219380 712 15:10 jdk-8u371-linux-x64.tar.gz
-rw-r--r--  1 root root 106441367 712 15:04 kafka_2.12-3.4.0.tgz

2.2安装jdk

#解压安装包
[root@node1 ~]# tar -xvf jdk-8u371-linux-x64.tar.gz -C /usr/local/#配置环境变量
[root@node1 ~]# cat >> /etc/profile << 'EOF'#Set for jdk
export JAVA_HOME=/usr/local/jdk1.8.0_371
export PATH=$PATH:$JAVA_HOME/bin
EOF[root@node1 ~]# source /etc/profile
[root@node1 ~]# java -version
openjdk version "1.8.0_262"
OpenJDK Runtime Environment (build 1.8.0_262-b10)
OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)

2.3部署zookeeper

[root@node1 ~]# tar -xf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local/
[root@node1 ~]# cd /usr/local/
[root@node1 local]# mv apache-zookeeper-3.7.1-bin/ zookeeper
[root@node1 local]# cd zookeeper/conf/[root@node1 conf]# cp zoo_sample.cfg zoo.cfg[root@node1 conf]# vi zoo.cfg
dataDir=/usr/local/zookeeper/data
#server.1=node1:2888:3888
#server.2=node1:2888:3888
...
#多节点需要server.1= 配置,用于多节点之间的心跳检测和选举机制#配置myid,myid 文件用于标识 ZooKeeper 服务器的身份,即使是在单节点环境中,ZooKeeper 也需要知道自己是谁。内容应该是数字 1,表示这是一个独立的单节点部署。[root@node1 conf]# mkdir /usr/local/zookeeper/data
[root@node1 conf]# echo 1 > /usr/local/zookeeper/data/myid#设置环境变量
[root@node1 conf]# cd
[root@node1 ~]# cat >> .bash_profile << 'EOF'#set for zookeeper
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
EOF[root@node1 ~]# source .bash_profile#启动zookeeper
[root@node1 ~]# zkServer.sh start[root@node1 ~]# jps
2038 Jps
1995 QuorumPeerMain

2.4部署kafka

[root@node1 ~]# tar -xf kafka_2.12-3.4.0.tgz -C /usr/local/
[root@node1 ~]# cd /usr/local/
[root@node1 local]# mv kafka_2.12-3.4.0/ kafka
[root@node1 local]# cd kafka/config/[root@node1 config]# vi server.properties
listeners=PLAINTEXT://node1:9092
log.dirs=/usr/local/kafka/logs
zookeeper.connect=node1:2181[root@node1 config]# mkdir /usr/local/kafka/logs#设置环境变量
[root@node1 config]# cd
[root@node1 ~]# cat >> .bash_profile << 'EOF'#set for kafka
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF[root@node1 ~]# source .bash_profile#启动
[root@node1 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties[root@node1 ~]# jps
1995 QuorumPeerMain
2412 Kafka
2429 Jps

3kafka常用使用命令

3.1创建topic以及给用户授权

#创建topic
[root@node1 ~]# kafka-topics.sh --bootstrap-server 192.168.1.200:9092 --create --replication-factor 1 --partitions 1 --topic my-topic
Created topic my-topic.#给用户 mrloam 授予生产权限
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --producer --topic my-topic
Warning: support for ACL configuration directly through the authorizer is deprecated and will be removed in a future release. Please use --bootstrap-server instead to set ACLs through the admin client.
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=my-topic, patternType=LITERAL)`: (principal=User:mrloam, host=*, operation=CREATE, permissionType=ALLOW)(principal=User:mrloam, host=*, operation=WRITE, permissionType=ALLOW)(principal=User:mrloam, host=*, operation=DESCRIBE, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=my-topic, patternType=LITERAL)`: (principal=User:mrloam, host=*, operation=CREATE, permissionType=ALLOW)(principal=User:mrloam, host=*, operation=WRITE, permissionType=ALLOW)(principal=User:mrloam, host=*, operation=DESCRIBE, permissionType=ALLOW) #创建消费组并授权,消费组不存在则授权消费权限报错
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic my-topic --group my-group#给用户 mrloam 授予消费权限,指定消费组 my-group
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --consumer --topic my-topic --group my-group#给用户 mrloam 授予消费权限,所有消费组
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --consumer --topic my-topic --group *
#查看所有topic
[root@node1 ~]# kafka-topics.sh --bootstrap-server 192.168.1.200:9092 --list#查看指定topic的详细信息
[root@node1 ~]# kafka-topics.sh --bootstrap-server 192.168.1.200:9092 --topic my-topic --describe

3.2生产、消费

#生产
[root@node1 ~]# kafka-console-producer.sh --bootstrap-server 192.168.1.200:9092 --topic my-topic
#创建或指定 groupid 进行消费
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic my-topic --group my-group
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic my-topic --consumer-property --group.id=my-group#将偏移量移动到最早时间位置
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.1.200:9092 --group my-group --reset-offsets --topic my-topic --to-earliest --execute#设置偏移量到最新时间
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.1.200:9092 --group my-group --topic my-topic --reset-offsets --to-latest --execute#移动偏移量到指定时间位置
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.1.200:9092 --group my-group --topic my-topic --reset-offsets --to-datetime "2022-12-08T02:00:00.000" --execute --command-config consumer.properties#移动偏移量到指定位置 200 处
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server 192.168.1.200:9092 --group my-group --topic my-topic --reset-offsets --to-offset 200 --execute
#显示key消费,消费出的消息将打印出消息体的key和value
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic my-topic --group my-group --property print.key=true#指定消费10条数据
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 \
--topic my-topic \
--group my-group \
--max-messages 10 \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.timestamp=true \
--consumer.config consumer.properties#过滤指定时间的数据, 时间戳: $(date -d '2024-07-11 15:20:00.000' +%s%3N)
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 \
--topic my-topic \
--group my-group \
--property print.timestamp=true \
--consumer.config consumer.properties \
--from-beginning | awk -F 'CreateTime:' '$2>=1675742400000 && $2<=1675749600000 {print $0}'

3.3查看,添加,移除权限

#查看topic权限
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --list --topic my-topic#添加权限
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --allow-host * --producer --topic my-topic
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --allow-host 192.168.1.* --producer --topic my-topic
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --allow-host * --consumer --topic my-topic --group my-group
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --add --allow-principal User:mrloam --allow-host * --consumer --topic my-topic --group *
#移除权限
[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --remove --allow-principal User:mrloam --allow-host 192.168.1.* --producer --topic my-topic --force[root@node1 ~]# kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.200:2181 --remove --allow-principal User:mrloam --allow-host 192.168.1.* --consumer --group my-group --topic my-topic --force

增加topic分区数

[root@node1 ~]# kafka-topics.sh --alter --bootstrap-server 192.168.1.200:9092 --topic my-topic --partitions 3

消费组信息查询

#查看所有消费组
[root@node1 ~]# kafka-consumer-groups.sh --list --bootstrap-server 192.168.1.200:9092#查看消费组详细信息
[root@node1 ~]# kafka-consumer-groups.sh --describe --bootstrap-server 192.168.1.200:9092 --group my-group

删除topic

[root@node1 ~]# kafka-topics.sh --delete --bootstrap-server 192.168.1.200:9092 --topic my-topic
#查看kafka版本
[root@node1 ~]# kafka-topics.sh --version
[root@node1 ~]# kafka-server-start.sh --version[root@node1 ~]# cd $KAFKA_HOME
[root@node1 ~]# find ./libs -name 'kafka_*'|grep -o 'kafka[^\\n]*'	#kafka_2.12-3.4.0.jar #scala版本:2.11   kafka版本:3.4.0#查看zk版本
[root@node1 ~]# zkServer.sh version#连接到 ZooKeeper 服务器
[root@node1 ~]# zkCli.sh
version[root@node1 ~]# echo stat | nc 192.168.1.200 2181

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/45744.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用键盘优雅地使用浏览器

来自&#x1f96c;&#x1f436;程序员 Truraly | 田园 的博客&#xff0c;最新文章首发于&#xff1a;田园幻想乡 | 原文链接 | github &#xff08;欢迎关注&#xff09; 目录 浏览器快捷键 参考资料&#xff1a; 教你如何用键盘轻松浏览网页 这两天出门旅游&#xff0c;高铁…

Vue单路由的独享守卫怎么设置

在Vue.js中&#xff0c;特别是在使用Vue Router时&#xff0c;路由守卫&#xff08;Route Guards&#xff09;是一种强大的机制&#xff0c;允许我们在路由发生变化时执行一些逻辑&#xff0c;比如检查用户是否登录、加载数据等。Vue Router提供了全局守卫、路由独享守卫和组件…

美团一面,你碰到过CPU 100%的情况吗?你是怎么处理的?

本文主要分为三部分 分析一下CPU 100%的常见原因 CPU 100%如何排查 回答这个问题的一个参考答案 CPU被打满的常见原因 1. 死循环 在实际工作中&#xff0c;可能每个开发都写过死循环的代码。 死循环有两种&#xff1a; 在 while、for、forEach 循环中的死循环。 无限递…

centos安装minio文件系统服务器(踩坑版)

centos安装minio文件系统服务器&#xff08;踩坑版&#xff09; 引安装1. 下载2. 启动3. 创建access keys4. 创建buckets 坑 引 本来安装挺简单的&#xff0c;网上的教程一大堆&#xff0c;有些写的也挺详细的。不过自己还是踩到坑了&#xff0c;耽误了个把小时&#xff0c;特…

【分库】分库的核心原则

目录 分库的核心原则 前言 分区透明性与一致性保证 弹性伸缩性与容错性设计 数据安全与访问控制机制 分库的核心原则 前言 在设计和实施分库策略时&#xff0c;遵循一系列核心原则是至关重要的&#xff0c;以确保系统不仅能够在当前规模下高效运行&#xff0c;还能够随着…

Vue的生命周期函数有哪些?

Vue的生命周期函数是指Vue实例从创建到销毁的过程中&#xff0c;会调用的一系列特殊函数&#xff0c;这些函数允许开发者在Vue的不同阶段执行特定的代码。Vue 2.x和Vue 3.x的生命周期函数有所差异&#xff0c;但总体思路是一致的。以下是Vue生命周期函数的主要分类和具体函数&a…

单目测距 单目相机测距 图片像素坐标转实际坐标的一种转换方案

需要相机位置固定 原图 红色的点是我们标注的像素点&#xff0c;这些红色的点我们知道它的像素坐标&#xff0c;以及以右下角相机位置为原点的x y 实际坐标数值 通过转换&#xff0c;可以得到整个图片内部其余像素点的实际坐标&#xff0c; 这些红色的点是通过转换关系生成的&…

Python | Leetcode Python题解之第231题2的幂

题目&#xff1a; 题解&#xff1a; class Solution:BIG 2**30def isPowerOfTwo(self, n: int) -> bool:return n > 0 and Solution.BIG % n 0

el-table 动态添加删除 -- 鼠标移入移出显隐删除图标

<el-table class"list-box" :data"replaceDataList" border><el-table-column label"原始值" prop"original" align"center" ><template slot-scope"scope"><div mouseenter"showClick…

小妙招使用sysctl hw.realmem查看实际物理内存@FreeBSD

使用sysctl hw.realmem查看实际物理内存&#xff1a;The realmem value is memory before the kernel and modules are loaded, whereas hw.physmem is what is left after they were loaded. 使用hw.physmem查看去掉kernel和模块调用后剩余的内存 sysctl hw.ncpu是机器的cpu…

Java三剑客:封装、继承、多态的魔法世界

第一章&#xff1a;封装的艺术 —— 保护你的宝藏 案例分析&#xff1a;银行账户系统 想象一下&#xff0c;你正在构建一个银行账户系统。每个账户都有一个余额&#xff0c;这个余额需要受到严格的保护&#xff0c;不能被随意修改。我们可以通过封装来实现这一目标。 示例代…

Sentinel和hystric的运用详解

Hystrix是一个由Netflix开发的开源Java库&#xff0c;用于实现延迟容忍和容错逻辑&#xff0c;以增强分布式服务之间的交互的弹性。Hystrix通过隔离服务之间的访问点&#xff0c;阻止级联故障&#xff0c;并提供后备选项来实现这一目标。Hystrix的核心功能包括服务降级、服务熔…

nvide shortcuts table

快捷键中文功能描述n nvim-tree: 打开预览n nvim-tree: 打开n -nvim-tree: 上一级目录n .nvim-tree: 运行命令n <nvim-tree: 上一个同级节点n >nvim-tree: 下一个同级节点n Bnvim-tree: 切换过滤器&#xff1a;无缓冲区n Cnvim-tree: 切换过滤器&#xff1a;Git 干净n Dn…

JavaWeb(四:Ajax与Json)

一、Ajax 1.定义 Ajax&#xff08;Asynchronous JavaScript And XML&#xff09;&#xff1a;异步的 JavaScript 和 XML AJAX 不是新的编程语言&#xff0c;指的是⼀种交互方式&#xff1a;异步加载。 客户端和服务器的数据交互更新在局部页面的技术&#xff0c;不需要刷新…

Openerstry + lua + redis根据请求参数实现动态路由转发

文章目录 一、需求分析二、准备1、软件安装2、redis-lua封装优化 三、实现1、nginx.conf2、dynamic.lua注意 3、准备两个应用4、访问nginx 一、需求分析 根据用户访问url的参数&#xff0c;将请求转发到对应指定IP的服务器上。 二、准备 1、软件安装 安装openrestyredis&am…

Database数据库 vs Data Warehouse数据仓库 vs Data Mart数据集市 vs Data Lake数据湖

1.DATABASE 数据库 数据库是一个结构化的数据集合&#xff0c;用于存储、管理和检索数据。数据库设计用于支持事务处理&#xff08;OLTP&#xff0c;Online Transaction Processing&#xff09;和日常操作。 数据库通常由数据库管理系统&#xff08;DBMS&#xff09;控制&…

golang json反序列化科学计数法的坑

问题背景 func CheckSign(c *gin.Context, signKey string, singExpire int) (string, error) {r : c.Requestvar formParams map[string]interface{}if c.Request.Body ! nil {bodyBytes, _ : io.ReadAll(c.Request.Body)defer c.Request.Body.Close()if len(bodyBytes) >…

PostgreSQL(二十二)缓冲区管理器

目录 一、缓冲区概述 1、缓冲区结构 2、buffer_tag结构 3、Backend进程读取操作 4、写脏块 二、缓冲区管理器结构 1、第一层&#xff1a;Buffer Table layer&#xff08;缓冲区表层&#xff09; 2、第二层&#xff1a;Buffer Descriptor Layer&#xff08;缓冲区描述层…

秋招Java后端开发冲刺——Mybatis使用总结

一、基本知识 1. 介绍 MyBatis 是 Apache 的一个开源项目&#xff0c;它封装了 JDBC&#xff0c;使开发者只需要关注 SQL 语句本身&#xff0c;而不需要再进行繁琐的 JDBC 编码。MyBatis 可以使用简单的 XML 或注解来配置和映射原生类型、接口和 Java POJO&#xff08;Plain …

Elasticsearch 建议(Suggesters):实现自动补全和拼写检查

引言 在现代搜索引擎中&#xff0c;自动补全和拼写检查功能已成为提升用户体验的重要工具。Elasticsearch&#xff0c;作为一款强大的分布式搜索和分析引擎&#xff0c;提供了多种Suggesters API来帮助开发者实现这些功能。本文将详细介绍Elasticsearch中的四种主要Suggester—…