kafka(一)——简介

简介

Kafka 是一种分布式、支持分区、多副本的消息中间件,支持发布-订阅模式,多用于实时处理大量数据缓存的场景,类似于一个“缓存池”。

架构

在这里插入图片描述

  • Producer:消息生产者;
  • Consumer:消息消费者;
  • Broker:一台kafka服务器也称作一个broker,kafka集群包含多个broker;
  • Topic:一个topic为一个消息队列,生产者、消费者基于topic进行发布-订阅;
  • Partition:消息分区,一个topic可以分为多个partition,每个partition是一个消息队列;
  • Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader;
  • Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader;

安装部署

  1. 下载kafka安装包
# 下载地址,取最新包即可,最新版本为kafka_2.13-3.6.1.tgz
https://kafka.apache.org/downloads
  1. 解压
tar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafkacd kafka/
  1. 修改配置文件

    1)本次集群配置采用kraft模式,集群节点为192.168.1.12~14;

    2)在3个节点解压后,进入kafka/config/kraft目录,修改server.properties文件:

    • 192.168.1.12节点配置文件修改如下
    "node.id=1"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.12:9092"
    
    • 192.168.1.13节点配置文件修改如下
    "node.id=2"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.13:9092"
    
    • 192.168.1.14节点配置文件修改如下
    "node.id=3"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.14:9092"
    

    节点说明:

    # Broker节点
    Kafka集群中的数据节点(消息队列),它们负责接收客户端的消息和传递消息给客户端,默认情况下,每个Broker节点会监听9092端口,该端口用于与客户端进行通信,客户端可以将消息发送到这个端口,或者从这个端口接收消息,这个端口可以称作客户端通信端口。# Controller节点
    Kafka集群中的控制器节点,负责管理集群的状态和元数据,Controller节点监听的端口通常是9093,该端口用于集群中其他节点获取元数据或在混合节点选举新的Controller时进行通信,通过该端口,其他节点可以与Controller节点交互,获取集群的元数据信息或参与控制器的选举过程,这个端口可以称作控制器端口。# 混合节点
    同时担任Broker和Controller角色的节点,这两个端口都会被使用,默认情况下混合节点将监听9092端口接收和传递消息给客户端,并监听9093端口用于与其他节点进行元数据交换和控制器选举通信,可见混合节点会同时使用两个端口分别作为客户端通信端口与控制器端口。
    
  2. 生成集群id

# KRaft模式的kafka集群需要设定一个id,在任意节点执行如下命令:
sh /usr/local/incvs-kafka/bin/kafka-storage.sh random-uuid
  1. 格式化数据目录
# 在集群所有节点执行如下命令:
sh /home/kafka/bin/kafka-storage.sh format -t "步骤4)生成的id" -c /home/kafka/config/kraft/server.properties
  1. 启动Kakfa
# 在3个节点依次执行如下命令:
sh /home/kafka/bin/kafka-server-start.sh /home/kafka/config/kraft/server.properties

测试

  • 在192.168.1.13节点启动消费者脚本订阅“MykafkaTopic”
sh /home/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.13:9092 --topic MykafkaTopic
hello kafka
  • 在192.168.1.12节点启动生产者脚本往topic生产消息
sh /home/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.12:9092 --topic MykafkaTopic
>hello kafka
>

配置文件说明

server.properties配置示例:

