Kafka搭建(集群版)

Kafka单机版

部署前提

VMware环境 : 两台centos系统

Jdk包:jdk-8u202-linux-x64.tar.gz
Kafka包:kafka_2.12-3.5.0.tgz
Zookeeper包:apache-zookeeper-3.7.2-bin.tar.gz

百度网盘自取: 链接: https://pan.baidu.com/s/11EWuhBoSmH3musd_3Rgodw?pwd=e32t 提取码: e32t

Kafka搭建(集群版)

创建服务器

  • 修改克隆机主机名,以下以kafka-broker1举例说明
#使用root用户登录
#修改主机名
vim /etc/hostname
kafka-broker1 #末尾追加
  • 配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
# 修改主机名称映射vim /etc/hosts添加如下内容:
192.168.233.106 kafka-broker1
192.168.233.107 kafka-broker2
  • 重启克隆机kafka-broker1
# 重启
reboot
  • 其他机器也同样配置

以下操作默认在kafka-broker1里操作

创建xsync文件

cd /root
mkdir bin
cd bin
vim xsync
  • 增加如下内容
#!/bin/bash#1. 判断参数个数
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi#2. 遍历集群所有机器
for host in kafka-broker1 kafka-broker2
doecho ====================  $host  ====================#3. 遍历所有目录,挨个发送for file in $@do#4 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done
  • 修改xsync文件权限
# 进入/root/bin目录
cd /root/bin
# 修改权限
chmod 777 xsync

SSH无密登录配置

分发文件时,需要通过脚本切换主机进行指令操作,切换主机时,是需要输入密码的,每一次都输入就显得有点麻烦,所以这里以虚拟机kafka-broker1为例配置SSH免密登录(其他节点执行同样步骤即可),配置完成后,脚本执行时就不需要输入密码了。

生成公钥和私钥

# 公钥和私钥
ssh-keygen -t rsa#然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
  • 将公钥拷贝到要免密登录的目标机器上,拷贝过程需要输入目标机器密码
# ssh-copy-id 目标机器
ssh-copy-id kafka-broker1
ssh-copy-id kafka-broker2

安装JDK1.8

卸载现有JDK

  • 不同节点都要执行操作

    rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

上传Java压缩包

  • 将jdk-8u202-linux-x64.tar.gz文件上传到虚拟机的/opt/software目录中

解压Java压缩包

  • 进入/opt/software目录

    cd /opt/software/

  • 解压缩文件到指定目录

    tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

  • 进入/opt/module目录

    cd /opt/module

  • 改名

    mv jdk1.8.0_212/ java

配置Java环境变量

  • 新建 /etc/profile.d/my_env.sh文件

    vim /etc/profile.d/my_env.sh

  • 添加内容

    #JAVA_HOME
    export JAVA_HOME=/opt/module/java
    export PATH=$PATH:$JAVA_HOME/bin
    
  • 让环境变量生效

    source /etc/profile.d/my_env.sh

安装测试

java -version

分发软件

  • 分发环境变量文件

    xsync /etc/profile.d/my_env.sh

  • 进入/opt/module路径

    cd /opt/module

  • 调用分发脚本将本机得Java安装包分发到其他两台机器

    xsync java

在每个节点让环境变量生效

安装ZooKeeper

上传ZooKeeper压缩包

将apache-zookeeper-3.7.2-bin.tar.gz文件上传到三台虚拟机的/opt/software目录中

解压ZooKeeper压缩包

  • 进入到/opt/software目录中

    cd /opt/software/

  • 解压缩文件到指定目录

    tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/module/

  • 进入/opt/module目录

    cd /opt/module

  • 文件目录改名

    mv apache-zookeeper-3.7.1-bin/ zookeeper

配置服务器编号

  • 在/opt/module/zookeeper/目录下创建zkData

  • 进入/opt/module/zookeeper目录

    cd /opt/module/zookeeper

  • 创建zkData文件目录

    mkdir zkData

  • 创建myid文件

  • 进入/opt/module/zookeeper/zkData目录

    cd /opt/module/zookeeper/zkData

  • 创建myid文件

    vim myid

  • 在文件中增加内容

    1

修改配置文件

  • 重命名/opt/module/zookeeper/conf目录下的zoo_sample.cfg文件为zoo.cfg文件

  • 进入cd /opt/module/zookeeper/conf文件目录

    cd /opt/module/zookeeper/conf

  • 修改文件名称

    mv zoo_sample.cfg zoo.cfg

  • 修改文件内容

    vim zoo.cfg

  • 修改zoo.cfg文件

