Kafka 安装教程和基本操作

一、简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。

应用特性

  • 分布式存储:数据被自动分区并分布在集群的节点中。
  • 消息有序性Kafka 能确保从生产者传到消费者的记录都是有序的。
  • 高容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 高吞吐量Kafka 支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。
  • 易扩展性:支持集群热扩展。
  • 高并发:支持数千个客户端同时读写。
  • 持久性:支持消息数据持久化到本地磁盘 并支持数据备份和灵活配置数据的持久化时间。
  • 实时处理/低延迟:在数据写入的同时对进行处理,消息延迟最低只有几毫秒。

应用场景

Kafka 本质是 支持分布式的消息系统/消息中间件 。分析 Kafka 的应用场景等同于分析 消息中级件 的应用场景。通常,使用 消息系统 的 发布/订阅模型 功能来连接 生产者消费者。实现以下三大功能:

  • 生产者和消费者的解耦
  • 消息持久化 / 消息冗余
  • 消息缓冲 / 流量消峰

具体应用场景有:

  • 日志收集或数据管道:作为日志收集系统或数据处理管道的一部分,以处理大量的日志数据或实时数据流。
  • 负载均衡:如果系统收到大量请求或数据流,可以使用消息队列把这些任务平均分配给多个处理器或服务,从而实现负载均衡。
  • 系统解耦:消息队列经常用作不同服务间的通信机制,以解耦系统的不同部分。
  • 分布式事务:如果一个事务需要跨多个服务进行,可以使用消息队列来协调不同服务之间的通信,确保事务的原子性。
  • 实时流数据处理:比如实时日志分析或者实时数据报警。Kafka 能接收实时数据流并保证它的可靠性和持久性,这样就可以在上游源源不断生产数据的同时,下游可以实时地进行分析。
  • 通知和实时更新:消息队列可以用作通知的中介,比如告知用户完成某个任务,或者在后端数据更新时实时通知前端。

设计目标

  • 高性能:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 消息系统:支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 横向扩展:支持在线水平热扩展

二、kafka安装和配置

1. zookeeper安装配置

需要说明一下, 为了支持 Kafka 的集群功能, Zookeeper 必须使用集群模式部署。
本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南

2. kafka安装配置

下载链接:Kafka Downloads

下载页面中包含两种下载方式

  • : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译

a) 安装

[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka

b) 配置实例

配置第一个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9091端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9091
# 相当于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多个用,分隔   /kafka指定在zk上的目录
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

配置第二个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9092端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

注:两个客户端的listeners中的port不能一样

4) 服务管理

# 启动服务 -daemon 表示后台启动
$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties# 查看服务
jps -l43330 org.apache.zookeeper.server.quorum.QuorumPeerMain14356 org.elasticsearch.bootstrap.Elasticsearch14583 org.logstash.Logstash45976 kafka.Kafka  # kafka服务进程netstat -anlpt | grep 9091tcp6       0      0 :::9091                 :::*                    LISTEN      45976/javatcp6       0      0 192.168.18.128:9091     192.168.18.128:49356    TIME_WAIT   -# 关闭服务
$KAFKA_HOME/bin/kafka-server-stop.sh

3. 常用操作

1) 创建topic

 #两条命令效果一样
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu 

在kafka1上创建一个topic,会自动同步到其他客户端

  • --create表示创建操作
  • --zookeeper 指定了 Kafka 连接的 ZooKeeper
  • --partitions 表示每个主题4个分区
  • --replication-factor 表示创建每个分区创建2个副本(副本因子)
  • --topic 表示主题名称。
    注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.

2) 查看topic

# 查看topic列表     #两条命令效果一样
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka __consumer_offsetstopic-demoyumu# 查看topic详细信息   #两条命令效果一样
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu Topic: yumu	PartitionCount: 2	ReplicationFactor: 2	Configs:Topic: yumu	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: yumu	Partition: 1	Leader: 1	Replicas: 2,1	Isr: 1,2

3) 测试通信

# 窗口1,启动生产者,向yumu主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu# 窗口2,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu# 窗口3,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu=====结果=====
# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
>hello, kafka!
>once again.
>
# 消费者1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
hello, kafka!
once again.# 消费者2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
hello, kafka!
once again.# 查看所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning# 删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9091  --topic yumu

三、遇到的问题

1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:

[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.at kafka.server.KafkaServer.startup(KafkaServer.scala:235)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)at kafka.Kafka$.main(Kafka.scala:82)at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)

原因:
kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件

cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A

第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)

推荐阅读:

  • Kafka介绍
  • ELK介绍
  • Kafka安装
  • C语言操作kafka以及安装librdkafka库

下一篇:Kafka消息系统原理

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/16524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于YOLO算法实现网球运动实时分析(附源码)

大家好,我是小F~ 今天给大家介绍一个计算机视觉实战的项目。 该项目使用YOLO算法检测球员和网球,并利用cnn提取球场关键点。 进而分析视频中的网球运动员,测量他们的速度、击球速度和击球次数。 使用win10电脑,Python …

【源码】java + uniapp交易所源代码/带搭建教程java交易所/完整源代码

java uniapp交易所源代码/带搭建教程java交易所/完整源代码 带简洁教程,未测 java uniapp交易所源代码/带搭建教程java交易所/完整源代码 - 吾爱资源网

【古董技术】ms-dos应用程序的结构

