kafka(一)——简介

简介

Kafka 是一种分布式、支持分区、多副本的消息中间件,支持发布-订阅模式,多用于实时处理大量数据缓存的场景,类似于一个“缓存池”。

架构

在这里插入图片描述

  • Producer:消息生产者;
  • Consumer:消息消费者;
  • Broker:一台kafka服务器也称作一个broker,kafka集群包含多个broker;
  • Topic:一个topic为一个消息队列,生产者、消费者基于topic进行发布-订阅;
  • Partition:消息分区,一个topic可以分为多个partition,每个partition是一个消息队列;
  • Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader;
  • Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader;

安装部署

  1. 下载kafka安装包
# 下载地址,取最新包即可,最新版本为kafka_2.13-3.6.1.tgz
https://kafka.apache.org/downloads
  1. 解压
tar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafkacd kafka/
  1. 修改配置文件

    1)本次集群配置采用kraft模式,集群节点为192.168.1.12~14;

    2)在3个节点解压后,进入kafka/config/kraft目录,修改server.properties文件:

    • 192.168.1.12节点配置文件修改如下
    "node.id=1"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.12:9092"
    
    • 192.168.1.13节点配置文件修改如下
    "node.id=2"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.13:9092"
    
    • 192.168.1.14节点配置文件修改如下
    "node.id=3"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.14:9092"
    

    节点说明:

    # Broker节点
    Kafka集群中的数据节点(消息队列),它们负责接收客户端的消息和传递消息给客户端,默认情况下,每个Broker节点会监听9092端口,该端口用于与客户端进行通信,客户端可以将消息发送到这个端口,或者从这个端口接收消息,这个端口可以称作客户端通信端口。# Controller节点
    Kafka集群中的控制器节点,负责管理集群的状态和元数据,Controller节点监听的端口通常是9093,该端口用于集群中其他节点获取元数据或在混合节点选举新的Controller时进行通信,通过该端口,其他节点可以与Controller节点交互,获取集群的元数据信息或参与控制器的选举过程,这个端口可以称作控制器端口。# 混合节点
    同时担任Broker和Controller角色的节点,这两个端口都会被使用,默认情况下混合节点将监听9092端口接收和传递消息给客户端,并监听9093端口用于与其他节点进行元数据交换和控制器选举通信,可见混合节点会同时使用两个端口分别作为客户端通信端口与控制器端口。
    
  2. 生成集群id

# KRaft模式的kafka集群需要设定一个id,在任意节点执行如下命令:
sh /usr/local/incvs-kafka/bin/kafka-storage.sh random-uuid
  1. 格式化数据目录
# 在集群所有节点执行如下命令:
sh /home/kafka/bin/kafka-storage.sh format -t "步骤4)生成的id" -c /home/kafka/config/kraft/server.properties
  1. 启动Kakfa
# 在3个节点依次执行如下命令:
sh /home/kafka/bin/kafka-server-start.sh /home/kafka/config/kraft/server.properties

测试

  • 在192.168.1.13节点启动消费者脚本订阅“MykafkaTopic”
sh /home/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.13:9092 --topic MykafkaTopic
hello kafka
  • 在192.168.1.12节点启动生产者脚本往topic生产消息
sh /home/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.12:9092 --topic MykafkaTopic
>hello kafka
>

配置文件说明

server.properties配置示例:

