Kafka Beginner Tutorial

I. How Kafka Works

1. Kafka is a high-performance, distributed message queue system. It handles large-scale data streams with low-latency delivery and can sustain read and write rates of hundreds of thousands of messages per second.

II. Advantages of Kafka

1. Service decoupling

(1) Better maintainability

By decoupling services, a system can be broken into independent parts. When one service needs to be updated or repaired, the work can be done in isolation without disrupting the other services, which greatly reduces the difficulty and duration of maintenance.

(2) Better extensibility

A decoupled system is easier to extend. New features or services can usually be added without touching the rest of the system, so the team can respond quickly to changing market and user demands.

2. High throughput, low latency

Kafka can process hundreds of thousands of messages per second with latency as low as a few milliseconds. Each topic can be split into multiple partitions, and consumer groups consume those partitions in parallel.

3. Scalability

Clusters support online (hot) scaling; the kafka-reassign-partitions.sh tool performs partition reassignment and migration, as sketched below.
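A sketch of how that tool is typically used, assuming a broker reachable at localhost:9092, a hypothetical topic named demo, and target brokers 1 and 2 (on recent releases --bootstrap-server replaces the older --zookeeper flag):

# topics-to-move.json contains: {"topics":[{"topic":"demo"}],"version":1}
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
# save the proposed assignment it prints as reassign.json, then apply and verify it:
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --verify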

4. Durability and reliability


Messages are persisted to local disk, and data replication protects against loss.

5. Fault tolerance


The cluster tolerates node failures (with a replication factor of n, up to n-1 nodes may fail).

6. High concurrency


Thousands of clients can read and write simultaneously.

III. Core Concepts


1. Topic


Messages in Kafka are organized into topics. Each topic represents a message stream: producers send messages to a topic, and consumers read messages from it.
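Topics are managed with the kafka-topics.sh script that ships with Kafka; a minimal sketch, assuming a broker listening on localhost:9092 and a hypothetical topic named demo:

# create a topic named demo with 3 partitions and a single copy of the data
./bin/kafka-topics.sh --create --topic demo --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
# list all topics in the cluster
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092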

2. Partition


Each topic can be divided into multiple partitions, and each partition is an ordered, immutable sequence of messages. Partitions are what allow Kafka to scale horizontally and handle large volumes of data at high throughput.
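A topic's partition layout can be inspected with --describe; each output line shows one partition with its leader broker, replica list, and in-sync replicas (Isr), which also previews the replica concept in the next item (sample output abbreviated and illustrative):

./bin/kafka-topics.sh --describe --topic demo --bootstrap-server localhost:9092
# Topic: demo  PartitionCount: 3  ReplicationFactor: 1  ...
# Topic: demo  Partition: 0  Leader: 1  Replicas: 1  Isr: 1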

3. Replica


To keep data highly available, Kafka allows each partition to have multiple replicas stored on different servers, so the data remains accessible even if one server fails.

4. Producer


A producer is a client that sends messages to Kafka topics. It can send a message to a specific partition, or let Kafka choose the partition according to some strategy (such as round-robin).
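The bundled console producer is an easy way to try this; a sketch, where enabling parse.key makes everything before the separator a message key, and messages with the same key always land in the same partition:

# type key:value lines; lines sharing a key go to the same partition
./bin/kafka-console-producer.sh --topic demo --bootstrap-server localhost:9092 --property parse.key=true --property key.separator=: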

5. Consumer


A consumer is a client that reads messages from Kafka topics. Consumers usually belong to a consumer group; the members of one group consume different partitions of the same topic in parallel, which raises consumption speed and efficiency.
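Likewise with the bundled console consumer: starting several of these with the same --group value spreads the topic's partitions across them (a sketch, reusing the hypothetical demo topic):

# read from the beginning of the topic as part of consumer group g1
./bin/kafka-console-consumer.sh --topic demo --group g1 --from-beginning --bootstrap-server localhost:9092
# inspect the group's partition assignments and lag
./bin/kafka-consumer-groups.sh --describe --group g1 --bootstrap-server localhost:9092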

6. Broker


A Kafka cluster consists of multiple brokers, each of which is one Kafka instance. Brokers store messages and handle read and write requests.

7. ZooKeeper


ZooKeeper is a distributed coordination service. Kafka (in ZooKeeper mode) uses it to manage cluster metadata such as topics, partitions, and brokers.
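In ZooKeeper mode you can inspect this metadata directly with the zkCli.sh shell (a sketch; ZooKeeper-based Kafka versions create these znode paths once a broker has registered, and the # annotations are explanatory, not part of the commands):

# inside zkCli.sh, connected to the ZooKeeper instance Kafka uses
ls /brokers/ids        # ids of the live brokers, e.g. [1]
ls /brokers/topics     # topics known to the cluster
get /controller        # which broker currently acts as controller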

IV. Kafka Installation Guide

1. Go to the official Kafka download page (https://kafka.apache.org/downloads).

2. Click the DOWNLOAD KAFKA button highlighted in red (Figure 1).

3. Click the highlighted binary-release link to download the archive (Figure 2).

4. Extract the Kafka archive on the server, then edit the configuration file kafka_2.13-4.0.0\config\zookeeper.properties.

(Note: Kafka 4.0 runs in KRaft mode only and no longer ships ZooKeeper support or a zookeeper.properties file, so the ZooKeeper-based steps below require a 3.x release; the log.dirs path in server.properties, kafka_2.13-3.2.3, suggests one was actually used. Adjust the directory names to match your version.)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# The AdminServer is enabled here (the stock file disables it to avoid port
# conflicts); set admin.serverPort if the default 8080 clashes with something else.
admin.enableServer=true
audit.enable=true
# admin.serverPort=8080
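One caveat before moving on: dataDir=/tmp/zookeeper is fine for a quick test, but /tmp is typically cleared on reboot. For anything longer-lived, point dataDir at a persistent directory; the path below is only a hypothetical example:

mkdir -p /app/test/kafka/zookeeper-data
# then set in zookeeper.properties:
# dataDir=/app/test/kafka/zookeeper-data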

5. Edit the configuration file kafka_2.13-4.0.0\config\server.properties.

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.11.22.122:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/app/test/kafka/kafka_2.13-3.2.3/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=10.11.22.121:2181,10.11.22.122:2181,10.11.22.124:2181
zookeeper.connect=10.11.22.122:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=180000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
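After editing such a heavily commented file, it is worth double-checking which settings are actually active; printing only the non-comment, non-blank lines is a quick way to do that:

grep -v -e '^#' -e '^[[:space:]]*$' config/server.properties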

6. Download ZooKeeper

Download ZooKeeper from the official Apache ZooKeeper download page (Figure 3).

Extract the archive on the server, go to the apache-zookeeper-3.9.3-bin\conf directory, and create a new file named zoo.cfg with the contents below. (Step 7 later starts the ZooKeeper bundled with Kafka; this standalone distribution is used here mainly for its zkCli.sh client, though it can serve as the ZooKeeper server instead, as sketched after the file.)

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/app/install-test/zk/zookeeper-3.4.6/zkdata
dataLogDir=/app/install-test/zk/zookeeper-3.4.6/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
audit.enable=true
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=10.11.22.122:2888:2889
#server.2=10.11.22.123:2888:2889
#server.3=10.11.22.124:2888:2889
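If you would rather run this standalone distribution as the ZooKeeper server (instead of the one bundled with Kafka that step 7 starts), it has its own control script; a sketch, run from the apache-zookeeper-3.9.3-bin directory:

./bin/zkServer.sh start     # reads conf/zoo.cfg by default
./bin/zkServer.sh status    # reports standalone / leader / follower
./bin/zkServer.sh stop

Do not run both on the same machine at once, since each would try to bind clientPort 2181.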

7. With all configuration in place, start the ZooKeeper service first. From the kafka_2.13-4.0.0 directory, run:

./bin/zookeeper-server-start.sh  -daemon ./config/zookeeper.properties 

Once it is up, check the process:

ps -ef | grep zookeeper

A successful start looks like Figure 4.

8. Go to the apache-zookeeper-3.9.3-bin\bin directory and start the ZooKeeper command-line client (zkCli.sh is an interactive shell, so there is no -daemon option; without -server it connects to localhost:2181):

./zkCli.sh -server 127.0.0.1:2181

A successful connection looks like Figure 5.

9. Back in the kafka_2.13-4.0.0 directory, start Kafka:

./bin/kafka-server-start.sh -daemon ./config/server.properties

Once it is up, check the process (a successful start looks like Figure 6):

ps -ef | grep kafka
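Finally, a quick end-to-end smoke test confirms the broker works: create a topic, produce one message, and read it back. The commands below assume the listener address configured earlier (10.11.22.122:9092); adjust it to your server, and note that smoke-test is just a hypothetical topic name:

./bin/kafka-topics.sh --create --topic smoke-test --partitions 1 --replication-factor 1 --bootstrap-server 10.11.22.122:9092
echo "hello kafka" | ./bin/kafka-console-producer.sh --topic smoke-test --bootstrap-server 10.11.22.122:9092
./bin/kafka-console-consumer.sh --topic smoke-test --from-beginning --max-messages 1 --bootstrap-server 10.11.22.122:9092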