序 制定一个MS-DOS应用程序计划需要认真分析程序的大小。这种分析可以帮助程序员确定MS-DOS支持的两种程序风格中哪一种最适合该应用程序。.EXE程序结构为大型程序提供了好处,因为所有.EXE文件之前都有额外的512字节(或更多)的文件头。另一方…

C++第十七弹---string使用(下)

✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】【C详解】 目录 1、标准库中的string类 1.1、string类的常用接口说明 1.1.1、string类对象的修改操作 1.1.2、string类对象非成员函数重载 总结 1、标准库中的…

牛客热题:有效括号

📟作者主页:慢热的陕西人 🌴专栏链接:力扣刷题日记 📣欢迎各位大佬👍点赞🔥关注🚓收藏,🍉留言 文章目录 牛客热题:有效括号题目链接方法一&#x…

Mycat+Mysql搭建数据集群实现数据分片存储

前言 MyCAT介绍 * 一个彻底开源的,面向企业应用开发的“大数据库集群”; * 支持事务、ACID、可以替代MySQL的加强版数据库; * 一个可以视为“MySQL”集群的企业级数据库,用来替代昂贵的Oracle集群; * 一个融合内存缓存技术、Nosql技术、HDFS大数据的新型SQL; * 一个新颖…

2024.5.1学习记录

1、代码随想录:贪心刷题 2、react 高级使用( hoc render、props、函数组件、serState 传送门等) 3、游山玩水

《拯救大学生课设不挂科第四期之蓝桥杯是什么?我是否要参加蓝桥杯?选择何种语言?如何科学备赛?方法思维教程》【官方笔记】

背景: 有些同学在大一或者大二可能会被老师建议参加蓝桥杯,本视频和文章主要是以一个过来人的身份来给与大家一些思路。 比如蓝桥杯是什么?我是否要参加蓝桥杯?参加蓝桥杯该选择何种语言?如何科学备赛?等…

JavaEE之线程(7)_单例模式(设计模式概念、单例模式优点、懒汉、饿汉模式)

一、什么是设计模式? 单例模式是设计模式中较为常见的一种。那么,什么是单例模式? 设计模式(Design Pattern)都是一些相对优秀的解决方案,很多问题都是典型的、有代表性的问题,学习设计模式&am…

为什么本科毕业后我坚定地选择了就业而不是考研?

大家好,我是小布丁。今天来聊聊我为什么本科毕业后选择了就业而不是考研。 在整个大学期间,我被亲戚拷问最多的问题就是:准备考研吗?相信很多大学生都遇到过这种情况吧。 如果你说准备还好,亲戚大概率就不会问下去&a…

元组推导式

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 使用元组推导式可以快速生成一个元组,它的表现形式和列表推导式类似,只是将列表推导式中的“[]”修改为“()”。例如&#xf…

python深入解析字符串操作的八大神技

新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、字符串的长度与切片 示例代码 二、去除多余的空格 示例代码 三、字符串的开头与包含…

元组的创建和删除

目录 使用赋值运算符直接创建元组 创建空元组 创建数值元组 删除元组 自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 元组(tuple)是Python中另一个重要的序列结构&#…

关于堆排序

今天我们不刷力扣了,我们来复习(手撕)一下数据结构中的八大排序算法之一,堆排序 基本概念: 堆是一种特殊的树形数据结构,即完全二叉树。 堆分为大顶堆和小顶堆: 大顶堆:每个节点的值…

OrangePi KunPengPro | 开发板开箱测评之学习与使用

OrangePi KunPengPro | 开发板开箱测评之学习与使用 时间:2024年5月23日20:51:12 文章目录 OrangePi KunPengPro | 开发板开箱测评之学习与使用概述1.参考2.资料、工具3.使用3-1.通过串口登录系统3-2.通过SSH登录系统3-3.安装交叉编译工具链3-4.复制文件到设备3-5.第…

【组合数学】常考试题答案

一、单项选择题(每小题3分,共15分) 1. 用3个“1”和4个“0”能组成( )个不同的二进制数字。 A. 35 B. 36, C. 37, D. 38 2. 整除300的正整数的个数为(  )。 A. 14…

Anaconda+CUDA+CUDNN+Pycharm+Pytorch安装教程(第一节 Anconda安装)

1.选择和对应的anconda版本 官网地址:Index of / (anaconda.com) 下载地址:Index of /anaconda/archive/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 2.安装流程 (1)下载安装包 (2)点击next (3)点击I agree &a…

解决Flutter位于悬浮窗口时,应用Logo不更新问题

问题描述 我已经更换了应用Logo,但是发现应用处于悬浮窗口时,logo还是更改之前的?下面的图片只是示意。 解决方案 终端命令 rm -rf ~/Library/Developer/Xcode/DerivedData2.xcode视图内解决 先在顶部找到 Xcode --> Setting --> Lo…

操作系统入门系列-MIT6.828(操作系统工程)学习笔记(二)----课程实验环境搭建(wsl2+ubuntu+quem+xv6)

MIT6.S081(操作系统)学习笔记 操作系统入门系列-MIT6.828(操作系统)学习笔记(一)---- 操作系统介绍与接口示例 操作系统入门系列-MIT6.828(操作系统工程)学习笔记(二&am…

福昕PDF编辑器自定义快捷方式

你是否为用不惯福昕PDF编辑器自带的快捷键而发愁?今天,我和大家分享一下如何设置自己想要的快捷键方式,希望能对大家有帮助。 步骤一:打开福昕PDF编辑,并找到更多命令 步骤二:切换到键盘一栏,并…