Kafka 安装教程和基本操作

一、简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。

应用特性

  • 分布式存储:数据被自动分区并分布在集群的节点中。
  • 消息有序性Kafka 能确保从生产者传到消费者的记录都是有序的。
  • 高容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 高吞吐量Kafka 支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。
  • 易扩展性:支持集群热扩展。
  • 高并发:支持数千个客户端同时读写。
  • 持久性:支持消息数据持久化到本地磁盘 并支持数据备份和灵活配置数据的持久化时间。
  • 实时处理/低延迟:在数据写入的同时对进行处理,消息延迟最低只有几毫秒。

应用场景

Kafka 本质是 支持分布式的消息系统/消息中间件 。分析 Kafka 的应用场景等同于分析 消息中级件 的应用场景。通常,使用 消息系统 的 发布/订阅模型 功能来连接 生产者消费者。实现以下三大功能:

  • 生产者和消费者的解耦
  • 消息持久化 / 消息冗余
  • 消息缓冲 / 流量消峰

具体应用场景有:

  • 日志收集或数据管道:作为日志收集系统或数据处理管道的一部分,以处理大量的日志数据或实时数据流。
  • 负载均衡:如果系统收到大量请求或数据流,可以使用消息队列把这些任务平均分配给多个处理器或服务,从而实现负载均衡。
  • 系统解耦:消息队列经常用作不同服务间的通信机制,以解耦系统的不同部分。
  • 分布式事务:如果一个事务需要跨多个服务进行,可以使用消息队列来协调不同服务之间的通信,确保事务的原子性。
  • 实时流数据处理:比如实时日志分析或者实时数据报警。Kafka 能接收实时数据流并保证它的可靠性和持久性,这样就可以在上游源源不断生产数据的同时,下游可以实时地进行分析。
  • 通知和实时更新:消息队列可以用作通知的中介,比如告知用户完成某个任务,或者在后端数据更新时实时通知前端。

设计目标

  • 高性能:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 消息系统:支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 横向扩展:支持在线水平热扩展

二、kafka安装和配置

1. zookeeper安装配置

需要说明一下, 为了支持 Kafka 的集群功能, Zookeeper 必须使用集群模式部署。
本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南

2. kafka安装配置

下载链接:Kafka Downloads

下载页面中包含两种下载方式

  • : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译

a) 安装

[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka

b) 配置实例

配置第一个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9091端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9091
# 相当于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多个用,分隔   /kafka指定在zk上的目录
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

配置第二个 Kafka 实例

# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9092端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka

注:两个客户端的listeners中的port不能一样

4) 服务管理

# 启动服务 -daemon 表示后台启动
$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties# 查看服务
jps -l43330 org.apache.zookeeper.server.quorum.QuorumPeerMain14356 org.elasticsearch.bootstrap.Elasticsearch14583 org.logstash.Logstash45976 kafka.Kafka  # kafka服务进程netstat -anlpt | grep 9091tcp6       0      0 :::9091                 :::*                    LISTEN      45976/javatcp6       0      0 192.168.18.128:9091     192.168.18.128:49356    TIME_WAIT   -# 关闭服务
$KAFKA_HOME/bin/kafka-server-stop.sh

3. 常用操作

1) 创建topic

 #两条命令效果一样
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu 

在kafka1上创建一个topic,会自动同步到其他客户端

  • --create表示创建操作
  • --zookeeper 指定了 Kafka 连接的 ZooKeeper
  • --partitions 表示每个主题4个分区
  • --replication-factor 表示创建每个分区创建2个副本(副本因子)
  • --topic 表示主题名称。
    注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.

2) 查看topic

# 查看topic列表     #两条命令效果一样
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka __consumer_offsetstopic-demoyumu# 查看topic详细信息   #两条命令效果一样
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu Topic: yumu	PartitionCount: 2	ReplicationFactor: 2	Configs:Topic: yumu	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: yumu	Partition: 1	Leader: 1	Replicas: 2,1	Isr: 1,2

