Kafka集群部署CentOS 7

一、前言

1、Kafka简介

Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)

Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

2、Kafka架构

Kafka架构图

在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

概念/对象简单说明
BrokerKafka节点
Topic主题,用来承载消息
Partition分区,用于主题分片存储
Producer生产者,向主题发布消息的应用
Consumer消费者,从主题订阅消息的应用
Consumer Group消费者组,由多个消费者组成

3、准备工作

1、Kafka服务器

准备3台CentOS服务器,并配置好静态IP、主机名

服务器名IP说明
kafka01192.168.88.51Kafka节点1
kafka02192.168.88.52Kafka节点2
kafka03192.168.88.53Kafka节点3

软件版本说明

说明
Linux ServerCentOS 7
Kafka2.3.0

2、ZooKeeper集群

Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK

服务器名IP说明
zk01192.168.88.21ZooKeeper节点
zk02192.168.88.22ZooKeeper节点
zk03192.168.88.23ZooKeeper节点

部署过程参考:https://ken.io/note/zookeeper-cluster-deploy-guide

二、部署过程

1、应用&数据目录

#创建应用目录
mkdir /usr/kafka#创建Kafka数据目录
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka

2、下载&解压

Kafka官方下载地址:https://kafka.apache.org/downloads
这次我下载的是2.3.0版本

#创建并进入下载目录
mkdir /home/downloads
cd /home/downloads#下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz #解压到应用目录
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka

kafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本,2.3.0才是Kafka的版本

3、Kafka节点配置

#进入应用目录
cd /usr/kafka/kafka_2.12-2.3.0/#修改配置文件
vi config/server.properties

通用配置

配置日志目录、指定ZooKeeper服务器

# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs# root directory for all kafka znodes.
zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181

分节点配置

  • Kafka01
broker.id=0#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.51:9092
  • Kafka02
broker.id=1#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.52:9092
  • Kafka03
broker.id=2#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.53:9092

4、防火墙配置

#开放端口
firewall-cmd --add-port=9092/tcp --permanent#重新加载防火墙配置
firewall-cmd --reload

5、启动Kafka

#进入kafka根目录
cd /usr/kafka/kafka_2.12-2.3.0/
#启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties &#启动成功输出示例(最后几行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

三、Kafka测试

1、创建Topic

