优质博文:IT-BLOG-CN
一、安装环境
【1】CenOS7
虚拟机三台
【2】已经搭建好的zookeeper集群。
【3】软件版本:kafka_2.11-1.0.0
二、创建目录并下载安装软件
【1】创建目录
cd /opt
mkdir kafka #创建项目目录
cd kafka
mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息
【2】下载软件:我用的是xshell
工具将下载好的软件传送到虚拟机的,放在了/opt/kafka/
目录下,并解压:
tar -zxvf kafka_2.11-1.0.0.tgz
三、修改配置文件
【1】进入到config
目录
cd /opt/kafka/kafka_2.11-1.0.0/config/
我们主要关心的文件只有一个:server.properties
我们可以发现在这个目录下有很多文件,还有zookeeper
的文件。我们可以直接使用kafka
自带的zookeeper
集群来启动,但是考虑到未来的项目需求,建议使用独立的zookeeper
集群。
-rw-r--r--. 1 root root 906 Oct 27 08:56 connect-console-sink.properties
-rw-r--r--. 1 root root 909 Oct 27 08:56 connect-console-source.properties
-rw-r--r--. 1 root root 5807 Oct 27 08:56 connect-distributed.properties
-rw-r--r--. 1 root root 883 Oct 27 08:56 connect-file-sink.properties
-rw-r--r--. 1 root root 881 Oct 27 08:56 connect-file-source.properties
-rw-r--r--. 1 root root 1111 Oct 27 08:56 connect-log4j.properties
-rw-r--r--. 1 root root 2730 Oct 27 08:56 connect-standalone.properties
-rw-r--r--. 1 root root 1221 Oct 27 08:56 consumer.properties
-rw-r--r--. 1 root root 4727 Oct 27 08:56 log4j.properties
-rw-r--r--. 1 root root 1919 Oct 27 08:56 producer.properties
-rw-r--r--. 1 root root 7030 Nov 22 18:10 server.properties
-rw-r--r--. 1 root root 1032 Oct 27 08:56 tools-log4j.properties
-rw-r--r--. 1 root root 1023 Oct 27 08:56 zookeeper.properties
【2】修改配置文件server.properties
:需要说明的是,我现在是在虚拟机1
上进行的操作,同样的操作要在三台虚拟机上都执行,只是有些细微的配置不同,其他配置信息完全相同。对于不同虚拟机上有差异的部分,我会一一指出。下面修改配置文件:
# 指定当前节点的brokerId,同一个集群中的brokerId需要唯一
broker.id=0
# 指定监听的地址及端口号,使用hostname 或者内网IP皆可
listeners=PLAINTEXT://hostname:9092
# 配置外网访问ip 如果为空 则会使用listeners(如果不为空) 其他的情况使用 InetAddress.getCanonicalHostName() 的值
# Hostname and port the broker will advertise to producers and consumers.
# If not set, it uses the value for "listeners" if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://hostname:9092
# 指定kafka日志文件的存储目录
log.dirs=/usr/local/kafka/logs
# 指定zookeeper的连接地址,若有多个地址则用逗号分隔
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# 配置可删除
delete.topic.enable=true
也就是说,zookeeper.connect
指的是zookeeper
集群的地址,可以是多个,多个之间用逗号分割。
使用scp
命令发送到其他的服务器 注意配置文件需要重点修改几个地方 broker.id listeners
以及advertised.listeners
scp -r /usr/local/develop/kafka_2.12-3.6.1 remote_ip:/usr/local/develop/kafka_2.12-3.6.1
需要搭建几个节点 就是发送几份,启动服务即可。
nohup sh ./bin/kafka-server-start.sh ./config/server.properties >./out.log 2>&1 &
四、启动kafka集群并测试
【1】启动服务:首先要启动kafka
集群,并且是三台都要手动去启动。
// 进入kafka的bin目录
cd /opt/kafka/kafka_2.11-1.0.0/bin/
// 启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
-daemon
代表着以后台模式运行kafka
集群,这样kafka
的启动就不会影响我们继续在控制台输入命令。
【2】检查服务是否启动:执行命令jps
,显示出了正在运行Kafka
这个进程,进程号是4855
。在运行这个命令时,你可能会出现错误,显示jps command not found
。文章的最后给出了解决方案。
jps
4161 QuorumPeerMain
4855 Kafka
6809 Jps
【3】创建topic
来验证是否创建成功
创建topic
./kafka-topics.sh --create --zookeeper 192.168.172.10:12181 --replication-factor 2 --partitions 1 --topic my-topic
参数解释:
--replication-factor 2 // 复制两份
--partitions 1 // 创建1个分区
--topic // 主题为my-topic
-- --zookeeper // 此处为为zookeeper监听的地址
创建生产者producer
./kafka-console-producer.sh --broker-list 192.168.172.10:19092 --topic my-topic //`这个IP地址可以写brokerlist中的任意一个
此时,console
处于阻塞状态,可以直接输入数据。
创建消费者: 此时要切换到另一台虚拟机的shell界面输入以下命令:
./kafka-console-consumer.sh --zookeeper 192.168.172.10:12181 --topic my-topic --from-beginning
此时,一旦有数据生成,此处的console
中就会显示数据。