3) 测试通信

# 窗口1,启动生产者,向yumu主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu# 窗口2,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu# 窗口3,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu=====结果=====
# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
>hello, kafka!
>once again.
>
# 消费者1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
hello, kafka!
once again.# 消费者2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
hello, kafka!
once again.# 查看所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning# 删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9091  --topic yumu

三、遇到的问题

1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:

[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.at kafka.server.KafkaServer.startup(KafkaServer.scala:235)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)at kafka.Kafka$.main(Kafka.scala:82)at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)

原因:
kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件

cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A

第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)

推荐阅读:

  • Kafka介绍
  • ELK介绍
  • Kafka安装
  • C语言操作kafka以及安装librdkafka库

下一篇:Kafka消息系统原理

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/16524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于YOLO算法实现网球运动实时分析(附源码)

大家好,我是小F~ 今天给大家介绍一个计算机视觉实战的项目。 该项目使用YOLO算法检测球员和网球,并利用cnn提取球场关键点。 进而分析视频中的网球运动员,测量他们的速度、击球速度和击球次数。 使用win10电脑,Python …

【源码】java + uniapp交易所源代码/带搭建教程java交易所/完整源代码

java uniapp交易所源代码/带搭建教程java交易所/完整源代码 带简洁教程,未测 java uniapp交易所源代码/带搭建教程java交易所/完整源代码 - 吾爱资源网

【古董技术】ms-dos应用程序的结构

序 制定一个MS-DOS应用程序计划需要认真分析程序的大小。这种分析可以帮助程序员确定MS-DOS支持的两种程序风格中哪一种最适合该应用程序。.EXE程序结构为大型程序提供了好处,因为所有.EXE文件之前都有额外的512字节(或更多)的文件头。另一方…

C++第十七弹---string使用(下)

✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】【C详解】 目录 1、标准库中的string类 1.1、string类的常用接口说明 1.1.1、string类对象的修改操作 1.1.2、string类对象非成员函数重载 总结 1、标准库中的…

牛客热题:有效括号

📟作者主页:慢热的陕西人 🌴专栏链接:力扣刷题日记 📣欢迎各位大佬👍点赞🔥关注🚓收藏,🍉留言 文章目录 牛客热题:有效括号题目链接方法一&#x…

MySQL视图教程(01):创建视图

MySQL 创建视图 在 MySQL 中, CREATE VIEW 语句用于创建一个数据库视图(View)。 MySQL 是一种常用的关系型数据库管理系统,提供了 CREATE VIEW 语法,用于创建视图(View)。视图是一种虚拟的表&…

Mycat+Mysql搭建数据集群实现数据分片存储

前言 MyCAT介绍 * 一个彻底开源的,面向企业应用开发的“大数据库集群”; * 支持事务、ACID、可以替代MySQL的加强版数据库; * 一个可以视为“MySQL”集群的企业级数据库,用来替代昂贵的Oracle集群; * 一个融合内存缓存技术、Nosql技术、HDFS大数据的新型SQL; * 一个新颖…

QCC---DFU升级变更设备名和地址