############################# Server Basics ############################## The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller# The node id associated with this instance's roles
node.id=1# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093############################# Socket Server Settings ############################## The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
  • process.roles:服务器角色,包含broker和controller。如示例中使用kraft模式使用的混合模式,即“process.roles=broker,controller”;
  • node.id:节点ID,一个集群中的node.id不能重复;
  • controller.quorum.voters:控制选举的投票节点,格式为“node-id@host:port”;
  • listeners:服务器监听的地址,格式为“listener_name://host_name:port”;
  • inter.broker.listener.name:用于broker之间通信的监听器名称;
  • advertised.listeners:服务器向客户端宣告的监听器名称、主机名和端口;
  • controller.listener.names:控制器使用的监听器名称列表;
  • listener.security.protocol.map:监听器名称到安全协议的映射。默认情况下,它们是相同的;
  • num.network.threads:服务器用于从网络接收请求和向网络发送响应的线程数;
  • num.io.threads:服务器用于处理请求(可能包括磁盘 I/O)的线程数;
  • socket.send.buffer.bytes:服务器用于发送数据的缓冲区大小;
  • socket.receive.buffer.bytes:服务器用于接收数据的缓冲区大小;
  • socket.request.max.bytes:服务器接受的请求的最大大小(用于防止内存溢出);
  • log.dirs:用于存储日志文件的目录列表;(重要,需清楚日志存储的原理)
  • num.partitions:每个主题的默认日志分区数;
  • num.recovery.threads.per.data.dir:每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数;
  • offsets.topic.replication.factor:内部主题 “__consumer_offsets” 和 “__transaction_state” 的复制因子;
  • transaction.state.log.replication.factor:事务状态日志的复制因子;
  • transaction.state.log.min.isr:事务状态日志的最小同步副本数;
  • log.flush.interval.messages:强制将数据刷新到磁盘之前接受的消息数;
  • log.flush.interval.ms:消息在日志中停留的最大时间,超过这个时间就会强制刷新到磁盘;
  • log.retention.hours:由于年龄而使日志文件有资格被删除的最小年龄;
  • log.retention.bytes:基于大小的日志保留策略,默认1G;
  • log.segment.bytes:日志段文件的最大大小;
  • log.retention.check.interval.ms:检查日志段是否可以根据保留策略被删除的间隔;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/636196.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud之Nacos的学习、快速上手

1、什么是Nacos Nacos是阿里的一个开源产品,是针对微服务架构中的服务发现、配置管理、服务治理的综合型解决方案,用来实现配置中心和服务注册中心。 Nacos 快速开始 2、安装运行nacos nacos下载地址 下载地址: https://github.com/alibaba/nacos/rel…

【Linux】Linux系统的生态

Linux中安装软件 Linux中安装软件一般有三种方式: 源代码安装rpm包安装yum安装 1.源代码安装 有些软件本来就是开源的,如果不想用别人直接发布好的软件,我们就可以把源代码下载下来,在我们的环境中编译,自己安装 …

防伪技术行业研究:年复合增长率约为10%

近年来,我国各种新的防伪技术不断涌现,部分防伪技术已经达到国际先进水平,并广泛应用于产品防伪、票证防伪等领域,推动了防伪行业的持续、健康发展。 常见的产品防伪技术有:隐形分子技术、二维码防伪、揭开留底防伪、安…

【设计模式 创建型】单例模式

类的单例设计模式,就是采取一定的方法保证在整个的软件系统中,对某个类只能存在一个对象实例,并且该类只提供一个取得其对象实例的方法(静态方法) 指一个类只有一个实例,且该类能自行创建这个实例的一种模…

「Kafka」Broker篇

「Kafka」Broker篇 主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。 Kafka Broker 总体工作流程 Zookeeper 存储的 Kafka 信息 启动 Zookeeper 客户端: [atguiguhadoop102 zookeeper-3.5.7]$ bin/zkCli.sh通…

时间复杂度的排序

在计算机科学中,不同的算法有不同的时间复杂度。以下是一些常见的时间复杂度,并按照它们的增长速度从低到高排序: O(1) - 常数时间复杂度: 表示算法的执行时间是固定的,不随输入规模的增加而变化。例如,直接…

使用 Docker 部署 的WAF: 雷池社区版

Web应用防火墙(WAF)是保护网站不受恶意攻击的关键组件。 使用 Docker 部署雷池社区版,可以大大简化安全管理工作。 一、WAF 雷池社区版简介 雷池社区版是一种流行的开源 Web 应用防火墙,它提供基本的安全保护,如防止…

多维表格产品vika多维表、Flowus、Wolai体验记录

昨天从下午6点肝到凌晨2点多体验低代码平台多维表格产品,体验了3个国内产品,vika多维表、Flowus、Wolai。 具有多维表格新型关系数据库的鼻祖是 Airtable,国内模仿产品有vika多维表、飞书多维表格等。 还有一种类型就是以在国内鼎鼎大名的N…

细讲Labview条件结构用法及易错点