以下内容为修改内容
# **以下内容为修改内容**
dataDir=/opt/module/zookeeper/zkData#以下内容为新增内容####################### cluster ##########################
# server.A=B:C:D
#
# A是一个数字,表示这个是第几号服务器
# B是A服务器的主机名
# C是A服务器与集群中的主服务器(Leader)交换信息的端口
# D是A服务器用于主服务器(Leader)选举的端口
#########################################################server.1=kafka-broker1:2888:3888
server.2=kafka-broker2:2888:3888

设置防火墙端口

firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
firewall-cmd --reload
firewall-cmd --zone=public --list-ports

启动ZooKeeper

在每个节点下执行如下操作
  • 进入zookeeper目录

    cd /opt/module/zookeeper

  • 启动ZK服务

    bin/zkServer.sh start

关闭ZooKeeper

在每个节点下执行如下操作
  • 进入zookeeper目录

    cd /opt/module/zookeeper

  • 关闭ZK服务

    bin/zkServer.sh stop

查看ZooKeeper状态

在每个节点下执行如下操作
  • 进入zookeeper目录

    cd /opt/module/zookeeper

  • 查看ZK服务状态

    bin/zkServer.sh status

分发软件

  • 进入/opt/module路径

    cd /opt/module

  • 调用分发脚本将本机得ZooKeeper安装包分发到其他机器

    xsync zookeeper

分别将不同虚拟机/opt/module/zookeeper/zkData目录下myid文件进行修改

vim /opt/module/zookeeper/zkData/myid
# kafka-broker1:1
# kafka-broker2:2

启停脚本

ZooKeeper软件的启动和停止比较简单,但是每一次如果都在不同服务器节点执行相应指令,也会有点麻烦,所以我们这里将指令封装成脚本文件,方便我们的调用。

  • 在虚拟机kafka-broker1的/root/bin目录下创建zk.sh脚本文件

  • 在/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行

  • 进入/root/bin目录

    cd /root/bin

  • 创建zk.sh脚本文件

    vim zk.sh

在脚本中增加内容:
#!/bin/bash
case $1 in
"start"){for i in kafka-broker1 kafka-broker2doecho ---------- zookeeper $i 启动 ------------ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"done
};;
"stop"){for i in kafka-broker1 kafka-broker2doecho ---------- zookeeper $i 停止 ------------ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"done
};;
"status"){for i in kafka-broker1 kafka-broker2doecho ---------- zookeeper $i 状态 ------------ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"done
};;
esac
增加脚本文件权限
  • 给zk.sh文件授权

    chmod 777 zk.sh

脚本调用方式
  • 启动ZK服务

    zk.sh start

  • 查看ZK服务状态

    zk.sh status

  • 停止ZK服务

    zk.sh stop

安装Kafka

上传Kafka压缩包

将kafka_2.12-3.5.0.tgz文件上传到虚拟机的/opt/software目录中

解压Kafka压缩包

  • 进入/opt/software目录

    cd /opt/software

  • 解压缩文件到指定目录

    tar -zxvf kafka_2.12-3.5.0.tgz -C /opt/module/

  • 进入/opt/module目录

    cd /opt/module

  • 修改文件目录名称

    mv kafka_2.12-3.5.0/ kafka

修改配置文件
  • 进入cd /opt/module/kafka/config文件目录

    cd /opt/module/kafka/config

  • 修改配置文件

    vim server.properties

  • 输入以下内容:

    #broker的全局唯一编号,每个服务节点不能重复,只能是数字。
    broker.id=1
    #broker对外暴露的IP和端口 (每个节点单独配置)
    advertised.listeners=PLAINTEXT://**kafka-broker1**:9092
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘IO的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    #topic在当前broker上的分区个数
    num.partitions=1
    #用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #每个topic创建时的副本数,默认时1个副本
    offsets.topic.replication.factor=1
    #segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个segment文件的大小,默认最大1G
    log.segment.bytes=1073741824
    #检查过期数据的时间,默认5分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
    zookeeper.connect=kafka-broker1:2181,kafka-broker2:2181/kafka
    

分发kafka软件

  • 进入 /opt/module目录

    cd /opt/module

  • 执行分发指令

    xsync kafka

按照上面的配置文件内容,在每一个Kafka节点进行配置

vim /opt/module/kafka/config/server.properties

配置环境变量

  • 修改 /etc/profile.d/my_env.sh文件

    vim /etc/profile.d/my_env.sh

  • 添加内容

    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
  • 让环境变量生效

    source /etc/profile.d/my_env.sh

  • 分发环境变量,并让环境变量生效

    xsync /etc/profile.d/my_env.sh

  • 每个节点执行刷新操作

    source /etc/profile.d/my_env.sh

启动Kafka