QCC---DFU升级变更设备名和地址 这个很多人碰到这个疑问,升级了改不了设备名和地址 /******************************************************************************* Copyright (c) 2018 Qualcomm Technologies International, Ltd. FILE NAME sink_dfu_ps.c DESCRIPT…

2024.5.1学习记录

1、代码随想录:贪心刷题 2、react 高级使用( hoc render、props、函数组件、serState 传送门等) 3、游山玩水

《拯救大学生课设不挂科第四期之蓝桥杯是什么?我是否要参加蓝桥杯?选择何种语言?如何科学备赛?方法思维教程》【官方笔记】

背景: 有些同学在大一或者大二可能会被老师建议参加蓝桥杯,本视频和文章主要是以一个过来人的身份来给与大家一些思路。 比如蓝桥杯是什么?我是否要参加蓝桥杯?参加蓝桥杯该选择何种语言?如何科学备赛?等…

2023年信息素养大赛小学组C++智能算法复赛试题解析

2023年信息素养大赛小学组C++智能算法复赛真题 智能算法挑战复赛小学组(总共4道题)T1. 判断数字出现了几次 【题目描述】 给定一个正整数 n,判断从 1 到这个数本身的所有数中,一共出现了多少次数字k。 【输入格式】 输入共1行,包括一个正整数n和一个正整数k。(0<n<…

JavaEE之线程(7)_单例模式(设计模式概念、单例模式优点、懒汉、饿汉模式)

一、什么是设计模式&#xff1f; 单例模式是设计模式中较为常见的一种。那么&#xff0c;什么是单例模式&#xff1f; 设计模式&#xff08;Design Pattern&#xff09;都是一些相对优秀的解决方案&#xff0c;很多问题都是典型的、有代表性的问题&#xff0c;学习设计模式&am…

C#面:如果出现ASP.NET中的事件不能触发可能由于什么原因造成

当 ASP.NET 中的事件不能触发时&#xff0c;可能由以下几个原因造成&#xff1a; 事件绑定错误&#xff1a;请确保事件正确地绑定到相应的控件上。在 ASP.NET 中&#xff0c;可以通过在前端代码或者后端代码中使用事件处理程序来绑定事件。如果事件没有正确地绑定到控件上&…

为什么本科毕业后我坚定地选择了就业而不是考研?

大家好&#xff0c;我是小布丁。今天来聊聊我为什么本科毕业后选择了就业而不是考研。 在整个大学期间&#xff0c;我被亲戚拷问最多的问题就是&#xff1a;准备考研吗&#xff1f;相信很多大学生都遇到过这种情况吧。 如果你说准备还好&#xff0c;亲戚大概率就不会问下去&a…

js计算字符串大小存储所占字节数

在JavaScript中&#xff0c;计算字符串所占的大小&#xff08;占用的字节数&#xff09;并不直接&#xff0c;但可以通过一些方法间接得到。 我们需要知道一个前提&#xff0c;英文字母 lenght 和字节数是一样的&#xff1a;都是1&#xff0c;而中文 lenght1&#xff0c;字节数…

golang sqlite主从数据同步插件开发

### golang sqlite主从数据同步插件开发思路 参考Mysql的主从同步机制&#xff0c;Mysql是产生binlog&#xff0c;然后把binlog日志同步到从服务上。 同理&#xff0c;我们按sql执行顺序记录所有的增删改查的sql语句&#xff0c;然后调用接口把sql语句传到从服务上执行。 数…

关于软件设计模式的理解

系列文章 关于时间复杂度o(1), o(n), o(logn), o(nlogn)的理解 关于HashMap的哈希碰撞、拉链法和key的哈希函数设计 关于JVM内存模型和堆内存模型的理解 关于代理模式的理解 关于Mysql基本概念的理解 关于软件设计模式的理解 文章目录 前言一、软件设计模式遵循的六大原则…

前端面试题日常练-day35 【面试题】

题目 希望这些选择题能够帮助您进行前端面试的准备&#xff0c;答案在文末。 1. 以下哪个是使用jQuery选择所有具有CSS类名"myClass"的元素的正确语法&#xff1f; a) $(".myClass") b) $("myClass") c) $("#myClass") d) $("…

FURNet问题

1. 为什么选择使用弱监督学习&#xff1f; 弱监督学习减少了对精确标注数据的依赖&#xff0c;这在医学图像处理中尤为重要&#xff0c;因为高质量标注数据通常需要大量专业知识和时间。弱监督学习通过利用少量标注数据或粗略标注数据来训练模型&#xff0c;降低了数据准备的成…

元组推导式

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 使用元组推导式可以快速生成一个元组&#xff0c;它的表现形式和列表推导式类似&#xff0c;只是将列表推导式中的“[]”修改为“()”。例如&#xf…