############################# Server Basics ############################## The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller# The node id associated with this instance's roles
node.id=1# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093############################# Socket Server Settings ############################## The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
  • process.roles:服务器角色,包含broker和controller。如示例中使用kraft模式使用的混合模式,即“process.roles=broker,controller”;
  • node.id:节点ID,一个集群中的node.id不能重复;
  • controller.quorum.voters:控制选举的投票节点,格式为“node-id@host:port”;
  • listeners:服务器监听的地址,格式为“listener_name://host_name:port”;
  • inter.broker.listener.name:用于broker之间通信的监听器名称;
  • advertised.listeners:服务器向客户端宣告的监听器名称、主机名和端口;
  • controller.listener.names:控制器使用的监听器名称列表;
  • listener.security.protocol.map:监听器名称到安全协议的映射。默认情况下,它们是相同的;
  • num.network.threads:服务器用于从网络接收请求和向网络发送响应的线程数;
  • num.io.threads:服务器用于处理请求(可能包括磁盘 I/O)的线程数;
  • socket.send.buffer.bytes:服务器用于发送数据的缓冲区大小;
  • socket.receive.buffer.bytes:服务器用于接收数据的缓冲区大小;
  • socket.request.max.bytes:服务器接受的请求的最大大小(用于防止内存溢出);
  • log.dirs:用于存储日志文件的目录列表;(重要,需清楚日志存储的原理)
  • num.partitions:每个主题的默认日志分区数;
  • num.recovery.threads.per.data.dir:每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数;
  • offsets.topic.replication.factor:内部主题 “__consumer_offsets” 和 “__transaction_state” 的复制因子;
  • transaction.state.log.replication.factor:事务状态日志的复制因子;
  • transaction.state.log.min.isr:事务状态日志的最小同步副本数;
  • log.flush.interval.messages:强制将数据刷新到磁盘之前接受的消息数;
  • log.flush.interval.ms:消息在日志中停留的最大时间,超过这个时间就会强制刷新到磁盘;
  • log.retention.hours:由于年龄而使日志文件有资格被删除的最小年龄;
  • log.retention.bytes:基于大小的日志保留策略,默认1G;
  • log.segment.bytes:日志段文件的最大大小;
  • log.retention.check.interval.ms:检查日志段是否可以根据保留策略被删除的间隔;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/636196.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud之Nacos的学习、快速上手

1、什么是Nacos Nacos是阿里的一个开源产品,是针对微服务架构中的服务发现、配置管理、服务治理的综合型解决方案,用来实现配置中心和服务注册中心。 Nacos 快速开始 2、安装运行nacos nacos下载地址 下载地址: https://github.com/alibaba/nacos/rel…

【Linux】Linux系统的生态

Linux中安装软件 Linux中安装软件一般有三种方式: 源代码安装rpm包安装yum安装 1.源代码安装 有些软件本来就是开源的,如果不想用别人直接发布好的软件,我们就可以把源代码下载下来,在我们的环境中编译,自己安装 …

防伪技术行业研究:年复合增长率约为10%

近年来,我国各种新的防伪技术不断涌现,部分防伪技术已经达到国际先进水平,并广泛应用于产品防伪、票证防伪等领域,推动了防伪行业的持续、健康发展。 常见的产品防伪技术有:隐形分子技术、二维码防伪、揭开留底防伪、安…

「Kafka」Broker篇

「Kafka」Broker篇 主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。 Kafka Broker 总体工作流程 Zookeeper 存储的 Kafka 信息 启动 Zookeeper 客户端: [atguiguhadoop102 zookeeper-3.5.7]$ bin/zkCli.sh通…

使用 Docker 部署 的WAF: 雷池社区版

Web应用防火墙(WAF)是保护网站不受恶意攻击的关键组件。 使用 Docker 部署雷池社区版,可以大大简化安全管理工作。 一、WAF 雷池社区版简介 雷池社区版是一种流行的开源 Web 应用防火墙,它提供基本的安全保护,如防止…

多维表格产品vika多维表、Flowus、Wolai体验记录

昨天从下午6点肝到凌晨2点多体验低代码平台多维表格产品,体验了3个国内产品,vika多维表、Flowus、Wolai。 具有多维表格新型关系数据库的鼻祖是 Airtable,国内模仿产品有vika多维表、飞书多维表格等。 还有一种类型就是以在国内鼎鼎大名的N…

细讲Labview条件结构用法及易错点

本文讲解Labview条件结构的常用情景及易错点注意事项。帮助大家深刻理解并使用该结构,欢迎点赞关注加评论,有问题可以私聊或在下方评论区留言。 本文程序均附在文章结尾,可自行下载学习。 博主之前讲过Labview事件结构、For循环等的基础知识介…

第十四章 MyBatis