本文讲解Labview条件结构的常用情景及易错点注意事项。帮助大家深刻理解并使用该结构,欢迎点赞关注加评论,有问题可以私聊或在下方评论区留言。 本文程序均附在文章结尾,可自行下载学习。 博主之前讲过Labview事件结构、For循环等的基础知识介…

每日OJ题_算法_滑动窗口⑥_力扣438. 找到字符串中所有字母异位词

目录 力扣438. 找到字符串中所有字母异位词 解析及代码1 解析及代码2 力扣438. 找到字符串中所有字母异位词 438. 找到字符串中所有字母异位词 - 力扣(LeetCode) 难度 中等 给定两个字符串 s 和 p,找到 s 中所有 p 的 异位词 的子串&am…

Mac Could not find the GDAL library 问题解决

目录 1. 参考2. 问题描述3. 解决步骤3.1. 安装依赖包3.2. 配置 GDAL_LIBRARY_PATH3.3. 配置 GEOS_LIBRARY_PATH 1. 参考 https://docs.djangoproject.com/en/5.0/ref/contrib/gis/install/#macos 2. 问题描述 苹果系统 Mac 中搭建 GeoDjango 开发环境时出现以下报错&#xf…

第十四章 MyBatis

第十四章 MyBatis 1.入门-课程介绍2.入门-快速入门程序3.配置SQL提示4.入门-JDBC5.入门-数据库连接池6.入门-lombok工具包介绍7.基础操作-环境准备8.基础操作-删除9.基础操作-删除(预编译SQL)10.基础操作-新增11.基础操作-新增(主键返回&…

Python初识——小小爬虫

一、找到网页端url 打开浏览器,打开百度官方网页点击图片,打开百度图片 鼠标齿轮向下滑,点击宠物图片 进入宠物图片网页,在网页空白处点击鼠标右键,弹出的框中最下方显示“检查”选项,点击(我是…

安全帽识别-赋能深圳自贸中心智慧工地

在当今的建筑行业中,安全管理一直是一个至关重要的议题。深圳自贸中心项目在这方面进行了一次有益的尝试——实施智慧工地安全帽识别系统。本文将对这一创新举措进行简要介绍。 项目背景 深圳自贸中心,作为一项标志性建设项目,承载着城市发展…

云的网络安全优势

在考虑迁移到云计算时,网络安全已经成为一个关键因素。毫无疑问,企业希望通过网络浏览器或移动应用为员工、合作伙伴和客户提供一致的数据和应用访问权限,以保持竞争力。 网络攻击的性质和重要性正变得越来越复杂,并造成严重的财…

【ASP.NET Core 基础知识】--路由和请求处理--Attribute路由

一、介绍 在ASP.NET Core中,路由是将传入的URL请求映射到正确的控制器和操作的方法。Attribute路由是一种基于属性,用于定义路由规则的方式,通过在控制器类和操作方法上应用特定的属性,来定义URL模板。 基本概念: **路…

线性代数逆矩阵的求法

在线性代数中,逆矩阵是一个非常重要且有趣的概念。一个 n 阶方阵 A 的逆矩阵,记作 A^-1,是指存在另一个 n 阶方阵 B,使得 A 和 B 的乘积等于单位矩阵 E,即: A * B E 或者等价地: B * A E 这里…

uniapp技术积累

2024.01.19 1.textarea (1)默认文字样式设置 placeholder-style"color:rgba(0,0,0,0.7)" 2024.01.18 1.hbuilderx运行uniapp到ipad(复用率低) (1)电脑端:首先会有一个运行工具包&…

只用Mysql搞一个分布式锁

在web开发中,分布式的锁的应用场景甚多,我们可以通过分布式锁来进行一些仅依赖于数据库的事务很难直接保证原子性的操作,比如多种不同数据源的访问,网络通信等等。多数情况下我们会使用memcache的add, redis中在set中指定nx参数等…

怎么用小程序将身份证转为结构化Excel?

随着科技的不断发展,我们的生活变得越来越智能化。现在,我们可以使用金鸣表格文字识别小程序来识别身份证并转为结构化的excel,并且可自动核对真伪,保留头像。金鸣表格文字识别小程序是一种基于人工智能技术的应用程序&#xff0c…