启动前请先启动ZooKeeper服务

  • 进入/opt/module/kafka目录

    cd /opt/module/kafka

  • 执行启动指令

    bin/kafka-server-start.sh -daemon config/server.properties

关闭Kafka

  • 进入/opt/module/kafka目录

    cd /opt/module/kafka

  • 执行关闭指令

    bin/kafka-server-stop.sh

启停脚本

(1) 在虚拟机kafka-broker1的/root/bin目录下创建kfk.sh脚本文件,对kafka服务的启动停止等指令进行封装

  • 进入/root/bin目录

    cd /root/bin

  • 创建kfk.sh脚本文件

    vim kfk.sh

  • 在脚本中增加内容:

    #! /bin/bash
    case $1 in
    "start"){for i in kafka-broker1 kafka-broker2doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
    };;
    "stop"){for i in kafka-broker1 kafka-broker2doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
    };;
    esac
    
增加脚本文件权限
  • 给文件授权

    chmod 777 kfk.sh

脚本调用方式
  • 启动kafka

    kfk.sh start

  • 停止Kafka

    kfk.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

联合脚本

因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样,操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。

  • 在虚拟机kafka-broker1的/root/bin目录下创建xcall脚本文件

  • 进入/root/bin目录

    cd /root/bin

  • 创建xcall文件

    vim xcall

  • 在脚本中增加内容:

    #! /bin/bashfor i in kafka-broker1 kafka-broker2
    doecho --------- $i ----------ssh $i "$*"
    done
    
增加脚本文件权限
  • 进入/root/bin目录

    cd /root/bin

  • 增加权限

    chmod 777 xcall

创建cluster.sh脚本文件
  • 进入/root/bin目录

    cd /root/bin

  • 创建cluster.sh脚本文件

    vim cluster.sh

  • 在脚本中增加内容:

    #!/bin/bashcase $1 in
    "start"){echo ================== 启动 Kafka集群 ==================#启动 Zookeeper集群zk.sh start#启动 Kafka采集集群kfk.sh start};;
    "stop"){echo ================== 停止 Kafka集群 ==================#停止 Kafka采集集群kfk.sh stop#循环直至 Kafka 集群进程全部停止kafka_count=$(xcall jps | grep Kafka | wc -l)while [ $kafka_count -gt 0 ]dosleep 1kafka_count=$(xcall | grep Kafka | wc -l)echo "当前未停止的 Kafka 进程数为 $kafka_count"done#停止 Zookeeper集群zk.sh stop
    };;
    esac
    
增加脚本文件权限
  • 进入/root/bin目录

    cd /root/bin

  • 增加权限

    chmod 777 cluster.sh

脚本调用方式

  • 集群启动

    cluster.sh start

  • 集群关闭

    cluster.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/866509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

运行程序报错died with <Signals.SIGKILL: 9>问题定位及解决方法