第十四章 MyBatis 1.入门-课程介绍2.入门-快速入门程序3.配置SQL提示4.入门-JDBC5.入门-数据库连接池6.入门-lombok工具包介绍7.基础操作-环境准备8.基础操作-删除9.基础操作-删除(预编译SQL)10.基础操作-新增11.基础操作-新增(主键返回&…

Python初识——小小爬虫

一、找到网页端url 打开浏览器,打开百度官方网页点击图片,打开百度图片 鼠标齿轮向下滑,点击宠物图片 进入宠物图片网页,在网页空白处点击鼠标右键,弹出的框中最下方显示“检查”选项,点击(我是…

安全帽识别-赋能深圳自贸中心智慧工地

在当今的建筑行业中,安全管理一直是一个至关重要的议题。深圳自贸中心项目在这方面进行了一次有益的尝试——实施智慧工地安全帽识别系统。本文将对这一创新举措进行简要介绍。 项目背景 深圳自贸中心,作为一项标志性建设项目,承载着城市发展…

怎么用小程序将身份证转为结构化Excel?

随着科技的不断发展,我们的生活变得越来越智能化。现在,我们可以使用金鸣表格文字识别小程序来识别身份证并转为结构化的excel,并且可自动核对真伪,保留头像。金鸣表格文字识别小程序是一种基于人工智能技术的应用程序&#xff0c…

将字符串中的制表符替换为指定数量的空格expandtabs()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 将字符串中的制表符 替换为指定数量的空格 expandtabs() [太阳]选择题 请问以下代码text3的空白处是? text1 "这里有一个\t制表符" text2 "这里有一个12345制表符…

第二次作业+第三次作业

第二次作业第三次作业 第二次作业 题目: 网站需求: ​ 1.基于域名[www.openlab.com](http://www.openlab.com)可以访问网站内容为 welcome to openlab!!! 2.给该公司创建三个子界面分别显示学生信息,教学资料和缴费网站,基于[ww…

利用HTML+CSS+JS打造炫酷时钟网页的完整指南

引言 在现代Web开发中,制作一个引人注目的时钟网页是一种常见而令人愉悦的体验。本文将介绍如何使用HTML、CSS和JavaScript来创建一个炫酷的时钟网页,通过这个项目,你将学到如何结合这三种前端技术,制作一个动态且美观的时钟效果…

深入探索 Android 中的 Runtime

深入探索 Android 中的 Runtime 一、什么是 Runtime二、Android 中的 Runtime 类型2.1. Dalvik Runtime2.2. ART(Android Runtime) 三、Runtime 的作用和特点3.1. 应用程序执行环境3.2. 跨平台支持3.3. 性能优化3.4. 应用程序优化 四、与应用开发相关的重…

Unity3D Pico VR 手势识别物体交互 适配 MRTK3

当前Pico已经支持手势识别了,但是提供的PICO Unity Integration SDK 中是没有手势和物体交互的功能,Unity XR Interaction Toolkit提供的手势识别物体交互对 Quest适配的挺好的,Pico 当前只能用指尖点触还不能对物体进行抓握以及手势控制射线…

JS-WebAPIs-其他事件(三)

• 页面加载事件 页面加载事件主要有二种事件,分别是load和DOMContentLoaded 加载外部资源(如图片、外联CSS和JavaScript等)加载完毕时触发的事件为什么要学? 有些时候需要等页面资源全部处理完了做一些事情老代码喜欢把 scrip…

Hadoop详解

Hadoop 概念 就是一个大数据解决方案。它提供了一套分布式系统基础架构。 核心内容包含 hdfs 和mapreduce。hadoop2.0 以后引入 yarn. hdfs 是提供数据存储的,mapreduce 是方便数据计算的。 hdfs 又对应 namenode 和 datanode. namenode 负责保存元数据的基本信息…

YZ系列工具之YZ04:文本批量替换使用说明文档

我给VBA下的定义:VBA是个人小型自动化处理的有效工具。利用好了,可以大大提高自己的工作效率,而且可以提高数据的准确度。我的教程一共九套一部VBA手册,教程分为初级、中级、高级三大部分。是对VBA的系统讲解,从简单的…

解锁新身份:无忧秘书智脑-AI智能直播的10宫格姓氏头像制作秘籍

在这个信息爆炸的时代,一个独特的标识是个人或品牌在众多竞争者中脱颖而出的关键。而头像作为我们日常在线身份的一部分,更是我们展示个性和风格的重要窗口。无忧秘书智脑-AI智能直播最新推出的专属姓氏10宫格头像功能(ai6ai69),为…