在kafka01(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区

bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka02、kafka03

2、查看Topic

我们可以通过命令列出指定Broker的

bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092

3、发送消息

这里我们向Broker(id=0)的Topic=test-ken-io发送消息

bin/kafka-console-producer.sh --broker-list  192.168.88.51:9092  --topic test-ken-io#消息内容
> test by ken.io

4、消费消息

在Kafka02上消费Broker03的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning

在Kafka03上消费Broker02的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning

然后均能收到消息

test by ken.io

这是因为这两个消费消息的命令是建立了两个不同的Consumer
如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,1个消息同时只会被一个Consumer消费到

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_kenbin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken

四、备注

1、Kafka常用配置项说明

Kafka常用Broker配置说明:

配置项默认值/示例值说明
broker.id0Broker唯一标识
listenersPLAINTEXT://192.168.88.53:9092监听信息,PLAINTEXT表示明文传输
log.dirskafka/logskafka数据存放地址,可以填写多个。用”,”间隔
message.max.bytesmessage.max.bytes单个消息长度限制,单位是字节
num.partitions1默认分区数
log.flush.interval.messagesLong.MaxValue在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.msLong.MaxValue在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.msLong.MaxValue检查数据是否要写入到硬盘的时间间隔。
log.retention.hours24控制一个log保留时间,单位:小时
zookeeper.connect192.168.88.21:2181ZooKeeper服务器地址,多台用”,”间隔

2、附录

  • https://kafka.apache.org/
  • https://zh.wikipedia.org/zh-cn/Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/547793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Asp.net中Js、Css文件压缩辅助类

类名&#xff1a;WebCompressUtility.cs 代码如下&#xff1a; /// <summary>/// Js、Css文件压缩辅助类/// Stone_W/// 2011.6.21/// </summary>public class WebCompressUtility{public WebCompressUtility() { }#region 判断浏览器是否支持指定压缩/// <sum…

mysql+e+文件+xls_TP5+PHPexcel导入xls,xlsx文件读取数据

首先:在extend里面引入PHPexcel文件,直接根目录导入进去html创建上传按钮上传excel上传文件立即提交重置layui.use([form,upload],function(){var formlayui.form;var uploadlayui.upload;upload.render({ //允许上传的文件后缀elem: #myfile,url: "{:url(sale/do_uploa…

SQL Server 2008 FILESTREAM特性管理文件

在SQL Server 2008中&#xff0c;新的FILESTREAM&#xff08;文件流&#xff09;特性和varbinary列配合&#xff0c;你可以在服务器的文件系统上存储真实的数据&#xff0c;但可以在数据库上下文内管理和访问&#xff0c;这个特性让SQL Server不仅可以维护好数据库内记录的完整…

IP地址、手机归属和身份证查询接口

1. 查询手机&#xff1a;http://www.yodao.com/smartresult-xml/search.s?typemobile&q手机号码 2. 查询IP&#xff1a;http://www.yodao.com/smartresult-xml/search.s?typeip&qIP地址 3. 查询身份证&#xff1a;http://www.yodao.com/smartresult-xml/search.s?…

mysql5.6.24安装perl_mysql5.6源码安装

背景&#xff1a;现有mysql版本为5.5.40&#xff0c;考虑到以后需做主从&#xff0c;而5.6在主从方面、mysql读写方面都有很大提升。所以&#xff0c;准备升级。官网&#xff1a;http://www.mysql.com/环境&#xff1a;centos6.5 X86_64安装目录&#xff1a;/database/mysql解压…

把字符串转化为类型

问题&#xff1a;可以得到类型的String格式的名称&#xff0c;想要转化为相应的类型&#xff1f; ps&#xff1a;今天定义了好多个枚举类型&#xff0c;把枚举名称存放在一个ComboBox类名&#xff0c;控件值改变的时候要查询出这个枚举的所有属性集合&#xff0c;刚开始想到反…

System.Drawing.Color转System.Windows.Media.Color

2019独角兽企业重金招聘Python工程师标准>>> //这是两个不同的类 System.Windows.Media.Color color (System.Windows.Media.Color)System.Windows.Media.ColorConverter.ConvertFromString(transItemList[i].color.Name); 转载于:https://my.oschina.net/SearchVe…

Prometheus 监控Mysql服务器及Grafana可视化

Prometheus 监控Mysql服务器及Grafana可视化 mysql_exporter&#xff1a;用于收集MySQL性能信息。 使用版本mysqld_exporter 0.11.0官方地址使用文档&#xff1a;https://github.com/prometheus/mysqld_exporter图标模板&#xff1a;https://grafana.com/dashboards/7362下载…

visual studio 2010 如何修改assemblyInfo.cs默认值

这个应该是安装系统时的单位名称。修改HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\RegisteredOrganization的值即可。另外&#xff0c;可以修改项目模板里面的内容C:\Program Files\Microsoft Visual Studio 9.0\Common7\IDE\ProjectTemplates\CSharp\W…

mysql重要的监控参数_zabbix3.0.2使用percona mysql插件来监控mysql5.7   以及必须监控的性能参数...

http://tongcheng.blog.51cto.com/6214144/1620158http://www.cnblogs.com/caoxiaojian/p/5706992.htmlhttp://blog.csdn.net/mchdba/article/details/51447750对比zabbix全方位监控mysql&#xff0c;尤其注意以下监控项MySQL InnoDB Buffer Poolpool sizedatabase pages In…

Microsoft SQL Server 2005 提供了一些工具来监控数据库

--WL 09-07-03/*Microsoft SQL Server 2005 提供了一些工具来监控数据库。方法之一是动态管理视图。动态管理视图 (DMV) 和动态管理函数 (DMF) 返回的服务器状态信息可用于监控服务器实例的运行状况、诊断问题和优化性能。常规服务器动态管理对象包括&#xff1a;dm_db_*&#…

关于ORA-01187: cannot read from file because it failed verification tests 的处理方法

环境&#xff1a;OELOracle 11.2.0.3physical standby 问题描述&#xff1a;查询dba_temp_files试图时显示Ora-01187错误号&#xff1b; ORA-01187: cannot read from file because it failed verification tests ORA-01110: data file 201: /oradata/seven/temp01.dbf 重现下…

Kafka集群部署搭建完美标准版

Kafka集群部署并启动 在本文中将从演示如何搭建一个Kafka集群开始&#xff0c;然后简要介绍一下关于Kafka集群的一些基础知识点。但本文仅针对集群做介绍&#xff0c;对于Kafka的基本概念不做过多说明&#xff0c;这里假设读者拥有一定的Kafka基础知识。 首先&#xff0c;我们…

java字符流和字节流的区别_java字符流与字节流的区别是什么

java中字符流与字节流的区别&#xff1a;1、字节流操作的基本单元为字节&#xff1b;字符流操作的基本单元为Unicode码元。2、字节流默认不使用缓冲区&#xff1b;字符流使用缓冲区。3、字节流通常用于处理二进制数据&#xff0c;实际上它可以处理任意类型的数据&#xff0c;但…

IEPNGFix:Unclickable children of element 解决办法

以前我有写过一篇关于让IE6支持png半透明图片的方法&#xff0c;这期间这一神器一直发挥了很大的作用&#xff0c;并且没有出现过什么差错&#xff0c;直到昨天。 昨天同事做的一个项目因为设计图的关系&#xff0c;所以实现起来用到了很多position定位的属性&#xff0c;这里…

ASP.NET MVC 重点教程一周年版 第九回 HtmlHelper

许多时候我们会遇到如下场景 在写一个编辑数据的页面时&#xff0c;我们通常会写如下代码 1: <input type"text" value<%ViewData["title"] %> name"title" /> 由前篇我们所讲的Helper演化&#xff0c;我们思考&#xff0c;对于这种…

Tensorflow中查看gpu是否可用

Tensorflow中查看gpu是否可用 使用tf.test.is_gpu_available()函数可直接返回 import tensorflow as tf tf.test.is_gpu_available() Tensorflow测试程序 # Python import tensorflow as tf hello tf.constant(Hello, TensorFlow!) sess tf.Session() print(sess.run(hell…

websphere一直安装部署_WebSphere集群安装配置及部署应用说明

《WebSphere集群安装配置及部署应用说明》由会员分享&#xff0c;可在线阅读&#xff0c;更多相关《WebSphere集群安装配置及部署应用说明(27页珍藏版)》请在人人文库网上搜索。1、WebSphere6.1集群安装配置及部署应用说明1 安装Webphere1.1 安装WebSphere ND 6.11、 双击launc…

设置devenv命令的启动版本

机子上装了有vs 05、vs 08、vs 10如果按上顺序安装的话&#xff0c;在 运行 > devenv 就可以打开最新的vs10 工具&#xff0c;如果顺序是乱的话&#xff0c;就没那么幸运了&#xff0c;也就是说你最后安装的版本将会被你的 devenv 命令打开&#xff0c;原因是最后一次安装vs…

Yolov4训练自己的数据集

Yolov4训练自己的数据集 代码运行环境Ubuntu18.04python3.6显卡1080TiCUDA10.0cudnn7.5.1OpenCV3.4.6Cmake3.12.2&#xff0c;详细环境配置安装步骤就不讲解拉&#xff0c;网上教程一大堆。从github克隆下载源码&#xff0c;链接地址&#xff1a;https://github.com/AlexeyAB/…