运行程序报错died with 报错 当我在运行程序时,报错: Traceback (most recent call last):File "/home/anaconda3/envs/scene_graph_benchmark/lib/python3.8/runpy.py", line 194, in _run_module_as_mainreturn _run_code(code, main_glo…

免密登录ssh

前言: 1.基于口令的认证(password认证): 客户端向服务器发出password认证请求,将用户名和密码加密后发送给服务器,服务器将该信息解密后得到用户名和密码的明文,与设备上保存的用户名和密码进行…

香橙派AIpro开发板评测:部署yolov5模型实现图像和视频中物体的识别

OrangePi AIpro 作为业界首款基于昇腾深度研发的AI开发板,自发布以来就引起了我的极大关注。其配备的8/20TOPS澎湃算力,堪称目前开发板市场中的顶尖性能,实在令人垂涎三尺。如此强大的板子,当然要亲自体验一番。今天非常荣幸地拿到…

边界无限陈佩文:红蓝对抗安全演练常态化的各方分析

虽然常态化演练尚未正式开始,但我们仍然希望对各方的表现进行一些分析和预测,以辅助我们对市场的判断和决策。同时,也希望通过这些初步的见解,抛砖引玉,引发更多有价值的讨论和观点。 “船停在码头是最安全的&#xf…

Hi3861 OpenHarmony嵌入式应用入门--SNTP

sntp(Simple Network Time Protocol)是一种网络时间协议,它是NTP(Network Time Protocol)的一个简化版本。 本项目是从LwIP中抽取的SNTP代码; Hi3861 SDK中已经包含了一份预编译的lwip,但没有…

怎么把数据转换成百度k线图

要将数据转换成百度K线图,您需要按照百度K线图的要求对数据进行处理和格式化。以下是一个简单的示例,演示如何将数据转换成百度K线图的格式: python import json # 假设您有以下数据 data [ {"date": "2021-01-01"…

【Python123题库】#计算整数各位数字之和 #分类统计字符个数 #用户登录C #二分法求平方根B

禁止转载,原文:https://blog.csdn.net/qq_45801887/article/details/140079918 参考教程:B站视频讲解——https://space.bilibili.com/3546616042621301 有帮助麻烦点个赞 ~ ~ Python123题库 计算整数各位数字之和分类统计字符个数用户登录C…

Java基础之关键字

关键字 transient 被transient修饰的成员变量在序列化(serialization)时,值会被忽略,在反序列化时会被设为初始值(对象为null) synchronized 既可以修饰方法也可以修饰方法块,被synchronized修饰的代码块及方法,在同一时间只能…

nacos占位符配置

有的时候,我们的nacos会出现一个配置文件里,有多个配置项对应的值都是一样的,这个时候nacos就可以用到占位符${}进行参数配置。 auth:api1:host: http://aaa.comapi2:host: http://aaa.com可以将共同的参数提取出来统一配置,后期…

线程间的通信

文章目录 线程间的通讯技术就是通过等待和唤醒机制,来实现多个线程协同操作完成某一项任务,例如经典的生产者和消费者案例。等待唤醒机制其实就是让线程进入等待状态或者让线程从等待状态中唤醒,需要用到两种方法,如下&#xff1a…

Java中的内存数据库与缓存技术

Java中的内存数据库与缓存技术 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 1. 内存数据库的概念与优势 1.1 什么是内存数据库? 内存数据库是…

python写的工具代码总结

目录 python画图&处理图片python增大图片的大小(比如将几百K的照片增加到几mb)解决python画图无法显示中文的问题python画折线图 & 一张图上三条折线 & 设置折线marker & chatgpt画折线图的提示词python将png格式的图片转换为jpg格式的图…

红蓝对抗下的内网横向移动渗透技术详解

一、利用Windows计划任务横向移动 Windows计划任务是一个非常实用的功能,可以帮助我们自动完成一些重复性的任务。比如,我们可以设定一个计划任务来自动备份文件、更新软件、执行脚本等,本文主要介绍了如何利用Windows计划任务进行横向渗透。 &#xf…

列举操作redis set的命令

Redis中的Redis中的Set是一种无序的Set是一种无序的、不包含重复元素的字符串集合。、不包含重复元素的字符串集合。操作Redis Set的命令非常丰富,下面列举了一些常用的命令: **S操作Redis Set的命令非常丰富,下面列举了一些常用的命令&#…

C#委托事件的实现

1、事件 在C#中事件是一种特殊的委托类型,用于在对象之间提供一种基于观察者模式的通知机制。 1.1、事件的发送方定义了一个委托,委托类型的声明包含了事件的签名,即事件处理器方法的签名。 1.2、事件的订阅者可以通过运算符来注册事件处理器…

Python基础小知识问答系列-过滤列表元素

1. 问题: 如何根据单一条件过滤列表的元素? 如何根据复杂条件过滤列表的元素? 2. 解决方式: 可以使用推导式生成器,进行单一条件的列表元素过滤,尤其是列表内容较多时; 也可以使用filter函数进行列…

快速排序c++java代码实现

快速排序的思想 &#xff08;基于分治法&#xff09;: 每次选一个基准元素x&#xff0c;通过一次遍历将排序表划分为独立的两部分a[l,k-1],a[k1,r]; 其中左边的元素<x,右边的1元素>x,然后递归下去&#xff0c;直到每个块的大小为1 ; c #include<bits/stdc.h> …

Linux Static Keys和jump label机制

文章目录 前言一、asm goto二、API使用2.1 低版本API2.2 高版本API 三、jump label四、源码分析4.1 数据结构4.2 static_key_false4.3 jump_label_init4.4 __jump_label_transform4.5 static_key_slow_inc/dec 五、__jump_table节5.1 内核5.2 内核模块 六、修改内存代码6.1 x86…

24西安电子科技大学数学与统计学院—考研录取情况

24西安电子科技大学—数学与统计学院—考研录取统计 01、数学与统计学院各个方向 02、24数学与统计学院近三年复试分数线对比 数统院24年院线相对于23年院线增加高达30分&#xff0c;确实增长浮动比较高&#xff0c;接近30分的水平&#xff0c;因此大家更需要好好去努力&#x…

GTest和Catch2单元测试学习(附Cmake测试代码库)

kevin_CTest CTest 单元测试学习 Gitee库&#xff1a; https://gitee.com/bigearrabbit/kevin_ctest.git 示例多是从网页文章上摘取的&#xff0c;大部分记录在下面&#xff0c;或者源码内。供学习参考。 CTest 学习Catch2 框架 单个文档的测试架构&#xff0c;使用方便&am…